From: gb Date: Fri, 30 Apr 2010 15:37:28 +0000 (+0000) Subject: Drop excessive threading that over-complicates synchronisation. X-Git-Tag: accepted/trunk/20120822.173359~482 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=a4d201aaf93816fd23ed1eeffae6786cec883020;p=profile%2Fivi%2Fgstreamer-vaapi.git Drop excessive threading that over-complicates synchronisation. MPEG-2 & H.264 videos now play but there are other problems (timestamps). --- diff --git a/gst-libs/gst/vaapi/gstvaapidecoder.c b/gst-libs/gst/vaapi/gstvaapidecoder.c index a6c848c..7b69475 100644 --- a/gst-libs/gst/vaapi/gstvaapidecoder.c +++ b/gst-libs/gst/vaapi/gstvaapidecoder.c @@ -52,45 +52,6 @@ enum { PROP_CODEC_DATA }; -/* Wait _at most_ 10 ms for encoded buffers between each decoding step */ -#define GST_VAAPI_DECODER_TIMEOUT (10000) - -static GstBuffer * -pop_buffer(GstVaapiDecoder *decoder); - -static gboolean -push_surface(GstVaapiDecoder *decoder, GstVaapiSurface *surface); - -static DecodedSurface * -pop_surface(GstVaapiDecoder *decoder, GTimeVal *end_time); - -static void -decoder_task(gpointer data) -{ - GstVaapiDecoder * const decoder = GST_VAAPI_DECODER_CAST(data); - GstVaapiDecoderPrivate * const priv = decoder->priv; - GstVaapiDecoderClass * const klass = GST_VAAPI_DECODER_GET_CLASS(decoder); - GstBuffer *buffer; - - buffer = pop_buffer(decoder); - if (!buffer) - return; - - priv->decoder_status = klass->decode(decoder, buffer); - GST_DEBUG("decode frame (status = %d)", priv->decoder_status); - - switch (priv->decoder_status) { - case GST_VAAPI_DECODER_STATUS_SUCCESS: - case GST_VAAPI_DECODER_STATUS_ERROR_NO_DATA: - break; - default: - /* Send an empty surface to signal an error */ - push_surface(decoder, NULL); - gst_task_pause(priv->decoder_task); - break; - } -} - static void update_clock(GstVaapiDecoder *decoder, GstBuffer *buffer) { @@ -177,7 +138,7 @@ push_buffer(GstVaapiDecoder *decoder, GstBuffer *buffer) GST_DEBUG("queue encoded data buffer %p (%d bytes)", buffer, GST_BUFFER_SIZE(buffer)); - g_async_queue_push(priv->buffers, buffer); + g_queue_push_tail(priv->buffers, buffer); return TRUE; } @@ -185,13 +146,9 @@ static GstBuffer * pop_buffer(GstVaapiDecoder *decoder) { GstVaapiDecoderPrivate * const priv = decoder->priv; - GTimeVal end_time; GstBuffer *buffer; - g_get_current_time(&end_time); - g_time_val_add(&end_time, GST_VAAPI_DECODER_TIMEOUT); - - buffer = g_async_queue_timed_pop(priv->buffers, &end_time); + buffer = g_queue_pop_head(priv->buffers); if (!buffer) return NULL; @@ -202,6 +159,28 @@ pop_buffer(GstVaapiDecoder *decoder) return buffer; } +static GstVaapiDecoderStatus +decode_step(GstVaapiDecoder *decoder) +{ + GstVaapiDecoderStatus status; + GstBuffer *buffer; + + do { + buffer = pop_buffer(decoder); + if (!buffer) + return GST_VAAPI_DECODER_STATUS_ERROR_NO_DATA; + + status = GST_VAAPI_DECODER_GET_CLASS(decoder)->decode(decoder, buffer); + GST_DEBUG("decode frame (status = %d)", status); + if (status == GST_VAAPI_DECODER_STATUS_SUCCESS) + return status; + + if (GST_BUFFER_IS_EOS(buffer)) + return GST_VAAPI_DECODER_STATUS_END_OF_STREAM; + } while (status == GST_VAAPI_DECODER_STATUS_ERROR_NO_DATA); + return status; +} + static inline DecodedSurface * create_surface(void) { @@ -224,34 +203,26 @@ push_surface(GstVaapiDecoder *decoder, GstVaapiSurface *surface) if (!ds) return FALSE; - if (surface) { - GST_DEBUG("queue decoded surface %" GST_VAAPI_ID_FORMAT, - GST_VAAPI_ID_ARGS(GST_VAAPI_OBJECT_ID(surface))); - ds->proxy = gst_vaapi_surface_proxy_new(priv->context, surface); - if (ds->proxy) { - ds->status = GST_VAAPI_DECODER_STATUS_SUCCESS; - gst_vaapi_surface_proxy_set_timestamp( - ds->proxy, priv->surface_timestamp); - } - else - ds->status = GST_VAAPI_DECODER_STATUS_ERROR_ALLOCATION_FAILED; + GST_DEBUG("queue decoded surface %" GST_VAAPI_ID_FORMAT, + GST_VAAPI_ID_ARGS(GST_VAAPI_OBJECT_ID(surface))); + ds->proxy = gst_vaapi_surface_proxy_new(priv->context, surface); + if (ds->proxy) { + ds->status = GST_VAAPI_DECODER_STATUS_SUCCESS; + gst_vaapi_surface_proxy_set_timestamp(ds->proxy, priv->surface_timestamp); } else - ds->status = priv->decoder_status; + ds->status = GST_VAAPI_DECODER_STATUS_ERROR_ALLOCATION_FAILED; - g_async_queue_push(priv->surfaces, ds); + g_queue_push_tail(priv->surfaces, ds); return TRUE; } static inline DecodedSurface * -pop_surface(GstVaapiDecoder *decoder, GTimeVal *end_time) +pop_surface(GstVaapiDecoder *decoder) { GstVaapiDecoderPrivate * const priv = decoder->priv; - if (!gst_vaapi_decoder_start(decoder)) - return NULL; - - return g_async_queue_timed_pop(priv->surfaces, end_time); + return g_queue_pop_head(priv->surfaces); } static inline void @@ -269,12 +240,10 @@ set_codec_data(GstVaapiDecoder *decoder, GstBuffer *codec_data) } static void -clear_async_queue(GAsyncQueue *q, GDestroyNotify destroy) +clear_queue(GQueue *q, GDestroyNotify destroy) { - guint i, qlen = g_async_queue_length(q); - - for (i = 0; i < qlen; i++) - destroy(g_async_queue_pop(q)); + while (!g_queue_is_empty(q)) + destroy(g_queue_pop_head(q)); } static void @@ -283,8 +252,6 @@ gst_vaapi_decoder_finalize(GObject *object) GstVaapiDecoder * const decoder = GST_VAAPI_DECODER(object); GstVaapiDecoderPrivate * const priv = decoder->priv; - gst_vaapi_decoder_stop(decoder); - set_codec_data(decoder, NULL); if (priv->context) { @@ -293,14 +260,14 @@ gst_vaapi_decoder_finalize(GObject *object) } if (priv->buffers) { - clear_async_queue(priv->buffers, (GDestroyNotify)gst_buffer_unref); - g_async_queue_unref(priv->buffers); + clear_queue(priv->buffers, (GDestroyNotify)gst_buffer_unref); + g_queue_free(priv->buffers); priv->buffers = NULL; } if (priv->surfaces) { - clear_async_queue(priv->surfaces, (GDestroyNotify)destroy_surface); - g_async_queue_unref(priv->surfaces); + clear_queue(priv->surfaces, (GDestroyNotify)destroy_surface); + g_queue_free(priv->surfaces); priv->surfaces = NULL; } @@ -421,81 +388,8 @@ gst_vaapi_decoder_init(GstVaapiDecoder *decoder) priv->fps_d = 30; priv->surface_timestamp = GST_CLOCK_TIME_NONE; priv->surface_duration = GST_CLOCK_TIME_NONE; - priv->buffers = g_async_queue_new(); - priv->surfaces = g_async_queue_new(); - priv->decoder_task = NULL; - - g_static_rec_mutex_init(&priv->decoder_task_lock); -} - -/** - * gst_vaapi_decoder_start: - * @decoder: a #GstVaapiDecoder - * - * Starts the decoder. This creates the internal decoder thread, if - * necessary. - * - * Return value: %TRUE on success - */ -gboolean -gst_vaapi_decoder_start(GstVaapiDecoder *decoder) -{ - g_return_val_if_fail(GST_VAAPI_IS_DECODER(decoder), FALSE); - - if (decoder->priv->decoder_task) - return TRUE; - - decoder->priv->decoder_task = gst_task_create(decoder_task, decoder); - if (!decoder->priv->decoder_task) - return FALSE; - - gst_task_set_lock(decoder->priv->decoder_task, &decoder->priv->decoder_task_lock); - return gst_task_start(decoder->priv->decoder_task); -} - -/** - * gst_vaapi_decoder_pause: - * @decoder: a #GstVaapiDecoder - * - * Pauses the decoder. It can be made active again through - * gst_vaapi_decoder_start() or definitely stopped through - * gst_vaapi_decoder_stop(). - * - * Return value: %TRUE on success - */ -gboolean -gst_vaapi_decoder_pause(GstVaapiDecoder *decoder) -{ - g_return_val_if_fail(GST_VAAPI_IS_DECODER(decoder), FALSE); - - return gst_task_pause(decoder->priv->decoder_task); -} - -/** - * gst_vaapi_decoder_stop: - * @decoder: a #GstVaapiDecoder - * - * Stops the decoder. This destroys any decoding thread that was - * previously created by gst_vaapi_decoder_start(). Only - * gst_vaapi_decoder_get_surface() on the queued surfaces will be - * allowed at this point. - * - * Return value: %FALSE on success - */ -gboolean -gst_vaapi_decoder_stop(GstVaapiDecoder *decoder) -{ - gboolean success; - - g_return_val_if_fail(GST_VAAPI_IS_DECODER(decoder), FALSE); - - if (!decoder->priv->decoder_task) - return FALSE; - - success = gst_task_join(decoder->priv->decoder_task); - g_object_unref(decoder->priv->decoder_task); - decoder->priv->decoder_task = NULL; - return success; + priv->buffers = g_queue_new(); + priv->surfaces = g_queue_new(); } /** @@ -625,84 +519,44 @@ gst_vaapi_decoder_put_buffer(GstVaapiDecoder *decoder, GstBuffer *buf) * @decoder: a #GstVaapiDecoder * @pstatus: return location for the decoder status, or %NULL * - * Waits for a decoded surface to arrive. This functions blocks until - * the @decoder has a surface ready for the caller. @pstatus is - * optional but it can help to know what went wrong during the - * decoding process. + * Flushes encoded buffers to the decoder and returns a decoded + * surface, if any. * * Return value: a #GstVaapiSurfaceProxy holding the decoded surface, * or %NULL if none is available (e.g. an error). Caller owns the * returned object. g_object_unref() after usage. */ -static GstVaapiSurfaceProxy * -_gst_vaapi_decoder_get_surface( +GstVaapiSurfaceProxy * +gst_vaapi_decoder_get_surface( GstVaapiDecoder *decoder, - GTimeVal *end_time, GstVaapiDecoderStatus *pstatus ) { - GstVaapiDecoderStatus status; - GstVaapiSurfaceProxy *proxy; + GstVaapiSurfaceProxy *proxy = NULL; + GstVaapiDecoderStatus status = GST_VAAPI_DECODER_STATUS_ERROR_NO_DATA; DecodedSurface *ds; - ds = pop_surface(decoder, end_time); + g_return_val_if_fail(GST_VAAPI_IS_DECODER(decoder), NULL); + + ds = pop_surface(decoder); + if (!ds) { + do { + status = decode_step(decoder); + } while (status == GST_VAAPI_DECODER_STATUS_SUCCESS); + ds = pop_surface(decoder); + } + if (ds) { proxy = ds->proxy; status = ds->status; destroy_surface(ds); } - else { - proxy = NULL; - status = GST_VAAPI_DECODER_STATUS_TIMEOUT; - } if (pstatus) *pstatus = status; return proxy; } -GstVaapiSurfaceProxy * -gst_vaapi_decoder_get_surface( - GstVaapiDecoder *decoder, - GstVaapiDecoderStatus *pstatus -) -{ - g_return_val_if_fail(GST_VAAPI_IS_DECODER(decoder), NULL); - - return _gst_vaapi_decoder_get_surface(decoder, NULL, pstatus); -} - -/** - * gst_vaapi_decoder_timed_get_surface: - * @decoder: a #GstVaapiDecoder - * @timeout: the number of microseconds to wait for the decoded surface - * @pstatus: return location for the decoder status, or %NULL - * - * Waits for a decoded surface to arrive. This function blocks for at - * least @timeout microseconds. @pstatus is optional but it can help - * to know what went wrong during the decoding process. - * - * Return value: a #GstVaapiSurfaceProxy holding the decoded surface, - * or %NULL if none is available (e.g. an error). Caller owns the - * returned object. g_object_unref() after usage. - */ -GstVaapiSurfaceProxy * -gst_vaapi_decoder_timed_get_surface( - GstVaapiDecoder *decoder, - guint32 timeout, - GstVaapiDecoderStatus *pstatus -) -{ - GTimeVal end_time; - - g_return_val_if_fail(GST_VAAPI_IS_DECODER(decoder), NULL); - - g_get_current_time(&end_time); - g_time_val_add(&end_time, timeout); - - return _gst_vaapi_decoder_get_surface(decoder, &end_time, pstatus); -} - gboolean gst_vaapi_decoder_ensure_context( GstVaapiDecoder *decoder, diff --git a/gst-libs/gst/vaapi/gstvaapidecoder.h b/gst-libs/gst/vaapi/gstvaapidecoder.h index b3a212e..6637319 100644 --- a/gst-libs/gst/vaapi/gstvaapidecoder.h +++ b/gst-libs/gst/vaapi/gstvaapidecoder.h @@ -156,13 +156,6 @@ gst_vaapi_decoder_get_surface( GstVaapiDecoderStatus *pstatus ); -GstVaapiSurfaceProxy * -gst_vaapi_decoder_timed_get_surface( - GstVaapiDecoder *decoder, - guint32 timeout, - GstVaapiDecoderStatus *pstatus -); - G_END_DECLS #endif /* GST_VAAPI_DECODER_H */ diff --git a/gst-libs/gst/vaapi/gstvaapidecoder_priv.h b/gst-libs/gst/vaapi/gstvaapidecoder_priv.h index a271995..78d2579 100644 --- a/gst-libs/gst/vaapi/gstvaapidecoder_priv.h +++ b/gst-libs/gst/vaapi/gstvaapidecoder_priv.h @@ -87,19 +87,16 @@ G_BEGIN_DECLS GstVaapiDecoderPrivate)) struct _GstVaapiDecoderPrivate { - GstVaapiDisplay *display; - GstVaapiContext *context; - GstVaapiCodec codec; - GstBuffer *codec_data; - guint fps_n; - guint fps_d; - GAsyncQueue *buffers; - GAsyncQueue *surfaces; - GstClockTime surface_timestamp; - GstClockTime surface_duration; - GstTask *decoder_task; - GStaticRecMutex decoder_task_lock; - GstVaapiDecoderStatus decoder_status; + GstVaapiDisplay *display; + GstVaapiContext *context; + GstVaapiCodec codec; + GstBuffer *codec_data; + guint fps_n; + guint fps_d; + GQueue *buffers; + GQueue *surfaces; + GstClockTime surface_timestamp; + GstClockTime surface_duration; }; gboolean diff --git a/gst/vaapidecode/gstvaapidecode.c b/gst/vaapidecode/gstvaapidecode.c index 762e0ce..b9564f2 100644 --- a/gst/vaapidecode/gstvaapidecode.c +++ b/gst/vaapidecode/gstvaapidecode.c @@ -91,18 +91,21 @@ enum { PROP_USE_FFMPEG, }; -static void -gst_vaapidecode_task_cb(gpointer data) +static GstFlowReturn +gst_vaapidecode_step(GstVaapiDecode *decode) { - GstVaapiDecode * const decode = GST_VAAPIDECODE(data); - GstVaapiDecoderStatus status; GstVaapiSurfaceProxy *proxy; + GstVaapiDecoderStatus status; GstBuffer *buffer; GstFlowReturn ret; - proxy = gst_vaapi_decoder_timed_get_surface(decode->decoder, 10000, &status); - if (!proxy || status != GST_VAAPI_DECODER_STATUS_SUCCESS) - return; + proxy = gst_vaapi_decoder_get_surface(decode->decoder, &status); + if (!proxy) { + if (status != GST_VAAPI_DECODER_STATUS_ERROR_NO_DATA) + goto error_decode; + /* More data is needed */ + return GST_FLOW_OK; + } buffer = NULL; ret = gst_pad_alloc_buffer( @@ -125,9 +128,14 @@ gst_vaapidecode_task_cb(gpointer data) goto error_commit_buffer; g_object_unref(proxy); - return; + return GST_FLOW_OK; /* ERRORS */ +error_decode: + { + GST_DEBUG("decode error %d", status); + return GST_FLOW_UNEXPECTED; + } error_create_buffer: { const GstVaapiID surface_id = @@ -137,53 +145,17 @@ error_create_buffer: "surface %" GST_VAAPI_ID_FORMAT " (error %d)", GST_VAAPI_ID_ARGS(surface_id), ret); g_object_unref(proxy); - return; + return GST_FLOW_UNEXPECTED; } error_commit_buffer: { GST_DEBUG("video sink rejected the video buffer (error %d)", ret); g_object_unref(proxy); - return; + return GST_FLOW_UNEXPECTED; } } static gboolean -gst_vaapidecode_start(GstVaapiDecode *decode) -{ - if (gst_task_get_state(decode->decoder_task) == GST_TASK_STARTED) - return TRUE; - - GST_DEBUG("start decoding threads"); - if (!gst_vaapi_decoder_start(decode->decoder)) - return FALSE; - return gst_task_start(decode->decoder_task); -} - -static gboolean -gst_vaapidecode_pause(GstVaapiDecode *decode) -{ - if (gst_task_get_state(decode->decoder_task) == GST_TASK_PAUSED) - return TRUE; - - GST_DEBUG("pause decoding threads"); - if (!gst_vaapi_decoder_pause(decode->decoder)) - return FALSE; - return gst_task_pause(decode->decoder_task); -} - -static gboolean -gst_vaapidecode_stop(GstVaapiDecode *decode) -{ - if (gst_task_get_state(decode->decoder_task) == GST_TASK_STOPPED) - return TRUE; - - GST_DEBUG("stop decoding threads"); - if (!gst_vaapi_decoder_stop(decode->decoder)) - return FALSE; - return gst_task_stop(decode->decoder_task); -} - -static gboolean gst_vaapidecode_create(GstVaapiDecode *decode) { GstVaapiVideoSink *sink; @@ -208,26 +180,12 @@ gst_vaapidecode_create(GstVaapiDecode *decode) if (decode->use_ffmpeg) decode->decoder = gst_vaapi_decoder_ffmpeg_new(display, codec, decode->codec_data); - if (!decode->decoder) - return FALSE; - - decode->decoder_task = gst_task_create(gst_vaapidecode_task_cb, decode); - if (!decode->decoder_task) - return FALSE; - - gst_task_set_lock(decode->decoder_task, &decode->decoder_task_lock); - return TRUE; + return decode->decoder != NULL; } static void gst_vaapidecode_destroy(GstVaapiDecode *decode) { - if (decode->decoder_task) { - gst_task_join(decode->decoder_task); - g_object_unref(decode->decoder_task); - decode->decoder_task = NULL; - } - if (decode->decoder) { gst_vaapi_decoder_put_buffer(decode->decoder, NULL); g_object_unref(decode->decoder); @@ -322,8 +280,6 @@ gst_vaapidecode_change_state(GstElement *element, GstStateChange transition) case GST_STATE_CHANGE_READY_TO_PAUSED: break; case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - if (!gst_vaapidecode_start(decode)) - return GST_STATE_CHANGE_FAILURE; break; default: break; @@ -335,12 +291,8 @@ gst_vaapidecode_change_state(GstElement *element, GstStateChange transition) switch (transition) { case GST_STATE_CHANGE_PLAYING_TO_PAUSED: - if (!gst_vaapidecode_pause(decode)) - return GST_STATE_CHANGE_FAILURE; break; case GST_STATE_CHANGE_PAUSED_TO_READY: - if (!gst_vaapidecode_stop(decode)) - return GST_STATE_CHANGE_FAILURE; break; default: break; @@ -424,14 +376,11 @@ gst_vaapidecode_chain(GstPad *pad, GstBuffer *buf) goto error_create_decoder; } - if (!gst_vaapidecode_start(decode)) - goto error_start_decoder; - if (!gst_vaapi_decoder_put_buffer(decode->decoder, buf)) goto error_push_buffer; gst_buffer_unref(buf); - return GST_FLOW_OK; + return gst_vaapidecode_step(decode); /* ERRORS */ error_create_decoder: @@ -440,12 +389,6 @@ error_create_decoder: gst_buffer_unref(buf); return GST_FLOW_UNEXPECTED; } -error_start_decoder: - { - GST_DEBUG("failed to start decoder"); - gst_buffer_unref(buf); - return GST_FLOW_UNEXPECTED; - } error_push_buffer: { GST_DEBUG("failed to push input buffer to decoder"); @@ -481,14 +424,11 @@ gst_vaapidecode_init(GstVaapiDecode *decode, GstVaapiDecodeClass *klass) { GstElementClass * const element_class = GST_ELEMENT_CLASS(klass); - decode->display = NULL; - decode->profile = 0; - decode->codec_data = NULL; - decode->decoder = NULL; - decode->decoder_task = NULL; - decode->use_ffmpeg = TRUE; - - g_static_rec_mutex_init(&decode->decoder_task_lock); + decode->display = NULL; + decode->profile = 0; + decode->codec_data = NULL; + decode->decoder = NULL; + decode->use_ffmpeg = TRUE; /* Pad through which data comes in to the element */ decode->sinkpad = gst_pad_new_from_template( diff --git a/gst/vaapidecode/gstvaapidecode.h b/gst/vaapidecode/gstvaapidecode.h index 0772c39..b2b6915 100644 --- a/gst/vaapidecode/gstvaapidecode.h +++ b/gst/vaapidecode/gstvaapidecode.h @@ -65,8 +65,6 @@ struct _GstVaapiDecode { GstVaapiProfile profile; GstBuffer *codec_data; GstVaapiDecoder *decoder; - GstTask *decoder_task; - GStaticRecMutex decoder_task_lock; unsigned int use_ffmpeg : 1; }; diff --git a/tests/test-decode.c b/tests/test-decode.c index 36d07af..f4b56b9 100644 --- a/tests/test-decode.c +++ b/tests/test-decode.c @@ -28,10 +28,6 @@ #include "test-h264.h" #include "test-vc1.h" -/* Default timeout to wait for the first decoded frame (10 ms) */ -/* Defined to -1 if the application indefintely waits for the decoded frame */ -#define TIMEOUT -1 - typedef void (*GetVideoDataFunc)(const guchar **data, guint *size); typedef struct _CodecDefs CodecDefs; @@ -123,16 +119,9 @@ main(int argc, char *argv[]) if (!gst_vaapi_decoder_put_buffer(decoder, NULL)) g_error("could not send EOS to the decoder"); - if (TIMEOUT < 0) { - proxy = gst_vaapi_decoder_get_surface(decoder, &status); - if (!proxy) - g_error("could not get decoded surface"); - } - else { - proxy = gst_vaapi_decoder_timed_get_surface(decoder, TIMEOUT, &status); - if (!proxy) - g_warning("Could not get decoded surface after %d us", TIMEOUT); - } + proxy = gst_vaapi_decoder_get_surface(decoder, &status); + if (!proxy) + g_error("could not get decoded surface (decoder status %d)", status); gst_vaapi_window_show(window);