basesrc: add async start option
authorWim Taymans <wim.taymans@collabora.co.uk>
Tue, 6 Dec 2011 13:01:50 +0000 (14:01 +0100)
committerWim Taymans <wim.taymans@collabora.co.uk>
Tue, 6 Dec 2011 13:01:50 +0000 (14:01 +0100)
Add a method to enable async start behaviour. The subclass can then complete the
start operation from any other thread by caling gst_base_src_start_complete().
The base class can wait for the start to complete with
gst_base_src_start_wait().

libs/gst/base/gstbasesrc.c
libs/gst/base/gstbasesrc.h
plugins/elements/gstfakesrc.c

index 3d60e5c..0386a7f 100644 (file)
@@ -213,6 +213,9 @@ struct _GstBaseSrcPrivate
   gboolean discont;
   gboolean flushing;
 
+  GstFlowReturn start_result;
+  gboolean async;
+
   /* if segment should be sent */
   gboolean segment_pending;
 
@@ -317,6 +320,7 @@ static GstFlowReturn gst_base_src_default_alloc (GstBaseSrc * basesrc,
 
 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing);
+
 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
 
@@ -435,7 +439,9 @@ gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
 
-  GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
+  basesrc->priv->start_result = GST_FLOW_WRONG_STATE;
+  GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
+  GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
   GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_FLAG_SOURCE);
 
   GST_DEBUG_OBJECT (basesrc, "init done");
@@ -598,6 +604,49 @@ gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
 }
 
 /**
+ * gst_base_src_set_async:
+ * @src: base source instance
+ * @async: new async mode
+ *
+ * Configure async behaviour in @src, no state change will block. The open,
+ * close, start, stop, play and pause virtual methods will be executed in a
+ * different thread and are thus allowed to perform blocking operations. Any
+ * blocking operation should be unblocked with the unlock vmethod.
+ */
+void
+gst_base_src_set_async (GstBaseSrc * src, gboolean async)
+{
+  g_return_if_fail (GST_IS_BASE_SRC (src));
+
+  GST_OBJECT_LOCK (src);
+  src->priv->async = async;
+  GST_OBJECT_UNLOCK (src);
+}
+
+/**
+ * gst_base_src_is_async:
+ * @src: base source instance
+ *
+ * Get the current async behaviour of @src. See also gst_base_src_set_async().
+ *
+ * Returns: %TRUE if @src is operating in async mode.
+ */
+gboolean
+gst_base_src_is_async (GstBaseSrc * src)
+{
+  gboolean res;
+
+  g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
+
+  GST_OBJECT_LOCK (src);
+  res = src->priv->async;
+  GST_OBJECT_UNLOCK (src);
+
+  return res;
+}
+
+
+/**
  * gst_base_src_query_latency:
  * @src: the source
  * @live: (out) (allow-none): if the source is live
@@ -2179,7 +2228,7 @@ again:
     }
   }
 
-  if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)))
+  if (G_UNLIKELY (!GST_BASE_SRC_IS_STARTED (src)))
     goto not_started;
 
   if (G_UNLIKELY (!bclass->create))
@@ -2364,13 +2413,23 @@ static gboolean
 gst_base_src_is_random_access (GstBaseSrc * src)
 {
   /* we need to start the basesrc to check random access */
-  if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
+  if (!GST_BASE_SRC_IS_STARTED (src)) {
     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
-    if (G_LIKELY (gst_base_src_start (src)))
+    if (G_LIKELY (gst_base_src_start (src))) {
+      if (gst_base_src_start_wait (src) != GST_FLOW_OK)
+        goto start_failed;
       gst_base_src_stop (src);
+    }
   }
 
   return src->random_access;
+
+  /* ERRORS */
+start_failed:
+  {
+    GST_DEBUG_OBJECT (src, "failed to start");
+    return FALSE;
+  }
 }
 
 static void
@@ -2840,24 +2899,23 @@ static gboolean
 gst_base_src_start (GstBaseSrc * basesrc)
 {
   GstBaseSrcClass *bclass;
-  gboolean result, have_size;
-  guint64 size;
-  gboolean seekable;
-  GstFormat format;
-
-  if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
-    return TRUE;
+  gboolean result;
 
-  GST_DEBUG_OBJECT (basesrc, "starting source");
+  GST_LIVE_LOCK (basesrc);
+  if (GST_BASE_SRC_IS_STARTING (basesrc))
+    goto was_starting;
+  if (GST_BASE_SRC_IS_STARTED (basesrc))
+    goto was_started;
 
+  basesrc->priv->start_result = GST_FLOW_WRONG_STATE;
+  GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING);
   basesrc->num_buffers_left = basesrc->num_buffers;
-
+  basesrc->running = FALSE;
+  basesrc->priv->segment_pending = FALSE;
   GST_OBJECT_LOCK (basesrc);
   gst_segment_init (&basesrc->segment, basesrc->segment.format);
   GST_OBJECT_UNLOCK (basesrc);
-
-  basesrc->running = FALSE;
-  basesrc->priv->segment_pending = FALSE;
+  GST_LIVE_UNLOCK (basesrc);
 
   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
   if (bclass->start)
@@ -2868,14 +2926,68 @@ gst_base_src_start (GstBaseSrc * basesrc)
   if (!result)
     goto could_not_start;
 
-  GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
+  if (!gst_base_src_is_async (basesrc))
+    gst_base_src_start_complete (basesrc, GST_FLOW_OK);
+
+  return result;
+
+  /* ERROR */
+was_starting:
+  {
+    GST_DEBUG_OBJECT (basesrc, "was starting");
+    GST_LIVE_UNLOCK (basesrc);
+    return TRUE;
+  }
+was_started:
+  {
+    GST_DEBUG_OBJECT (basesrc, "was started");
+    GST_LIVE_UNLOCK (basesrc);
+    return TRUE;
+  }
+could_not_start:
+  {
+    GST_DEBUG_OBJECT (basesrc, "could not start");
+    /* subclass is supposed to post a message. We don't have to call _stop. */
+    gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
+    return FALSE;
+  }
+}
+
+/**
+ * gst_base_src_start_complete:
+ * @src: base source instance
+ * @ret: a #GstFlowReturn
+ *
+ * Complete an asynchronous start operation. When the subclass overrides the
+ * start method, it should call gst_base_src_start_complete() when the start
+ * operation completes either from the same thread or from an asynchronous
+ * helper thread.
+ */
+void
+gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
+{
+  gboolean have_size;
+  guint64 size;
+  gboolean seekable;
+  GstFormat format;
+  GstPadMode mode;
+  GstEvent *event;
+  GstBaseSrcClass *bclass;
+
+  bclass = GST_BASE_SRC_GET_CLASS (basesrc);
+
+  if (ret != GST_FLOW_OK)
+    goto error;
 
+  GST_DEBUG_OBJECT (basesrc, "starting source");
   format = basesrc->segment.format;
 
   /* figure out the size */
   have_size = FALSE;
   size = -1;
   if (format == GST_FORMAT_BYTES) {
+    bclass = GST_BASE_SRC_GET_CLASS (basesrc);
+
     if (bclass->get_size) {
       if (!(have_size = bclass->get_size (basesrc, &size)))
         size = -1;
@@ -2901,16 +3013,107 @@ gst_base_src_start (GstBaseSrc * basesrc)
 
   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
 
+  /* stop flushing now but for live sources, still block in the LIVE lock when
+   * we are not yet PLAYING */
+  gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
+
+  GST_OBJECT_LOCK (basesrc->srcpad);
+  mode = GST_PAD_MODE (basesrc->srcpad);
+  GST_OBJECT_UNLOCK (basesrc->srcpad);
+
+  if (mode == GST_PAD_MODE_PUSH) {
+    /* do initial seek, which will start the task */
+    GST_OBJECT_LOCK (basesrc);
+    event = basesrc->pending_seek;
+    basesrc->pending_seek = NULL;
+    GST_OBJECT_UNLOCK (basesrc);
+
+    /* no need to unlock anything, the task is certainly
+     * not running here. The perform seek code will start the task when
+     * finished. */
+    if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
+      goto seek_failed;
+
+    if (event)
+      gst_event_unref (event);
+  } else {
+    /* if not random_access, we cannot operate in pull mode for now */
+    if (G_UNLIKELY (!basesrc->random_access))
+      goto no_get_range;
+  }
+
   gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
 
-  return TRUE;
+  GST_LIVE_LOCK (basesrc);
+  GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED);
+  GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
+  basesrc->priv->start_result = ret;
+  GST_LIVE_SIGNAL (basesrc);
+  GST_LIVE_UNLOCK (basesrc);
 
-  /* ERROR */
-could_not_start:
+  return;
+
+seek_failed:
   {
-    GST_DEBUG_OBJECT (basesrc, "could not start");
-    /* subclass is supposed to post a message. We don't have to call _stop. */
-    return FALSE;
+    GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
+    gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
+    if (event)
+      gst_event_unref (event);
+    ret = GST_FLOW_ERROR;
+    goto error;
+  }
+no_get_range:
+  {
+    gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
+    GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
+    ret = GST_FLOW_ERROR;
+    goto error;
+  }
+error:
+  {
+    GST_LIVE_LOCK (basesrc);
+    basesrc->priv->start_result = ret;
+    GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
+    GST_LIVE_SIGNAL (basesrc);
+    GST_LIVE_UNLOCK (basesrc);
+    return;
+  }
+}
+
+/**
+ * gst_base_src_start_complete:
+ * @src: base source instance
+ * @ret: a #GstFlowReturn
+ *
+ * Wait until the start operation completes.
+ *
+ * Returns: a #GstFlowReturn.
+ */
+GstFlowReturn
+gst_base_src_start_wait (GstBaseSrc * basesrc)
+{
+  GstFlowReturn result;
+
+  GST_LIVE_LOCK (basesrc);
+  if (G_UNLIKELY (basesrc->priv->flushing))
+    goto flushing;
+
+  while (GST_BASE_SRC_IS_STARTING (basesrc)) {
+    GST_LIVE_WAIT (basesrc);
+    if (G_UNLIKELY (basesrc->priv->flushing))
+      goto flushing;
+  }
+  result = basesrc->priv->start_result;
+  GST_LIVE_UNLOCK (basesrc);
+
+  return result;
+
+  /* ERRORS */
+flushing:
+  {
+    GST_DEBUG_OBJECT (basesrc, "we are flushing");
+    GST_LIVE_UNLOCK (basesrc);
+    return GST_FLOW_WRONG_STATE;
   }
 }
 
@@ -2920,21 +3123,37 @@ gst_base_src_stop (GstBaseSrc * basesrc)
   GstBaseSrcClass *bclass;
   gboolean result = TRUE;
 
-  if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
-    return TRUE;
-
   GST_DEBUG_OBJECT (basesrc, "stopping source");
 
+  /* flush all */
+  gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
+  /* stop the task */
+  gst_pad_stop_task (basesrc->srcpad);
+
+  GST_LIVE_LOCK (basesrc);
+  if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
+    goto was_stopped;
+
+  GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
+  GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
+  basesrc->priv->start_result = GST_FLOW_WRONG_STATE;
+  GST_LIVE_SIGNAL (basesrc);
+  GST_LIVE_UNLOCK (basesrc);
+
   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
   if (bclass->stop)
     result = bclass->stop (basesrc);
 
   gst_base_src_set_allocation (basesrc, NULL, NULL, 0, 0);
 
-  if (result)
-    GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
-
   return result;
+
+was_stopped:
+  {
+    GST_DEBUG_OBJECT (basesrc, "was started");
+    GST_LIVE_UNLOCK (basesrc);
+    return TRUE;
+  }
 }
 
 /* start or stop flushing dataprocessing
@@ -3061,7 +3280,6 @@ static gboolean
 gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
 {
   GstBaseSrc *basesrc;
-  GstEvent *event;
 
   basesrc = GST_BASE_SRC (parent);
 
@@ -3074,30 +3292,8 @@ gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
 
     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
       goto error_start;
-
-    basesrc->priv->discont = TRUE;
-    gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
-
-    /* do initial seek, which will start the task */
-    GST_OBJECT_LOCK (basesrc);
-    event = basesrc->pending_seek;
-    basesrc->pending_seek = NULL;
-    GST_OBJECT_UNLOCK (basesrc);
-
-    /* no need to unlock anything, the task is certainly
-     * not running here. The perform seek code will start the task when
-     * finished. */
-    if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
-      goto seek_failed;
-
-    if (event)
-      gst_event_unref (event);
   } else {
     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
-    /* flush all */
-    gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
-    /* stop the task */
-    gst_pad_stop_task (pad);
     /* now we can stop the source */
     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
       goto error_stop;
@@ -3115,19 +3311,6 @@ error_start:
     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
     return FALSE;
   }
-seek_failed:
-  {
-    GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
-    /* flush all */
-    gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
-    /* stop the task */
-    gst_pad_stop_task (pad);
-    /* Stop the basesrc */
-    gst_base_src_stop (basesrc);
-    if (event)
-      gst_event_unref (event);
-    return FALSE;
-  }
 error_stop:
   {
     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
@@ -3147,19 +3330,8 @@ gst_base_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
       goto error_start;
-
-    /* if not random_access, we cannot operate in pull mode for now */
-    if (G_UNLIKELY (!gst_base_src_is_random_access (basesrc)))
-      goto no_get_range;
-
-    /* stop flushing now but for live sources, still block in the LIVE lock when
-     * we are not yet PLAYING */
-    gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
   } else {
     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
-    /* flush all, there is no task to stop */
-    gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
-
     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
       goto error_stop;
   }
@@ -3171,12 +3343,6 @@ error_start:
     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
     return FALSE;
   }
-no_get_range:
-  {
-    GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
-    gst_base_src_stop (basesrc);
-    return FALSE;
-  }
 error_stop:
   {
     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
@@ -3248,13 +3414,10 @@ gst_base_src_change_state (GstElement * element, GstStateChange transition)
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
     {
-      GstEvent **event_p;
-
       /* we don't need to unblock anything here, the pad deactivation code
        * already did this */
       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
-      event_p = &basesrc->pending_seek;
-      gst_event_replace (event_p, NULL);
+      gst_event_replace (&basesrc->pending_seek, NULL);
       break;
     }
     case GST_STATE_CHANGE_READY_TO_NULL:
index 0624096..5956866 100644 (file)
@@ -38,17 +38,22 @@ G_BEGIN_DECLS
 
 /**
  * GstBaseSrcFlags:
- * @GST_BASE_SRC_STARTED: has source been started
+ * @GST_BASE_SRC_FLAG_STARTING: has source is starting
+ * @GST_BASE_SRC_FLAG_STARTED: has source been started
  * @GST_BASE_SRC_FLAG_LAST: offset to define more flags
  *
  * The #GstElement flags that a basesrc element may have.
  */
 typedef enum {
-  GST_BASE_SRC_STARTED           = (GST_ELEMENT_FLAG_LAST << 0),
+  GST_BASE_SRC_FLAG_STARTING     = (GST_ELEMENT_FLAG_LAST << 0),
+  GST_BASE_SRC_FLAG_STARTED      = (GST_ELEMENT_FLAG_LAST << 1),
   /* padding */
-  GST_BASE_SRC_FLAG_LAST         = (GST_ELEMENT_FLAG_LAST << 2)
+  GST_BASE_SRC_FLAG_LAST         = (GST_ELEMENT_FLAG_LAST << 16)
 } GstBaseSrcFlags;
 
+#define GST_BASE_SRC_IS_STARTING(obj) GST_OBJECT_FLAG_IS_SET ((obj), GST_BASE_SRC_FLAG_STARTING)
+#define GST_BASE_SRC_IS_STARTED(obj)  GST_OBJECT_FLAG_IS_SET ((obj), GST_BASE_SRC_FLAG_STARTED)
+
 typedef struct _GstBaseSrc GstBaseSrc;
 typedef struct _GstBaseSrcClass GstBaseSrcClass;
 typedef struct _GstBaseSrcPrivate GstBaseSrcPrivate;
@@ -115,7 +120,9 @@ struct _GstBaseSrc {
  * @set_caps: Notify subclass of changed output caps
  * @decide_allocation: configure the allocation query
  * @start: Start processing. Subclasses should open resources and prepare
- *    to produce data.
+ *    to produce data. Implementation should call gst_base_src_start_complete()
+ *    when the operation completes, either from the current thread or any other
+ *    thread that finishes the start operation asynchronously.
  * @stop: Stop processing. Subclasses should use this to close resources.
  * @get_times: Given a buffer, return the start and stop time when it
  *    should be pushed out. The base class will sync on the clock using
@@ -233,6 +240,12 @@ void            gst_base_src_set_format       (GstBaseSrc *src, GstFormat format
 
 void            gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic);
 
+void            gst_base_src_set_async        (GstBaseSrc *src, gboolean async);
+gboolean        gst_base_src_is_async         (GstBaseSrc *src);
+
+void            gst_base_src_start_complete   (GstBaseSrc * basesrc, GstFlowReturn ret);
+GstFlowReturn   gst_base_src_start_wait       (GstBaseSrc * basesrc);
+
 gboolean        gst_base_src_query_latency    (GstBaseSrc *src, gboolean * live,
                                                GstClockTime * min_latency,
                                                GstClockTime * max_latency);
index df46d18..9a367a0 100644 (file)
@@ -526,11 +526,13 @@ gst_fake_src_set_property (GObject * object, guint prop_id,
       src->dump = g_value_get_boolean (value);
       break;
     case PROP_CAN_ACTIVATE_PUSH:
-      g_return_if_fail (!GST_OBJECT_FLAG_IS_SET (object, GST_BASE_SRC_STARTED));
+      g_return_if_fail (!GST_OBJECT_FLAG_IS_SET (object,
+              GST_BASE_SRC_FLAG_STARTED));
       GST_BASE_SRC (src)->can_activate_push = g_value_get_boolean (value);
       break;
     case PROP_CAN_ACTIVATE_PULL:
-      g_return_if_fail (!GST_OBJECT_FLAG_IS_SET (object, GST_BASE_SRC_STARTED));
+      g_return_if_fail (!GST_OBJECT_FLAG_IS_SET (object,
+              GST_BASE_SRC_FLAG_STARTED));
       src->can_activate_pull = g_value_get_boolean (value);
       break;
     case PROP_IS_LIVE: