Drop excessive threading that over-complicates synchronisation.
[profile/ivi/gstreamer-vaapi.git] / gst-libs / gst / vaapi / gstvaapidecoder.c
index a6c848c..7b69475 100644 (file)
@@ -52,45 +52,6 @@ enum {
     PROP_CODEC_DATA
 };
 
-/* Wait _at most_ 10 ms for encoded buffers between each decoding step */
-#define GST_VAAPI_DECODER_TIMEOUT (10000)
-
-static GstBuffer *
-pop_buffer(GstVaapiDecoder *decoder);
-
-static gboolean
-push_surface(GstVaapiDecoder *decoder, GstVaapiSurface *surface);
-
-static DecodedSurface *
-pop_surface(GstVaapiDecoder *decoder, GTimeVal *end_time);
-
-static void
-decoder_task(gpointer data)
-{
-    GstVaapiDecoder * const decoder = GST_VAAPI_DECODER_CAST(data);
-    GstVaapiDecoderPrivate * const priv = decoder->priv;
-    GstVaapiDecoderClass * const klass = GST_VAAPI_DECODER_GET_CLASS(decoder);
-    GstBuffer *buffer;
-
-    buffer = pop_buffer(decoder);
-    if (!buffer)
-        return;
-
-    priv->decoder_status = klass->decode(decoder, buffer);
-    GST_DEBUG("decode frame (status = %d)", priv->decoder_status);
-
-    switch (priv->decoder_status) {
-    case GST_VAAPI_DECODER_STATUS_SUCCESS:
-    case GST_VAAPI_DECODER_STATUS_ERROR_NO_DATA:
-        break;
-    default:
-        /*  Send an empty surface to signal an error */
-        push_surface(decoder, NULL);
-        gst_task_pause(priv->decoder_task);
-        break;
-    }
-}
-
 static void
 update_clock(GstVaapiDecoder *decoder, GstBuffer *buffer)
 {
@@ -177,7 +138,7 @@ push_buffer(GstVaapiDecoder *decoder, GstBuffer *buffer)
     GST_DEBUG("queue encoded data buffer %p (%d bytes)",
               buffer, GST_BUFFER_SIZE(buffer));
 
-    g_async_queue_push(priv->buffers, buffer);
+    g_queue_push_tail(priv->buffers, buffer);
     return TRUE;
 }
 
@@ -185,13 +146,9 @@ static GstBuffer *
 pop_buffer(GstVaapiDecoder *decoder)
 {
     GstVaapiDecoderPrivate * const priv = decoder->priv;
-    GTimeVal end_time;
     GstBuffer *buffer;
 
-    g_get_current_time(&end_time);
-    g_time_val_add(&end_time, GST_VAAPI_DECODER_TIMEOUT);
-
-    buffer = g_async_queue_timed_pop(priv->buffers, &end_time);
+    buffer = g_queue_pop_head(priv->buffers);
     if (!buffer)
         return NULL;
 
@@ -202,6 +159,28 @@ pop_buffer(GstVaapiDecoder *decoder)
     return buffer;
 }
 
+static GstVaapiDecoderStatus
+decode_step(GstVaapiDecoder *decoder)
+{
+    GstVaapiDecoderStatus status;
+    GstBuffer *buffer;
+
+    do {
+        buffer = pop_buffer(decoder);
+        if (!buffer)
+            return GST_VAAPI_DECODER_STATUS_ERROR_NO_DATA;
+
+        status = GST_VAAPI_DECODER_GET_CLASS(decoder)->decode(decoder, buffer);
+        GST_DEBUG("decode frame (status = %d)", status);
+        if (status == GST_VAAPI_DECODER_STATUS_SUCCESS)
+            return status;
+
+        if (GST_BUFFER_IS_EOS(buffer))
+            return GST_VAAPI_DECODER_STATUS_END_OF_STREAM;
+    } while (status == GST_VAAPI_DECODER_STATUS_ERROR_NO_DATA);
+    return status;
+}
+
 static inline DecodedSurface *
 create_surface(void)
 {
@@ -224,34 +203,26 @@ push_surface(GstVaapiDecoder *decoder, GstVaapiSurface *surface)
     if (!ds)
         return FALSE;
 
-    if (surface) {
-        GST_DEBUG("queue decoded surface %" GST_VAAPI_ID_FORMAT,
-                  GST_VAAPI_ID_ARGS(GST_VAAPI_OBJECT_ID(surface)));
-        ds->proxy = gst_vaapi_surface_proxy_new(priv->context, surface);
-        if (ds->proxy) {
-            ds->status = GST_VAAPI_DECODER_STATUS_SUCCESS;
-            gst_vaapi_surface_proxy_set_timestamp(
-                ds->proxy, priv->surface_timestamp);
-        }
-        else
-            ds->status = GST_VAAPI_DECODER_STATUS_ERROR_ALLOCATION_FAILED;
+    GST_DEBUG("queue decoded surface %" GST_VAAPI_ID_FORMAT,
+              GST_VAAPI_ID_ARGS(GST_VAAPI_OBJECT_ID(surface)));
+    ds->proxy = gst_vaapi_surface_proxy_new(priv->context, surface);
+    if (ds->proxy) {
+        ds->status = GST_VAAPI_DECODER_STATUS_SUCCESS;
+        gst_vaapi_surface_proxy_set_timestamp(ds->proxy, priv->surface_timestamp);
     }
     else
-        ds->status = priv->decoder_status;
+        ds->status = GST_VAAPI_DECODER_STATUS_ERROR_ALLOCATION_FAILED;
 
-    g_async_queue_push(priv->surfaces, ds);
+    g_queue_push_tail(priv->surfaces, ds);
     return TRUE;
 }
 
 static inline DecodedSurface *
-pop_surface(GstVaapiDecoder *decoder, GTimeVal *end_time)
+pop_surface(GstVaapiDecoder *decoder)
 {
     GstVaapiDecoderPrivate * const priv = decoder->priv;
 
-    if (!gst_vaapi_decoder_start(decoder))
-        return NULL;
-
-    return g_async_queue_timed_pop(priv->surfaces, end_time);
+    return g_queue_pop_head(priv->surfaces);
 }
 
 static inline void
@@ -269,12 +240,10 @@ set_codec_data(GstVaapiDecoder *decoder, GstBuffer *codec_data)
 }
 
 static void
-clear_async_queue(GAsyncQueue *q, GDestroyNotify destroy)
+clear_queue(GQueue *q, GDestroyNotify destroy)
 {
-    guint i, qlen = g_async_queue_length(q);
-
-    for (i = 0; i < qlen; i++)
-        destroy(g_async_queue_pop(q));
+    while (!g_queue_is_empty(q))
+        destroy(g_queue_pop_head(q));
 }
 
 static void
@@ -283,8 +252,6 @@ gst_vaapi_decoder_finalize(GObject *object)
     GstVaapiDecoder * const        decoder = GST_VAAPI_DECODER(object);
     GstVaapiDecoderPrivate * const priv    = decoder->priv;
 
-    gst_vaapi_decoder_stop(decoder);
-
     set_codec_data(decoder, NULL);
 
     if (priv->context) {
@@ -293,14 +260,14 @@ gst_vaapi_decoder_finalize(GObject *object)
     }
 
     if (priv->buffers) {
-        clear_async_queue(priv->buffers, (GDestroyNotify)gst_buffer_unref);
-        g_async_queue_unref(priv->buffers);
+        clear_queue(priv->buffers, (GDestroyNotify)gst_buffer_unref);
+        g_queue_free(priv->buffers);
         priv->buffers = NULL;
     }
 
     if (priv->surfaces) {
-        clear_async_queue(priv->surfaces, (GDestroyNotify)destroy_surface);
-        g_async_queue_unref(priv->surfaces);
+        clear_queue(priv->surfaces, (GDestroyNotify)destroy_surface);
+        g_queue_free(priv->surfaces);
         priv->surfaces = NULL;
     }
 
@@ -421,81 +388,8 @@ gst_vaapi_decoder_init(GstVaapiDecoder *decoder)
     priv->fps_d                 = 30;
     priv->surface_timestamp     = GST_CLOCK_TIME_NONE;
     priv->surface_duration      = GST_CLOCK_TIME_NONE;
-    priv->buffers               = g_async_queue_new();
-    priv->surfaces              = g_async_queue_new();
-    priv->decoder_task          = NULL;
-
-    g_static_rec_mutex_init(&priv->decoder_task_lock);
-}
-
-/**
- * gst_vaapi_decoder_start:
- * @decoder: a #GstVaapiDecoder
- *
- * Starts the decoder. This creates the internal decoder thread, if
- * necessary.
- *
- * Return value: %TRUE on success
- */
-gboolean
-gst_vaapi_decoder_start(GstVaapiDecoder *decoder)
-{
-    g_return_val_if_fail(GST_VAAPI_IS_DECODER(decoder), FALSE);
-
-    if (decoder->priv->decoder_task)
-        return TRUE;
-
-    decoder->priv->decoder_task = gst_task_create(decoder_task, decoder);
-    if (!decoder->priv->decoder_task)
-        return FALSE;
-
-    gst_task_set_lock(decoder->priv->decoder_task, &decoder->priv->decoder_task_lock);
-    return gst_task_start(decoder->priv->decoder_task);
-}
-
-/**
- * gst_vaapi_decoder_pause:
- * @decoder: a #GstVaapiDecoder
- *
- * Pauses the decoder. It can be made active again through
- * gst_vaapi_decoder_start() or definitely stopped through
- * gst_vaapi_decoder_stop().
- *
- * Return value: %TRUE on success
- */
-gboolean
-gst_vaapi_decoder_pause(GstVaapiDecoder *decoder)
-{
-    g_return_val_if_fail(GST_VAAPI_IS_DECODER(decoder), FALSE);
-
-    return gst_task_pause(decoder->priv->decoder_task);
-}
-
-/**
- * gst_vaapi_decoder_stop:
- * @decoder: a #GstVaapiDecoder
- *
- * Stops the decoder. This destroys any decoding thread that was
- * previously created by gst_vaapi_decoder_start(). Only
- * gst_vaapi_decoder_get_surface() on the queued surfaces will be
- * allowed at this point.
- *
- * Return value: %FALSE on success
- */
-gboolean
-gst_vaapi_decoder_stop(GstVaapiDecoder *decoder)
-{
-    gboolean success;
-
-    g_return_val_if_fail(GST_VAAPI_IS_DECODER(decoder), FALSE);
-
-    if (!decoder->priv->decoder_task)
-        return FALSE;
-
-    success = gst_task_join(decoder->priv->decoder_task);
-    g_object_unref(decoder->priv->decoder_task);
-    decoder->priv->decoder_task = NULL;
-    return success;
+    priv->buffers               = g_queue_new();
+    priv->surfaces              = g_queue_new();
 }
 
 /**
@@ -625,84 +519,44 @@ gst_vaapi_decoder_put_buffer(GstVaapiDecoder *decoder, GstBuffer *buf)
  * @decoder: a #GstVaapiDecoder
  * @pstatus: return location for the decoder status, or %NULL
  *
- * Waits for a decoded surface to arrive. This functions blocks until
- * the @decoder has a surface ready for the caller. @pstatus is
- * optional but it can help to know what went wrong during the
- * decoding process.
+ * Flushes encoded buffers to the decoder and returns a decoded
+ * surface, if any.
  *
  * Return value: a #GstVaapiSurfaceProxy holding the decoded surface,
  *   or %NULL if none is available (e.g. an error). Caller owns the
  *   returned object. g_object_unref() after usage.
  */
-static GstVaapiSurfaceProxy *
-_gst_vaapi_decoder_get_surface(
+GstVaapiSurfaceProxy *
+gst_vaapi_decoder_get_surface(
     GstVaapiDecoder       *decoder,
-    GTimeVal              *end_time,
     GstVaapiDecoderStatus *pstatus
 )
 {
-    GstVaapiDecoderStatus status;
-    GstVaapiSurfaceProxy *proxy;
+    GstVaapiSurfaceProxy *proxy  = NULL;
+    GstVaapiDecoderStatus status = GST_VAAPI_DECODER_STATUS_ERROR_NO_DATA;
     DecodedSurface *ds;
 
-    ds = pop_surface(decoder, end_time);
+    g_return_val_if_fail(GST_VAAPI_IS_DECODER(decoder), NULL);
+
+    ds = pop_surface(decoder);
+    if (!ds) {
+        do {
+            status = decode_step(decoder);
+        } while (status == GST_VAAPI_DECODER_STATUS_SUCCESS);
+        ds = pop_surface(decoder);
+    }
+
     if (ds) {
         proxy  = ds->proxy;
         status = ds->status;
         destroy_surface(ds);
     }
-    else {
-        proxy  = NULL;
-        status = GST_VAAPI_DECODER_STATUS_TIMEOUT;
-    }
 
     if (pstatus)
         *pstatus = status;
     return proxy;
 }
 
-GstVaapiSurfaceProxy *
-gst_vaapi_decoder_get_surface(
-    GstVaapiDecoder       *decoder,
-    GstVaapiDecoderStatus *pstatus
-)
-{
-    g_return_val_if_fail(GST_VAAPI_IS_DECODER(decoder), NULL);
-
-    return _gst_vaapi_decoder_get_surface(decoder, NULL, pstatus);
-}
-
-/**
- * gst_vaapi_decoder_timed_get_surface:
- * @decoder: a #GstVaapiDecoder
- * @timeout: the number of microseconds to wait for the decoded surface
- * @pstatus: return location for the decoder status, or %NULL
- *
- * Waits for a decoded surface to arrive. This function blocks for at
- * least @timeout microseconds. @pstatus is optional but it can help
- * to know what went wrong during the decoding process.
- *
- * Return value: a #GstVaapiSurfaceProxy holding the decoded surface,
- *   or %NULL if none is available (e.g. an error). Caller owns the
- *   returned object. g_object_unref() after usage.
- */
-GstVaapiSurfaceProxy *
-gst_vaapi_decoder_timed_get_surface(
-    GstVaapiDecoder       *decoder,
-    guint32                timeout,
-    GstVaapiDecoderStatus *pstatus
-)
-{
-    GTimeVal end_time;
-
-    g_return_val_if_fail(GST_VAAPI_IS_DECODER(decoder), NULL);
-
-    g_get_current_time(&end_time);
-    g_time_val_add(&end_time, timeout);
-
-    return _gst_vaapi_decoder_get_surface(decoder, &end_time, pstatus);
-}
-
 gboolean
 gst_vaapi_decoder_ensure_context(
     GstVaapiDecoder    *decoder,