From db08890edd0db93731600f8d2aafb6418e5c9267 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Sebastian=20Dr=C3=B6ge?= Date: Fri, 8 Jul 2011 15:25:07 +0200 Subject: [PATCH] omx: Rework port reconfiguration We always reconfigure all ports now if the settings of one port changes to prevent lots of race conditions, dropped frames and similar issues. --- omx/gstomx.c | 262 +++++++++++++++++++++++++++++++-------------------- omx/gstomx.h | 45 +++++++-- omx/gstomxvideodec.c | 45 +++++++-- 3 files changed, 235 insertions(+), 117 deletions(-) diff --git a/omx/gstomx.c b/omx/gstomx.c index 1e3be61..5169a22 100644 --- a/omx/gstomx.c +++ b/omx/gstomx.c @@ -233,48 +233,24 @@ EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent, } case OMX_EventPortSettingsChanged: { - OMX_U32 index; - GstOMXPort *port = NULL; - - /* FIXME XXX: WTF? Bellagio passes - * the port index as *second* parameter - * instead of first... - */ - index = nData2; - - port = gst_omx_component_get_port (comp, index); - if (!port) - break; - - GST_DEBUG_OBJECT (comp->parent, "Settings of port %u changed", index); - - g_mutex_lock (port->port_lock); - port->settings_changed = TRUE; - g_cond_broadcast (port->port_cond); - g_mutex_unlock (port->port_lock); - - /* FIXME XXX: Bellagio only sends the event for the - * input port even if the output port settings change - * too... - */ - { - gint i, n; - - n = comp->ports->len; - for (i = 0; i < n; i++) { - port = g_ptr_array_index (comp->ports, i); - - /* Don't notify the same port twice */ - if (port->index == index) - continue; - - g_mutex_lock (port->port_lock); - port->settings_changed = TRUE; - g_cond_broadcast (port->port_cond); - g_mutex_unlock (port->port_lock); - } + gint i, n; + + g_mutex_lock (comp->state_lock); + GST_DEBUG_OBJECT (comp->parent, "Settings changed (cookie: %d -> %d)", + comp->settings_cookie, comp->settings_cookie + 1); + comp->settings_cookie++; + comp->reconfigure_out_pending = comp->n_out_ports; + g_mutex_unlock (comp->state_lock); + + /* Now wake up all ports */ + n = comp->ports->len; + for (i = 0; i < n; i++) { + GstOMXPort *port = g_ptr_array_index (comp->ports, i); + + g_mutex_lock (port->port_lock); + g_cond_broadcast (port->port_cond); + g_mutex_unlock (port->port_lock); } - break; } case OMX_EventPortFormatDetected: @@ -366,11 +342,15 @@ gst_omx_component_new (GstObject * parent, const gchar * core_name, comp->parent = gst_object_ref (parent); comp->ports = g_ptr_array_new (); + comp->n_in_ports = 0; + comp->n_out_ports = 0; comp->state_lock = g_mutex_new (); comp->state_cond = g_cond_new (); comp->pending_state = OMX_StateInvalid; comp->last_error = OMX_ErrorNone; + comp->settings_cookie = 0; + comp->reconfigure_out_pending = 0; OMX_GetState (comp->handle, &comp->state); @@ -435,6 +415,10 @@ gst_omx_component_set_state (GstOMXComponent * comp, OMX_STATETYPE state) comp->pending_state = state; err = OMX_SendCommand (comp->handle, OMX_CommandStateSet, state, NULL); + /* Reset some things */ + if (old_state == OMX_StateExecuting && state == OMX_StateIdle) + comp->reconfigure_out_pending = 0; + done: g_mutex_unlock (comp->state_lock); @@ -560,6 +544,13 @@ gst_omx_component_add_port (GstOMXComponent * comp, guint32 index) port->flushing = TRUE; port->flushed = FALSE; port->settings_changed = FALSE; + port->enabled_changed = FALSE; + port->settings_cookie = comp->settings_cookie; + + if (port->port_def.eDir == OMX_DirInput) + comp->n_in_ports++; + else + comp->n_out_ports++; g_ptr_array_add (comp->ports, port); @@ -587,6 +578,29 @@ gst_omx_component_get_port (GstOMXComponent * comp, guint32 index) return NULL; } +gint +gst_omx_component_get_settings_cookie (GstOMXComponent * comp) +{ + gint cookie; + + g_return_val_if_fail (comp != NULL, -1); + + g_mutex_lock (comp->state_lock); + cookie = comp->settings_cookie; + g_mutex_unlock (comp->state_lock); + + return cookie; +} + +void +gst_omx_component_trigger_settings_changed (GstOMXComponent * comp) +{ + g_return_if_fail (comp != NULL); + + EventHandler (comp->handle, comp, OMX_EventPortSettingsChanged, OMX_ALL, 0, + NULL); +} + /* NOTE: This takes comp->state_lock *and* *all* port->port_lock! */ void gst_omx_component_set_last_error (GstOMXComponent * comp, OMX_ERRORTYPE err) @@ -703,6 +717,8 @@ gst_omx_port_acquire_buffer (GstOMXPort * port, GstOMXBuffer ** buf) g_mutex_lock (port->port_lock); +retry: + /* Check if the component is in an error state */ if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) { GST_ERROR_OBJECT (comp->parent, "Component is in error state: %d", err); @@ -716,58 +732,98 @@ gst_omx_port_acquire_buffer (GstOMXPort * port, GstOMXBuffer ** buf) goto done; } - /* Check if the port needs to be reconfigured */ - if (port->settings_changed) { - GST_DEBUG_OBJECT (comp->parent, "Settings changed for port %u", - port->index); - ret = GST_OMX_ACQUIRE_BUFFER_RECONFIGURE; - goto done; - } - - /* Wait until there's something in the queue - * or something else happened that requires - * to return a NULL buffer, e.g. an error + /* If this is an input port and it needs to be reconfigured we + * first wait here until all output ports are reconfigured and + * then return */ - if (g_queue_is_empty (port->pending_buffers)) - g_cond_wait (port->port_cond, port->port_lock); + if (port->port_def.eDir == OMX_DirInput && + port->settings_cookie != gst_omx_component_get_settings_cookie (comp)) { + g_mutex_unlock (port->port_lock); + g_mutex_lock (comp->state_lock); + while (comp->reconfigure_out_pending > 0 && + (err = comp->last_error) == OMX_ErrorNone && !port->flushing) { + GST_DEBUG_OBJECT (comp->parent, + "Waiting for %d output ports to reconfigure", + comp->reconfigure_out_pending); + g_cond_wait (comp->state_cond, comp->state_lock); + } + g_mutex_unlock (comp->state_lock); + g_mutex_lock (port->port_lock); + if (err != OMX_ErrorNone) { + GST_ERROR_OBJECT (comp->parent, "Got error while waiting: %d", err); + ret = GST_OMX_ACQUIRE_BUFFER_ERROR; + goto done; + } else if (port->flushing) { + GST_DEBUG_OBJECT (comp->parent, "Port %u is flushing now", port->index); + ret = GST_OMX_ACQUIRE_BUFFER_FLUSHING; + goto done; + } - /* Check if the component is in an error state */ - if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) { - GST_ERROR_OBJECT (comp->parent, "Component is in error state: %d", err); - ret = GST_OMX_ACQUIRE_BUFFER_ERROR; + ret = GST_OMX_ACQUIRE_BUFFER_RECONFIGURE; + port->settings_changed = TRUE; goto done; } - /* Check if the port is flushing */ - if (port->flushing) { - ret = GST_OMX_ACQUIRE_BUFFER_FLUSHING; + /* If we have an output port that needs to be reconfigured + * and it still has buffers pending for the old configuration + * we first return them. + * NOTE: If buffers for this configuration arrive later + * we have to drop them... */ + if (port->port_def.eDir == OMX_DirOutput && + port->settings_cookie != gst_omx_component_get_settings_cookie (comp)) { + if (!g_queue_is_empty (port->pending_buffers)) { + GST_DEBUG_OBJECT (comp->parent, + "Output port %u needs reconfiguration but has buffers pending", + port->index); + _buf = g_queue_pop_head (port->pending_buffers); + g_assert (_buf != NULL); + ret = GST_OMX_ACQUIRE_BUFFER_OK; + goto done; + } + + ret = GST_OMX_ACQUIRE_BUFFER_RECONFIGURE; + port->settings_changed = TRUE; goto done; } - /* Check if the port needs to be reconfigured */ - /* FIXME: There might still be pending buffers for the - * previous configuration */ if (port->settings_changed) { - GST_DEBUG_OBJECT (comp->parent, "Settings changed for port %u", - port->index); - ret = GST_OMX_ACQUIRE_BUFFER_RECONFIGURE; + GST_DEBUG_OBJECT (comp->parent, + "Port %u has settings changed, need new caps", port->index); + ret = GST_OMX_ACQUIRE_BUFFER_RECONFIGURED; + port->settings_changed = FALSE; goto done; } - _buf = g_queue_pop_head (port->pending_buffers); + /* + * At this point we have no error or flushing port + * and a properly configured port. + * + */ - if (_buf) { - ret = GST_OMX_ACQUIRE_BUFFER_OK; - *buf = _buf; + /* If the queue is empty we wait until a buffer + * arrives, an error happens, the port is flushing + * or the port needs to be reconfigured. + */ + if (g_queue_is_empty (port->pending_buffers)) { + GST_DEBUG_OBJECT (comp->parent, "Queue of port %u is empty", port->index); + g_cond_wait (port->port_cond, port->port_lock); } else { - GST_ERROR_OBJECT (comp->parent, "Unexpectactly got no buffer"); - ret = GST_OMX_ACQUIRE_BUFFER_ERROR; + GST_DEBUG_OBJECT (comp->parent, "Port %u has pending buffers"); + _buf = g_queue_pop_head (port->pending_buffers); + ret = GST_OMX_ACQUIRE_BUFFER_OK; + goto done; } + /* And now check everything again and maybe get a buffer */ + goto retry; + done: g_mutex_unlock (port->port_lock); - GST_DEBUG_OBJECT (comp->parent, "Acquired buffer %p from port %u: %d", buf, + if (_buf) + *buf = _buf; + + GST_DEBUG_OBJECT (comp->parent, "Acquired buffer %p from port %u: %d", _buf, port->index, ret); return ret; @@ -790,16 +846,18 @@ gst_omx_port_release_buffer (GstOMXPort * port, GstOMXBuffer * buf) g_mutex_lock (port->port_lock); + if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) { + GST_ERROR_OBJECT (comp->parent, "Component is in error state: %d", err); + goto done; + } + if (port->flushing) { GST_DEBUG_OBJECT (comp->parent, "Port %u is flushing, not releasing buffer", port->index); goto done; } - if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) { - GST_ERROR_OBJECT (comp->parent, "Component is in error state: %d", err); - goto done; - } + /* FIXME: What if the settings cookies don't match? */ buf->used = TRUE; if (port->port_def.eDir == OMX_DirInput) { @@ -853,9 +911,21 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush) g_mutex_unlock (comp->state_lock); port->flushing = flush; - if (flush) + if (flush) { g_cond_broadcast (port->port_cond); + /* We also need to signal the state cond because + * an input port might wait on this for the output + * ports to reconfigure. This will not confuse + * other waiters on the state cond because they will + * additionally check if the condition they're waiting + * for is true after waking up. + */ + g_mutex_lock (comp->state_lock); + g_cond_broadcast (comp->state_cond); + g_mutex_unlock (comp->state_lock); + } + if (flush) { GTimeVal abstimeout, *timeval; gboolean signalled; @@ -1012,6 +1082,7 @@ gst_omx_port_allocate_buffers_unlocked (GstOMXPort * port) buf = g_slice_new0 (GstOMXBuffer); buf->port = port; buf->used = FALSE; + buf->settings_cookie = port->settings_cookie; g_ptr_array_add (port->buffers, buf); err = @@ -1312,26 +1383,6 @@ gst_omx_port_is_enabled (GstOMXPort * port) return enabled; } -gboolean -gst_omx_port_is_settings_changed (GstOMXPort * port) -{ - GstOMXComponent *comp; - gboolean settings_changed; - - g_return_val_if_fail (port != NULL, FALSE); - - comp = port->comp; - - g_mutex_lock (port->port_lock); - settings_changed = port->settings_changed; - g_mutex_unlock (port->port_lock); - - GST_DEBUG_OBJECT (comp->parent, "Port %u has settings-changed: %d", - port->index, settings_changed); - - return settings_changed; -} - OMX_ERRORTYPE gst_omx_port_reconfigure (GstOMXPort * port) { @@ -1360,7 +1411,18 @@ gst_omx_port_reconfigure (GstOMXPort * port) if (err != OMX_ErrorNone) goto done; - port->settings_changed = FALSE; + port->settings_cookie = comp->settings_cookie; + + /* If this is an output port, notify all input ports + * that might wait for us to reconfigure in + * acquire_buffer() + */ + if (port->port_def.eDir == OMX_DirOutput) { + g_mutex_lock (comp->state_lock); + comp->reconfigure_out_pending--; + g_cond_broadcast (comp->state_cond); + g_mutex_unlock (comp->state_lock); + } done: GST_DEBUG_OBJECT (comp->parent, "Reconfigured port %u: %d", port->index, err); diff --git a/omx/gstomx.h b/omx/gstomx.h index 3fbc12a..e7eda3a 100644 --- a/omx/gstomx.h +++ b/omx/gstomx.h @@ -34,9 +34,16 @@ typedef struct _GstOMXComponent GstOMXComponent; typedef struct _GstOMXBuffer GstOMXBuffer; typedef enum { + /* Everything good and the buffer is valid */ GST_OMX_ACQUIRE_BUFFER_OK = 0, + /* The port is flushing, exit ASAP */ GST_OMX_ACQUIRE_BUFFER_FLUSHING, + /* The port must be reconfigured */ GST_OMX_ACQUIRE_BUFFER_RECONFIGURE, + /* The port was reconfigured and the caps might have changed + * NOTE: This is only returned a single time! */ + GST_OMX_ACQUIRE_BUFFER_RECONFIGURED, + /* A fatal error happened */ GST_OMX_ACQUIRE_BUFFER_ERROR } GstOMXAcquireBufferReturn; @@ -64,13 +71,16 @@ struct _GstOMXPort { guint32 index; /* Protects port_def, buffers, pending_buffers, - * settings_changed, flushing, flushed, enabled_changed. + * settings_changed, flushing, flushed, enabled_changed + * and settings_cookie. * * Signalled if pending_buffers gets a * new buffer or flushing/flushed is set - * to TRUE or an error happens. Always - * check comp->last_error after being - * signalled! + * to TRUE or the port is enabled/disabled + * or the settings change or an error happens. + * + * Note: Always check comp->last_error before + * waiting and after being signalled! * * Note: flushed==TRUE implies flushing==TRUE! * @@ -82,12 +92,15 @@ struct _GstOMXPort { OMX_PARAM_PORTDEFINITIONTYPE port_def; GPtrArray *buffers; /* Contains GstOMXBuffer* */ GQueue *pending_buffers; /* Contains GstOMXBuffer* */ - /* If TRUE we need to get the new caps - * of this port */ + /* If TRUE we need to get the new caps of this port */ gboolean settings_changed; gboolean flushing; gboolean flushed; /* TRUE after OMX_CommandFlush was done */ gboolean enabled_changed; /* TRUE after OMX_Command{En,Dis}able was done */ + + /* If not equal to comp->settings_cookie we need + * to reconfigure this port */ + gint settings_cookie; }; struct _GstOMXComponent { @@ -96,8 +109,10 @@ struct _GstOMXComponent { GstOMXCore *core; GPtrArray *ports; /* Contains GstOMXPort* */ + gint n_in_ports, n_out_ports; - /* Protecting state, pending_state and last_error + /* Protecting state, pending_state, last_error + * and settings_cookie. * Signalled if one of them changes */ GMutex *state_lock; @@ -107,6 +122,14 @@ struct _GstOMXComponent { OMX_STATETYPE pending_state; /* OMX_ErrorNone usually, if different nothing will work */ OMX_ERRORTYPE last_error; + /* Updated whenever settings of any port are changing. + * We always reconfigure all ports */ + gint settings_cookie; + /* Number of output ports that must still be reconfigured. + * If any are pending no input port will be reconfigured + * or will accept any data and wait. + */ + gint reconfigure_out_pending; }; struct _GstOMXBuffer { @@ -117,6 +140,9 @@ struct _GstOMXBuffer { * between {Empty,Fill}ThisBuffer and the callback */ gboolean used; + + /* Cookie of the settings when this buffer was allocated */ + gint settings_cookie; }; GstOMXCore * gst_omx_core_acquire (const gchar * filename); @@ -135,6 +161,9 @@ OMX_ERRORTYPE gst_omx_component_get_last_error (GstOMXComponent * comp); GstOMXPort * gst_omx_component_add_port (GstOMXComponent * comp, guint32 index); GstOMXPort * gst_omx_component_get_port (GstOMXComponent * comp, guint32 index); +gint gst_omx_component_get_settings_cookie (GstOMXComponent * comp); +void gst_omx_component_trigger_settings_changed (GstOMXComponent * comp); + void gst_omx_port_get_port_definition (GstOMXPort * port, OMX_PARAM_PORTDEFINITIONTYPE * port_def); gboolean gst_omx_port_update_port_definition (GstOMXPort *port, OMX_PARAM_PORTDEFINITIONTYPE *port_definition); @@ -153,8 +182,6 @@ OMX_ERRORTYPE gst_omx_port_reconfigure (GstOMXPort * port); OMX_ERRORTYPE gst_omx_port_set_enabled (GstOMXPort * port, gboolean enabled); gboolean gst_omx_port_is_enabled (GstOMXPort * port); -gboolean gst_omx_port_is_settings_changed (GstOMXPort * port); - G_END_DECLS #endif /* __GST_OMX_H__ */ diff --git a/omx/gstomxvideodec.c b/omx/gstomxvideodec.c index 2378d94..376263c 100644 --- a/omx/gstomxvideodec.c +++ b/omx/gstomxvideodec.c @@ -307,13 +307,20 @@ gst_omx_video_dec_loop (GstOMXVideoDec * self) goto component_error; } else if (acq_return == GST_OMX_ACQUIRE_BUFFER_FLUSHING) { goto flushing; + } else if (acq_return == GST_OMX_ACQUIRE_BUFFER_RECONFIGURE) { + if (gst_omx_port_reconfigure (self->out_port) != OMX_ErrorNone) + goto reconfigure_error; + /* And restart the loop */ + return; } if (!GST_PAD_CAPS (GST_BASE_VIDEO_CODEC_SRC_PAD (self)) - || acq_return == GST_OMX_ACQUIRE_BUFFER_RECONFIGURE) { + || acq_return == GST_OMX_ACQUIRE_BUFFER_RECONFIGURED) { GstVideoState *state = &GST_BASE_VIDEO_CODEC (self)->state; OMX_PARAM_PORTDEFINITIONTYPE port_def; + GST_DEBUG_OBJECT (self, "Port settings have changed, updating caps"); + gst_omx_port_get_port_definition (port, &port_def); g_assert (port_def.format.video.eCompressionFormat == OMX_VIDEO_CodingUnused); @@ -331,13 +338,15 @@ gst_omx_video_dec_loop (GstOMXVideoDec * self) state->height = port_def.format.video.nFrameHeight; /* FIXME XXX: Bellagio does not set this to something useful... */ /* gst_util_double_to_fraction (port_def.format.video.xFramerate / ((gdouble) 0xffff), &state->fps_n, &state->fps_d); */ - gst_base_video_decoder_set_src_caps (GST_BASE_VIDEO_DECODER (self)); + if (!gst_base_video_decoder_set_src_caps (GST_BASE_VIDEO_DECODER (self))) { + if (buf) + gst_omx_port_release_buffer (self->out_port, buf); + goto caps_failed; + } - if (acq_return == GST_OMX_ACQUIRE_BUFFER_RECONFIGURE) { - if (gst_omx_port_reconfigure (self->out_port) != OMX_ErrorNone) - goto reconfigure_error; + /* Now get a buffer */ + if (acq_return != GST_OMX_ACQUIRE_BUFFER_OK) return; - } } g_assert (acq_return == GST_OMX_ACQUIRE_BUFFER_OK && buf != NULL); @@ -349,8 +358,16 @@ gst_omx_video_dec_loop (GstOMXVideoDec * self) if (!frame) { GST_ERROR_OBJECT (self, "No corresponding frame found"); } else if (buf->omx_buf->nFilledLen > 0) { - if (gst_base_video_decoder_alloc_src_frame (GST_BASE_VIDEO_DECODER (self), - frame) == GST_FLOW_OK) { + if (GST_BASE_VIDEO_CODEC (self)->state.bytes_per_picture == 0) { + /* FIXME: If the sinkpad caps change we have currently no way + * to allocate new src buffers because basevideodecoder assumes + * that the caps on both pads are equivalent all the time + */ + GST_WARNING_OBJECT (self, + "Caps change pending and still have buffers for old caps -- dropping"); + } else + if (gst_base_video_decoder_alloc_src_frame (GST_BASE_VIDEO_DECODER + (self), frame) == GST_FLOW_OK) { /* FIXME: This currently happens because of a race condition too. * We first need to reconfigure the output port and then the input * port if both need reconfiguration. @@ -442,6 +459,14 @@ invalid_frame_size: gst_pad_pause_task (GST_BASE_VIDEO_CODEC_SRC_PAD (self)); return; } +caps_failed: + { + GST_ELEMENT_ERROR (self, LIBRARY, SETTINGS, (NULL), ("Failed to set caps")); + gst_pad_push_event (GST_BASE_VIDEO_CODEC_SRC_PAD (self), + gst_event_new_eos ()); + gst_pad_pause_task (GST_BASE_VIDEO_CODEC_SRC_PAD (self)); + return; + } } static gboolean @@ -550,6 +575,7 @@ gst_omx_video_dec_set_format (GstBaseVideoDecoder * decoder, if (needs_disable) { if (gst_omx_port_set_enabled (self->in_port, TRUE) != OMX_ErrorNone) return FALSE; + gst_omx_component_trigger_settings_changed (self->component); } else { if (gst_omx_component_set_state (self->component, OMX_StateIdle) != OMX_ErrorNone) @@ -650,6 +676,9 @@ gst_omx_video_dec_handle_frame (GstBaseVideoDecoder * decoder, goto reconfigure_error; /* Now get a new buffer and fill it */ continue; + } else if (acq_ret == GST_OMX_ACQUIRE_BUFFER_RECONFIGURED) { + /* TODO: Anything to do here? Don't think so */ + continue; } g_assert (acq_ret == GST_OMX_ACQUIRE_BUFFER_OK && buf != NULL); -- 2.7.4