/*
* Copyright (C) 2011, Hewlett-Packard Development Company, L.P.
* Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>, Collabora Ltd.
+ * Copyright (C) 2013, Collabora Ltd.
+ * Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
G_UNLOCK (core_handles);
}
+/* NOTE: comp->messages_lock will be used */
+static void
+gst_omx_component_flush_messages (GstOMXComponent * comp)
+{
+ GstOMXMessage *msg;
+
+ g_mutex_lock (&comp->messages_lock);
+ while ((msg = g_queue_pop_head (&comp->messages))) {
+ g_slice_free (GstOMXMessage, msg);
+ }
+ g_mutex_unlock (&comp->messages_lock);
+}
+
+/* NOTE: Call with comp->lock, comp->messages_lock will be used */
+static void
+gst_omx_component_handle_messages (GstOMXComponent * comp)
+{
+ GstOMXMessage *msg;
+
+ g_mutex_lock (&comp->messages_lock);
+
+ while ((msg = g_queue_pop_head (&comp->messages))) {
+ switch (msg->type) {
+ case GST_OMX_MESSAGE_STATE_SET:{
+ GST_DEBUG_OBJECT (comp->parent, "State change to %d finished",
+ msg->content.state_set.state);
+ comp->state = msg->content.state_set.state;
+ if (comp->state == comp->pending_state)
+ comp->pending_state = OMX_StateInvalid;
+ break;
+ }
+ case GST_OMX_MESSAGE_FLUSH:{
+ GstOMXPort *port = NULL;
+ OMX_U32 index = msg->content.flush.port;
+
+ port = gst_omx_component_get_port (comp, index);
+ if (!port)
+ break;
+
+ GST_DEBUG_OBJECT (comp->parent, "Port %u flushed", port->index);
+
+ if (port->flushing) {
+ port->flushed = TRUE;
+ } else {
+ GST_ERROR_OBJECT (comp->parent, "Port %u was not flushing",
+ port->index);
+ }
+
+ break;
+ }
+ case GST_OMX_MESSAGE_ERROR:{
+ OMX_ERRORTYPE error = msg->content.error.error;
+
+ if (error == OMX_ErrorNone)
+ break;
+
+ GST_ERROR_OBJECT (comp->parent, "Got error: %s (0x%08x)",
+ gst_omx_error_to_string (error), error);
+
+ /* We only set the first error ever from which
+ * we can't recover anymore.
+ */
+ if (comp->last_error == OMX_ErrorNone)
+ comp->last_error = error;
+ g_cond_broadcast (&comp->messages_cond);
+
+ break;
+ }
+ case GST_OMX_MESSAGE_PORT_ENABLE:{
+ GstOMXPort *port = NULL;
+ OMX_U32 index = msg->content.port_enable.port;
+ OMX_BOOL enable = msg->content.port_enable.enable;
+
+ port = gst_omx_component_get_port (comp, index);
+ if (!port)
+ break;
+
+ GST_DEBUG_OBJECT (comp->parent, "Port %u %s", port->index,
+ (enable ? "enabled" : "disabled"));
+
+ port->enabled_changed = TRUE;
+ break;
+ }
+ case GST_OMX_MESSAGE_PORT_SETTINGS_CHANGED:{
+ gint i, n;
+ OMX_U32 index = msg->content.port_settings_changed.port;
+ GList *outports = NULL, *l, *k;
+
+ GST_DEBUG_OBJECT (comp->parent, "Settings changed (port %u)", index);
+
+ /* FIXME: This probably can be done better */
+
+ /* Now update the ports' states */
+ n = (comp->ports ? comp->ports->len : 0);
+ for (i = 0; i < n; i++) {
+ GstOMXPort *port = g_ptr_array_index (comp->ports, i);
+
+ if (index == OMX_ALL || index == port->index) {
+ port->settings_cookie++;
+ if (port->port_def.eDir == OMX_DirOutput)
+ outports = g_list_prepend (outports, port);
+ }
+ }
+
+ for (k = outports; k; k = k->next) {
+ gboolean found = FALSE;
+
+ for (l = comp->pending_reconfigure_outports; l; l = l->next) {
+ if (l->data == k->data) {
+ found = TRUE;
+ break;
+ }
+ }
+
+ if (!found)
+ comp->pending_reconfigure_outports =
+ g_list_prepend (comp->pending_reconfigure_outports, k->data);
+ }
+
+ if (comp->pending_reconfigure_outports)
+ g_atomic_int_set (&comp->have_pending_reconfigure_outports, 1);
+
+ g_list_free (outports);
+
+ break;
+ }
+ case GST_OMX_MESSAGE_BUFFER_DONE:{
+ GstOMXBuffer *buf = msg->content.buffer_done.buffer->pAppPrivate;
+ GstOMXPort *port;
+ GstOMXComponent *comp;
+
+ port = buf->port;
+ comp = port->comp;
+
+ if (msg->content.buffer_done.empty) {
+ /* Input buffer is empty again and can be used to contain new input */
+ GST_DEBUG_OBJECT (comp->parent, "Port %u emptied buffer %p (%p)",
+ port->index, buf, buf->omx_buf->pBuffer);
+
+ /* XXX: Some OMX implementations don't reset nOffset
+ * when the complete buffer is emptied but instead
+ * only reset nFilledLen. We reset nOffset to 0
+ * if nFilledLen == 0, which is safe to do because
+ * the offset *must* be 0 if the buffer is not
+ * filled at all.
+ *
+ * Seen in QCOM's OMX implementation.
+ */
+ if (buf->omx_buf->nFilledLen == 0)
+ buf->omx_buf->nOffset = 0;
+
+ /* Reset all flags, some implementations don't
+ * reset them themselves and the flags are not
+ * valid anymore after the buffer was consumed
+ */
+ buf->omx_buf->nFlags = 0;
+ } else {
+ /* Output buffer contains output now or
+ * the port was flushed */
+ GST_DEBUG_OBJECT (comp->parent, "Port %u filled buffer %p (%p)",
+ port->index, buf, buf->omx_buf->pBuffer);
+ }
+
+ buf->used = FALSE;
+
+ g_queue_push_tail (&port->pending_buffers, buf);
+
+ break;
+ }
+ default:{
+ g_assert_not_reached ();
+ break;
+ }
+ }
+
+ g_slice_free (GstOMXMessage, msg);
+ }
+
+ g_mutex_unlock (&comp->messages_lock);
+}
+
static OMX_ERRORTYPE
EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent,
OMX_U32 nData1, OMX_U32 nData2, OMX_PTR pEventData)
switch (cmd) {
case OMX_CommandStateSet:{
- /* Notify everything that waits for
- * a state change to be finished */
+ GstOMXMessage *msg = g_slice_new (GstOMXMessage);
+
+ msg->type = GST_OMX_MESSAGE_STATE_SET;
+ msg->content.state_set.state = nData2;
+
GST_DEBUG_OBJECT (comp->parent, "State change to %d finished",
- nData2);
- gst_omx_rec_mutex_recursive_lock (&comp->state_lock);
- comp->state = (OMX_STATETYPE) nData2;
- if (comp->state == comp->pending_state)
- comp->pending_state = OMX_StateInvalid;
- g_cond_broadcast (&comp->state_cond);
- gst_omx_rec_mutex_recursive_unlock (&comp->state_lock);
+ msg->content.state_set.state);
+
+ g_mutex_lock (&comp->messages_lock);
+ g_queue_push_tail (&comp->messages, msg);
+ g_cond_broadcast (&comp->messages_cond);
+ g_mutex_unlock (&comp->messages_lock);
break;
}
case OMX_CommandFlush:{
- GstOMXPort *port = NULL;
- OMX_U32 index = nData2;
-
- port = gst_omx_component_get_port (comp, index);
- if (!port)
- break;
+ GstOMXMessage *msg = g_slice_new (GstOMXMessage);
- GST_DEBUG_OBJECT (comp->parent, "Port %u flushed", port->index);
+ msg->type = GST_OMX_MESSAGE_FLUSH;
+ msg->content.flush.port = nData2;
+ GST_DEBUG_OBJECT (comp->parent, "Port %u flushed",
+ msg->content.flush.port);
- /* Now notify gst_omx_port_set_flushing()
- * that the port is really flushed now and
- * we can continue
- */
- gst_omx_rec_mutex_recursive_lock (&port->port_lock);
- /* FIXME: If this is ever called when the port
- * was not set to flushing something went
- * wrong but it happens for some reason.
- */
- if (port->flushing) {
- port->flushed = TRUE;
- g_cond_broadcast (&port->port_cond);
- } else {
- GST_ERROR_OBJECT (comp->parent, "Port %u was not flushing",
- port->index);
- }
- gst_omx_rec_mutex_recursive_unlock (&port->port_lock);
+ g_mutex_lock (&comp->messages_lock);
+ g_queue_push_tail (&comp->messages, msg);
+ g_cond_broadcast (&comp->messages_cond);
+ g_mutex_unlock (&comp->messages_lock);
break;
}
case OMX_CommandPortEnable:
case OMX_CommandPortDisable:{
- GstOMXPort *port = NULL;
- OMX_U32 index = nData2;
-
- port = gst_omx_component_get_port (comp, index);
- if (!port)
- break;
-
- GST_DEBUG_OBJECT (comp->parent, "Port %u %s", port->index,
- (cmd == OMX_CommandPortEnable ? "enabled" : "disabled"));
-
- gst_omx_rec_mutex_recursive_lock (&port->port_lock);
- port->enabled_changed = TRUE;
- g_cond_broadcast (&port->port_cond);
- gst_omx_rec_mutex_recursive_unlock (&port->port_lock);
-
+ GstOMXMessage *msg = g_slice_new (GstOMXMessage);
+
+ msg->type = GST_OMX_MESSAGE_PORT_ENABLE;
+ msg->content.port_enable.port = nData2;
+ msg->content.port_enable.enable = (cmd == OMX_CommandPortEnable);
+ GST_DEBUG_OBJECT (comp->parent, "Port %u %s",
+ msg->content.port_enable.port,
+ (msg->content.port_enable.enable ? "enabled" : "disabled"));
+
+ g_mutex_lock (&comp->messages_lock);
+ g_queue_push_tail (&comp->messages, msg);
+ g_cond_broadcast (&comp->messages_cond);
+ g_mutex_unlock (&comp->messages_lock);
break;
}
default:
}
case OMX_EventError:
{
- OMX_ERRORTYPE err = nData1;
+ GstOMXMessage *msg;
- if (err == OMX_ErrorNone)
+ /* Yes, this really happens... */
+ if (nData1 == OMX_ErrorNone)
break;
- GST_ERROR_OBJECT (comp->parent, "Got error: %s (0x%08x)",
- gst_omx_error_to_string (err), err);
+ msg = g_slice_new (GstOMXMessage);
- /* Error events are always fatal */
- gst_omx_component_set_last_error (comp, err);
+ msg->type = GST_OMX_MESSAGE_ERROR;
+ msg->content.error.error = nData1;
+ GST_ERROR_OBJECT (comp->parent, "Got error: %s (0x%08x)",
+ gst_omx_error_to_string (msg->content.error.error),
+ msg->content.error.error);
+ g_mutex_lock (&comp->messages_lock);
+ g_queue_push_tail (&comp->messages, msg);
+ g_cond_broadcast (&comp->messages_cond);
+ g_mutex_unlock (&comp->messages_lock);
break;
}
case OMX_EventPortSettingsChanged:
{
- gint i, n;
- guint32 port_index;
- GList *outports = NULL, *l, *k;
+ GstOMXMessage *msg = g_slice_new (GstOMXMessage);
+ OMX_U32 index;
if (!(comp->hacks &
GST_OMX_HACK_EVENT_PORT_SETTINGS_CHANGED_NDATA_PARAMETER_SWAP)) {
- port_index = nData1;
+ index = nData1;
} else {
- port_index = nData2;
+ index = nData2;
}
- if (port_index == 0
+
+ if (index == 0
&& (comp->hacks &
GST_OMX_HACK_EVENT_PORT_SETTINGS_CHANGED_PORT_0_TO_1))
- port_index = 1;
+ index = 1;
- GST_DEBUG_OBJECT (comp->parent, "Settings changed (port index: %d)",
- port_index);
-
- /* Now update the ports' states */
- n = (comp->ports ? comp->ports->len : 0);
- for (i = 0; i < n; i++) {
- GstOMXPort *port = g_ptr_array_index (comp->ports, i);
-
- gst_omx_rec_mutex_recursive_lock (&port->port_lock);
- if (port_index == OMX_ALL || port_index == port->index) {
- port->settings_cookie++;
- if (port->port_def.eDir == OMX_DirOutput)
- outports = g_list_prepend (outports, port);
- g_cond_broadcast (&port->port_cond);
- }
- gst_omx_rec_mutex_recursive_unlock (&port->port_lock);
- }
-
- gst_omx_rec_mutex_recursive_lock (&comp->state_lock);
- for (k = outports; k; k = k->next) {
- gboolean found = FALSE;
-
- for (l = comp->pending_reconfigure_outports; l; l = l->next) {
- if (l->data == k->data) {
- found = TRUE;
- break;
- }
- }
- if (!found)
- comp->pending_reconfigure_outports =
- g_list_prepend (comp->pending_reconfigure_outports, k->data);
- }
-
- if (comp->pending_reconfigure_outports)
- g_atomic_int_set (&comp->have_pending_reconfigure_outports, 1);
- gst_omx_rec_mutex_recursive_unlock (&comp->state_lock);
+ msg->type = GST_OMX_MESSAGE_PORT_SETTINGS_CHANGED;
+ msg->content.port_settings_changed.port = index;
+ GST_DEBUG_OBJECT (comp->parent, "Settings changed (port index: %d)",
+ msg->content.port_settings_changed.port);
- g_list_free (outports);
+ g_mutex_lock (&comp->messages_lock);
+ g_queue_push_tail (&comp->messages, msg);
+ g_cond_broadcast (&comp->messages_cond);
+ g_mutex_unlock (&comp->messages_lock);
break;
}
EmptyBufferDone (OMX_HANDLETYPE hComponent, OMX_PTR pAppData,
OMX_BUFFERHEADERTYPE * pBuffer)
{
- GstOMXBuffer *buf = pBuffer->pAppPrivate;
- GstOMXPort *port;
+ GstOMXBuffer *buf;
GstOMXComponent *comp;
+ GstOMXMessage *msg;
- if (buf == NULL) {
+ buf = pBuffer->pAppPrivate;
+ if (!buf) {
GST_ERROR ("Have unknown or deallocated buffer %p", pBuffer);
return OMX_ErrorNone;
}
- port = buf->port;
- comp = port->comp;
-
g_assert (buf->omx_buf == pBuffer);
- /* Input buffer is empty again and can
- * be used to contain new input */
- gst_omx_rec_mutex_recursive_lock (&port->port_lock);
- GST_DEBUG_OBJECT (comp->parent, "Port %u emptied buffer %p (%p)",
- port->index, buf, buf->omx_buf->pBuffer);
- buf->used = FALSE;
-
- /* XXX: Some OMX implementations don't reset nOffset
- * when the complete buffer is emptied but instead
- * only reset nFilledLen. We reset nOffset to 0
- * if nFilledLen == 0, which is safe to do because
- * the offset *must* be 0 if the buffer is not
- * filled at all.
- *
- * Seen in QCOM's OMX implementation.
- */
- if (buf->omx_buf->nFilledLen == 0)
- buf->omx_buf->nOffset = 0;
+ comp = buf->port->comp;
- /* Reset all flags, some implementations don't
- * reset them themselves and the flags are not
- * valid anymore after the buffer was consumed
- */
- buf->omx_buf->nFlags = 0;
+ msg = g_slice_new (GstOMXMessage);
+ msg->type = GST_OMX_MESSAGE_BUFFER_DONE;
+ msg->content.buffer_done.component = hComponent;
+ msg->content.buffer_done.app_data = pAppData;
+ msg->content.buffer_done.buffer = pBuffer;
+ msg->content.buffer_done.empty = OMX_TRUE;
+
+ GST_DEBUG_OBJECT (comp->parent, "Port %u emptied buffer %p (%p)",
+ buf->port->index, buf, buf->omx_buf->pBuffer);
- g_queue_push_tail (port->pending_buffers, buf);
- g_cond_broadcast (&port->port_cond);
- gst_omx_rec_mutex_recursive_unlock (&port->port_lock);
+ g_mutex_lock (&comp->messages_lock);
+ g_queue_push_tail (&comp->messages, msg);
+ g_cond_broadcast (&comp->messages_cond);
+ g_mutex_unlock (&comp->messages_lock);
return OMX_ErrorNone;
}
FillBufferDone (OMX_HANDLETYPE hComponent, OMX_PTR pAppData,
OMX_BUFFERHEADERTYPE * pBuffer)
{
- GstOMXBuffer *buf = pBuffer->pAppPrivate;
- GstOMXPort *port;
+ GstOMXBuffer *buf;
GstOMXComponent *comp;
+ GstOMXMessage *msg;
- if (buf == NULL) {
+ buf = pBuffer->pAppPrivate;
+ if (!buf) {
GST_ERROR ("Have unknown or deallocated buffer %p", pBuffer);
return OMX_ErrorNone;
}
- port = buf->port;
- comp = port->comp;
-
g_assert (buf->omx_buf == pBuffer);
- /* Output buffer contains output now or
- * the port was flushed */
- gst_omx_rec_mutex_recursive_lock (&port->port_lock);
- GST_DEBUG_OBJECT (comp->parent, "Port %u filled buffer %p (%p)", port->index,
- buf, buf->omx_buf->pBuffer);
- buf->used = FALSE;
- g_queue_push_tail (port->pending_buffers, buf);
- g_cond_broadcast (&port->port_cond);
- gst_omx_rec_mutex_recursive_unlock (&port->port_lock);
+ comp = buf->port->comp;
+
+ msg = g_slice_new (GstOMXMessage);
+ msg->type = GST_OMX_MESSAGE_BUFFER_DONE;
+ msg->content.buffer_done.component = hComponent;
+ msg->content.buffer_done.app_data = pAppData;
+ msg->content.buffer_done.buffer = pBuffer;
+ msg->content.buffer_done.empty = OMX_FALSE;
+
+ GST_DEBUG_OBJECT (comp->parent, "Port %u filled buffer %p (%p)",
+ buf->port->index, buf, buf->omx_buf->pBuffer);
+
+ g_mutex_lock (&comp->messages_lock);
+ g_queue_push_tail (&comp->messages, msg);
+ g_cond_broadcast (&comp->messages_cond);
+ g_mutex_unlock (&comp->messages_lock);
return OMX_ErrorNone;
}
static OMX_CALLBACKTYPE callbacks =
{ EventHandler, EmptyBufferDone, FillBufferDone };
+/* NOTE: Uses comp->lock and comp->messages_lock */
GstOMXComponent *
gst_omx_component_new (GstObject * parent, const GstOMXClassData * cdata)
{
comp->n_in_ports = 0;
comp->n_out_ports = 0;
- gst_omx_rec_mutex_init (&comp->state_lock);
- g_cond_init (&comp->state_cond);
+ g_mutex_init (&comp->lock);
+ g_mutex_init (&comp->messages_lock);
+ g_cond_init (&comp->messages_cond);
+
+ g_queue_init (&comp->messages);
comp->pending_state = OMX_StateInvalid;
comp->last_error = OMX_ErrorNone;
OMX_GetState (comp->handle, &comp->state);
+ g_mutex_lock (&comp->lock);
+ gst_omx_component_handle_messages (comp);
+ g_mutex_unlock (&comp->lock);
+
return comp;
}
+/* NOTE: Uses comp->messages_lock */
void
gst_omx_component_free (GstOMXComponent * comp)
{
GstOMXPort *port = g_ptr_array_index (comp->ports, i);
gst_omx_port_deallocate_buffers (port);
-
- gst_omx_rec_mutex_clear (&port->port_lock);
- g_cond_clear (&port->port_cond);
- g_queue_free (port->pending_buffers);
+ g_assert (port->buffers == NULL);
+ g_assert (g_queue_get_length (&port->pending_buffers) == 0);
g_slice_free (GstOMXPort, port);
}
-#if GLIB_CHECK_VERSION(2,22,0)
g_ptr_array_unref (comp->ports);
-#else
- g_ptr_array_free (comp->ports, TRUE);
-#endif
comp->ports = NULL;
}
comp->core->free_handle (comp->handle);
gst_omx_core_release (comp->core);
- g_cond_clear (&comp->state_cond);
- gst_omx_rec_mutex_clear (&comp->state_lock);
+ gst_omx_component_flush_messages (comp);
+
+ g_cond_clear (&comp->messages_cond);
+ g_mutex_clear (&comp->messages_lock);
+ g_mutex_clear (&comp->lock);
gst_object_unref (comp->parent);
g_slice_free (GstOMXComponent, comp);
}
+/* NOTE: Uses comp->lock and comp->messages_lock */
OMX_ERRORTYPE
gst_omx_component_set_state (GstOMXComponent * comp, OMX_STATETYPE state)
{
g_return_val_if_fail (comp != NULL, OMX_ErrorUndefined);
- gst_omx_rec_mutex_lock_for_recursion (&comp->state_lock);
+ g_mutex_lock (&comp->lock);
+
+ gst_omx_component_handle_messages (comp);
+
old_state = comp->state;
GST_DEBUG_OBJECT (comp->parent, "Setting state from %d to %d", old_state,
state);
+
if ((err = comp->last_error) != OMX_ErrorNone && state > old_state) {
GST_ERROR_OBJECT (comp->parent, "Component in error state: %s (0x%08x)",
gst_omx_error_to_string (err), err);
g_list_free (comp->pending_reconfigure_outports);
comp->pending_reconfigure_outports = NULL;
/* Notify all inports that are still waiting */
- g_cond_broadcast (&comp->state_cond);
+ g_mutex_lock (&comp->messages_lock);
+ g_cond_broadcast (&comp->messages_cond);
+ g_mutex_unlock (&comp->messages_lock);
}
- gst_omx_rec_mutex_begin_recursion (&comp->state_lock);
err = OMX_SendCommand (comp->handle, OMX_CommandStateSet, state, NULL);
- gst_omx_rec_mutex_end_recursion (&comp->state_lock);
/* No need to check if anything has changed here */
done:
- gst_omx_rec_mutex_unlock_for_recursion (&comp->state_lock);
+
+ gst_omx_component_handle_messages (comp);
+ g_mutex_unlock (&comp->lock);
if (err != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent,
return err;
}
+/* NOTE: Uses comp->lock and comp->messages_lock */
OMX_STATETYPE
gst_omx_component_get_state (GstOMXComponent * comp, GstClockTime timeout)
{
GST_DEBUG_OBJECT (comp->parent, "Getting state");
- gst_omx_rec_mutex_lock (&comp->state_lock);
+ g_mutex_lock (&comp->lock);
+
+ gst_omx_component_handle_messages (comp);
+
ret = comp->state;
if (comp->pending_state == OMX_StateInvalid)
goto done;
goto done;
wait_until = g_get_monotonic_time () + add;
- GST_DEBUG_OBJECT (comp->parent, "Waiting for %ld us", add);
+ GST_DEBUG_OBJECT (comp->parent, "Waiting for %" G_GINT64_FORMAT "us", add);
} else {
GST_DEBUG_OBJECT (comp->parent, "Waiting for signal");
}
- do {
+ gst_omx_component_handle_messages (comp);
+ while (signalled && comp->last_error == OMX_ErrorNone
+ && comp->pending_state != OMX_StateInvalid) {
+ g_mutex_lock (&comp->messages_lock);
+ g_mutex_unlock (&comp->lock);
+ if (!g_queue_is_empty (&comp->messages)) {
+ signalled = TRUE;
+ }
if (wait_until == -1) {
- g_cond_wait (&comp->state_cond, &comp->state_lock.lock);
+ g_cond_wait (&comp->messages_cond, &comp->messages_lock);
signalled = TRUE;
} else {
signalled =
- g_cond_wait_until (&comp->state_cond, &comp->state_lock.lock,
+ g_cond_wait_until (&comp->messages_cond, &comp->messages_lock,
wait_until);
}
- } while (signalled && comp->last_error == OMX_ErrorNone
- && comp->pending_state != OMX_StateInvalid);
+ g_mutex_unlock (&comp->messages_lock);
+ g_mutex_lock (&comp->lock);
+ if (signalled)
+ gst_omx_component_handle_messages (comp);
+ };
if (signalled) {
if (comp->last_error != OMX_ErrorNone) {
}
done:
- gst_omx_rec_mutex_unlock (&comp->state_lock);
+ g_mutex_unlock (&comp->lock);
/* If we waited and timed out this component is unusable now */
if (!signalled)
port->port_def = port_def;
- gst_omx_rec_mutex_init (&port->port_lock);
- g_cond_init (&port->port_cond);
- port->pending_buffers = g_queue_new ();
+ g_queue_init (&port->pending_buffers);
port->flushing = TRUE;
port->flushed = FALSE;
port->settings_changed = FALSE;
{
gint i, n;
- /* No need for locking here because the
- * ports are all added directly after
- * creating the component and are removed
- * when the component is destroyed.
- */
-
n = comp->ports->len;
for (i = 0; i < n; i++) {
GstOMXPort *tmp = g_ptr_array_index (comp->ports, i);
}
}
-/* NOTE: This takes comp->state_lock *and* *all* port->port_lock! */
+/* NOTE: Uses comp->lock and comp->messages_lock */
void
gst_omx_component_set_last_error (GstOMXComponent * comp, OMX_ERRORTYPE err)
{
- gint i, n;
-
g_return_if_fail (comp != NULL);
if (err == OMX_ErrorNone)
return;
+ g_mutex_lock (&comp->lock);
GST_ERROR_OBJECT (comp->parent, "Setting last error: %s (0x%08x)",
gst_omx_error_to_string (err), err);
- gst_omx_rec_mutex_recursive_lock (&comp->state_lock);
/* We only set the first error ever from which
* we can't recover anymore.
*/
if (comp->last_error == OMX_ErrorNone)
comp->last_error = err;
- g_cond_broadcast (&comp->state_cond);
- gst_omx_rec_mutex_recursive_unlock (&comp->state_lock);
-
- /* Now notify all ports, no locking needed
- * here because the ports are allocated in the
- * very beginning and never change again until
- * component destruction.
- */
- n = (comp->ports ? comp->ports->len : 0);
- for (i = 0; i < n; i++) {
- GstOMXPort *tmp = g_ptr_array_index (comp->ports, i);
+ g_mutex_unlock (&comp->lock);
- gst_omx_rec_mutex_recursive_lock (&tmp->port_lock);
- g_cond_broadcast (&tmp->port_cond);
- gst_omx_rec_mutex_recursive_unlock (&tmp->port_lock);
- }
+ g_mutex_lock (&comp->messages_lock);
+ g_cond_broadcast (&comp->messages_cond);
+ g_mutex_unlock (&comp->messages_lock);
}
+/* NOTE: Uses comp->lock and comp->messages_lock */
OMX_ERRORTYPE
gst_omx_component_get_last_error (GstOMXComponent * comp)
{
g_return_val_if_fail (comp != NULL, OMX_ErrorUndefined);
- gst_omx_rec_mutex_lock (&comp->state_lock);
+ g_mutex_lock (&comp->lock);
+ gst_omx_component_handle_messages (comp);
err = comp->last_error;
- gst_omx_rec_mutex_unlock (&comp->state_lock);
+ g_mutex_unlock (&comp->lock);
GST_DEBUG_OBJECT (comp->parent, "Returning last error: %s (0x%08x)",
gst_omx_error_to_string (err), err);
return gst_omx_error_to_string (gst_omx_component_get_last_error (comp));
}
+/* comp->lock must be unlocked while calling this */
OMX_ERRORTYPE
gst_omx_component_get_parameter (GstOMXComponent * comp, OMX_INDEXTYPE index,
gpointer param)
return err;
}
+/* comp->lock must be unlocked while calling this */
OMX_ERRORTYPE
gst_omx_component_set_parameter (GstOMXComponent * comp, OMX_INDEXTYPE index,
gpointer param)
return err;
}
+/* comp->lock must be unlocked while calling this */
OMX_ERRORTYPE
gst_omx_component_get_config (GstOMXComponent * comp, OMX_INDEXTYPE index,
gpointer config)
return err;
}
+/* comp->lock must be unlocked while calling this */
OMX_ERRORTYPE
gst_omx_component_set_config (GstOMXComponent * comp, OMX_INDEXTYPE index,
gpointer config)
comp = port->comp;
- gst_omx_rec_mutex_lock (&port->port_lock);
if (port_def)
err =
gst_omx_component_set_parameter (comp, OMX_IndexParamPortDefinition,
GST_DEBUG_OBJECT (comp->parent, "Updated port %u definition: %s (0x%08x)",
port->index, gst_omx_error_to_string (err), err);
- gst_omx_rec_mutex_unlock (&port->port_lock);
-
return (err == OMX_ErrorNone);
}
+/* NOTE: Uses comp->lock and comp->messages_lock */
GstOMXAcquireBufferReturn
gst_omx_port_acquire_buffer (GstOMXPort * port, GstOMXBuffer ** buf)
{
comp = port->comp;
+ g_mutex_lock (&comp->lock);
GST_DEBUG_OBJECT (comp->parent, "Acquiring buffer from port %u", port->index);
- gst_omx_rec_mutex_lock (&port->port_lock);
-
retry:
+ gst_omx_component_handle_messages (comp);
/* Check if the component is in an error state */
- if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
+ if ((err = comp->last_error) != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent, "Component is in error state: %s",
gst_omx_error_to_string (err));
ret = GST_OMX_ACQUIRE_BUFFER_ERROR;
*/
if (port->port_def.eDir == OMX_DirInput) {
if (g_atomic_int_get (&comp->have_pending_reconfigure_outports)) {
- gst_omx_rec_mutex_unlock (&port->port_lock);
- gst_omx_rec_mutex_lock (&comp->state_lock);
+ gst_omx_component_handle_messages (comp);
while (g_atomic_int_get (&comp->have_pending_reconfigure_outports) &&
(err = comp->last_error) == OMX_ErrorNone && !port->flushing) {
GST_DEBUG_OBJECT (comp->parent,
"Waiting for output ports to reconfigure");
- g_cond_wait (&comp->state_cond, &comp->state_lock.lock);
+ g_mutex_lock (&comp->messages_lock);
+ g_mutex_unlock (&comp->lock);
+ if (g_queue_is_empty (&comp->messages))
+ g_cond_wait (&comp->messages_cond, &comp->messages_lock);
+ g_mutex_unlock (&comp->messages_lock);
+ g_mutex_lock (&comp->lock);
+ gst_omx_component_handle_messages (comp);
}
- gst_omx_rec_mutex_unlock (&comp->state_lock);
- gst_omx_rec_mutex_lock (&port->port_lock);
goto retry;
}
* we have to drop them... */
if (port->port_def.eDir == OMX_DirOutput &&
port->settings_cookie != port->configured_settings_cookie) {
- if (!g_queue_is_empty (port->pending_buffers)) {
+ if (!g_queue_is_empty (&port->pending_buffers)) {
GST_DEBUG_OBJECT (comp->parent,
"Output port %u needs reconfiguration but has buffers pending",
port->index);
- _buf = g_queue_pop_head (port->pending_buffers);
+ _buf = g_queue_pop_head (&port->pending_buffers);
ret = GST_OMX_ACQUIRE_BUFFER_OK;
goto done;
* arrives, an error happens, the port is flushing
* or the port needs to be reconfigured.
*/
- if (g_queue_is_empty (port->pending_buffers)) {
+ gst_omx_component_handle_messages (comp);
+ if (g_queue_is_empty (&port->pending_buffers)) {
GST_DEBUG_OBJECT (comp->parent, "Queue of port %u is empty", port->index);
- g_cond_wait (&port->port_cond, &port->port_lock.lock);
+ g_mutex_lock (&comp->messages_lock);
+ g_mutex_unlock (&comp->lock);
+ if (g_queue_is_empty (&comp->messages))
+ g_cond_wait (&comp->messages_cond, &comp->messages_lock);
+ g_mutex_unlock (&comp->messages_lock);
+ g_mutex_lock (&comp->lock);
+ gst_omx_component_handle_messages (comp);
+
+ /* And now check everything again and maybe get a buffer */
+ goto retry;
} else {
GST_DEBUG_OBJECT (comp->parent, "Port %u has pending buffers", port->index);
- _buf = g_queue_pop_head (port->pending_buffers);
+ _buf = g_queue_pop_head (&port->pending_buffers);
ret = GST_OMX_ACQUIRE_BUFFER_OK;
goto done;
}
- /* And now check everything again and maybe get a buffer */
+ g_assert_not_reached ();
goto retry;
done:
- gst_omx_rec_mutex_unlock (&port->port_lock);
+ g_mutex_unlock (&comp->lock);
if (_buf) {
g_assert (_buf == _buf->omx_buf->pAppPrivate);
return ret;
}
+/* NOTE: Uses comp->lock and comp->messages_lock */
OMX_ERRORTYPE
gst_omx_port_release_buffer (GstOMXPort * port, GstOMXBuffer * buf)
{
comp = port->comp;
+ g_mutex_lock (&comp->lock);
+
GST_DEBUG_OBJECT (comp->parent, "Releasing buffer %p (%p) to port %u",
buf, buf->omx_buf->pBuffer, port->index);
- gst_omx_rec_mutex_lock_for_recursion (&port->port_lock);
+ gst_omx_component_handle_messages (comp);
if (port->port_def.eDir == OMX_DirInput) {
/* Reset all flags, some implementations don't
buf->omx_buf->nFlags = 0;
}
- if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
+ if ((err = comp->last_error) != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent, "Component is in error state: %s (0x%08x)",
gst_omx_error_to_string (err), err);
- g_queue_push_tail (port->pending_buffers, buf);
- g_cond_broadcast (&port->port_cond);
+ g_queue_push_tail (&port->pending_buffers, buf);
+ g_mutex_lock (&comp->messages_lock);
+ g_cond_broadcast (&comp->messages_cond);
+ g_mutex_lock (&comp->messages_lock);
goto done;
}
if (port->flushing) {
GST_DEBUG_OBJECT (comp->parent, "Port %u is flushing, not releasing buffer",
port->index);
- g_queue_push_tail (port->pending_buffers, buf);
- g_cond_broadcast (&port->port_cond);
+ g_queue_push_tail (&port->pending_buffers, buf);
+ g_mutex_lock (&comp->messages_lock);
+ g_cond_broadcast (&comp->messages_cond);
+ g_mutex_unlock (&comp->messages_lock);
goto done;
}
buf->used = TRUE;
- gst_omx_rec_mutex_begin_recursion (&port->port_lock);
if (port->port_def.eDir == OMX_DirInput) {
err = OMX_EmptyThisBuffer (comp->handle, buf->omx_buf);
} else {
err = OMX_FillThisBuffer (comp->handle, buf->omx_buf);
}
- gst_omx_rec_mutex_end_recursion (&port->port_lock);
-
GST_DEBUG_OBJECT (comp->parent, "Released buffer %p to port %u: %s (0x%08x)",
buf, port->index, gst_omx_error_to_string (err), err);
done:
- gst_omx_rec_mutex_unlock_for_recursion (&port->port_lock);
+ gst_omx_component_handle_messages (comp);
+ g_mutex_unlock (&comp->lock);
+
+ if (err != OMX_ErrorNone)
+ gst_omx_component_set_last_error (comp, err);
return err;
}
+/* NOTE: Uses comp->lock and comp->messages_lock */
OMX_ERRORTYPE
gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
{
g_return_val_if_fail (port != NULL, OMX_ErrorUndefined);
comp = port->comp;
+
+ g_mutex_lock (&comp->lock);
+
GST_DEBUG_OBJECT (comp->parent, "Setting port %d to %sflushing",
port->index, (flush ? "" : "not "));
- gst_omx_rec_mutex_lock_for_recursion (&port->port_lock);
+ gst_omx_component_handle_messages (comp);
+
if (! !flush == ! !port->flushing) {
GST_DEBUG_OBJECT (comp->parent, "Port %u was %sflushing already",
port->index, (flush ? "" : "not "));
goto done;
}
- if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
+ if ((err = comp->last_error) != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent, "Component is in error state: %s (0x%08x)",
gst_omx_error_to_string (err), err);
goto done;
}
- gst_omx_rec_mutex_lock (&comp->state_lock);
if (comp->state != OMX_StateIdle && comp->state != OMX_StateExecuting) {
GST_DEBUG_OBJECT (comp->parent, "Component is in wrong state: %d",
comp->state);
err = OMX_ErrorUndefined;
- gst_omx_rec_mutex_unlock (&comp->state_lock);
goto done;
}
- gst_omx_rec_mutex_unlock (&comp->state_lock);
port->flushing = flush;
if (flush) {
gboolean signalled;
OMX_ERRORTYPE last_error;
- g_cond_broadcast (&port->port_cond);
-
- /* We also need to signal the state cond because
- * an input port might wait on this for the output
- * ports to reconfigure. This will not confuse
- * other waiters on the state cond because they will
- * additionally check if the condition they're waiting
- * for is true after waking up.
- */
- gst_omx_rec_mutex_lock (&comp->state_lock);
- g_cond_broadcast (&comp->state_cond);
- gst_omx_rec_mutex_unlock (&comp->state_lock);
+ g_mutex_lock (&comp->messages_lock);
+ g_cond_broadcast (&comp->messages_cond);
+ g_mutex_unlock (&comp->messages_lock);
/* Now flush the port */
port->flushed = FALSE;
- gst_omx_rec_mutex_begin_recursion (&port->port_lock);
err = OMX_SendCommand (comp->handle, OMX_CommandFlush, port->index, NULL);
- gst_omx_rec_mutex_end_recursion (&port->port_lock);
if (err != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent,
goto done;
}
- if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
+ if ((err = comp->last_error) != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent,
"Component is in error state: %s (0x%08x)",
gst_omx_error_to_string (err), err);
* the flush command completed */
signalled = TRUE;
last_error = OMX_ErrorNone;
+ gst_omx_component_handle_messages (comp);
while (signalled && last_error == OMX_ErrorNone && !port->flushed
- && port->buffers->len > g_queue_get_length (port->pending_buffers)) {
- signalled =
- g_cond_wait_until (&comp->state_cond, &comp->state_lock.lock,
- wait_until);
-
- last_error = gst_omx_component_get_last_error (comp);
+ && port->buffers->len > g_queue_get_length (&port->pending_buffers)) {
+ g_mutex_lock (&comp->messages_lock);
+ g_mutex_unlock (&comp->lock);
+ if (!g_queue_is_empty (&comp->messages))
+ signalled = TRUE;
+ else
+ signalled =
+ g_cond_wait_until (&comp->messages_cond, &comp->messages_lock,
+ wait_until);
+ g_mutex_unlock (&comp->messages_lock);
+ g_mutex_lock (&comp->lock);
+
+ if (signalled)
+ gst_omx_component_handle_messages (comp);
+
+ last_error = comp->last_error;
}
port->flushed = FALSE;
GstOMXBuffer *buf;
/* Enqueue all buffers for the component to fill */
- while ((buf = g_queue_pop_head (port->pending_buffers))) {
+ while ((buf = g_queue_pop_head (&port->pending_buffers))) {
if (!buf)
continue;
*/
buf->omx_buf->nFlags = 0;
- gst_omx_rec_mutex_begin_recursion (&port->port_lock);
err = OMX_FillThisBuffer (comp->handle, buf->omx_buf);
- gst_omx_rec_mutex_end_recursion (&port->port_lock);
if (err != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent,
done:
GST_DEBUG_OBJECT (comp->parent, "Set port %u to %sflushing: %s (0x%08x)",
port->index, (flush ? "" : "not "), gst_omx_error_to_string (err), err);
- gst_omx_rec_mutex_unlock_for_recursion (&port->port_lock);
+ gst_omx_component_handle_messages (comp);
+ g_mutex_unlock (&comp->lock);
return err;
error:
{
- /* Need to unlock the port lock here because
- * set_last_error() needs all port locks.
- * This is safe here because we're just going
- * to error out anyway */
- gst_omx_rec_mutex_unlock (&port->port_lock);
+ g_mutex_unlock (&comp->lock);
gst_omx_component_set_last_error (comp, err);
- gst_omx_rec_mutex_lock (&port->port_lock);
+ g_mutex_lock (&comp->lock);
goto done;
}
}
+/* NOTE: Uses comp->lock and comp->messages_lock */
gboolean
gst_omx_port_is_flushing (GstOMXPort * port)
{
comp = port->comp;
- gst_omx_rec_mutex_lock (&port->port_lock);
+ g_mutex_lock (&comp->lock);
+ gst_omx_component_handle_messages (port->comp);
flushing = port->flushing;
- gst_omx_rec_mutex_unlock (&port->port_lock);
+ g_mutex_unlock (&comp->lock);
GST_DEBUG_OBJECT (comp->parent, "Port %u is flushing: %d", port->index,
flushing);
return flushing;
}
-/* Must be called while holding port->lock */
+/* NOTE: Must be called while holding comp->lock, uses comp->messages_lock */
static OMX_ERRORTYPE
gst_omx_port_allocate_buffers_unlocked (GstOMXPort * port)
{
comp = port->comp;
- if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
+ gst_omx_component_handle_messages (port->comp);
+ if ((err = comp->last_error) != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent, "Component in error state: %s (0x%08x)",
gst_omx_error_to_string (err), err);
goto done;
buf->settings_cookie = port->settings_cookie;
g_ptr_array_add (port->buffers, buf);
- gst_omx_rec_mutex_begin_recursion (&port->port_lock);
err =
OMX_AllocateBuffer (comp->handle, &buf->omx_buf, port->index, buf,
port->port_def.nBufferSize);
- gst_omx_rec_mutex_end_recursion (&port->port_lock);
if (err != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent,
g_assert (buf->omx_buf->pAppPrivate == buf);
/* In the beginning all buffers are not owned by the component */
- g_queue_push_tail (port->pending_buffers, buf);
+ g_queue_push_tail (&port->pending_buffers, buf);
}
+ gst_omx_component_handle_messages (comp);
+
done:
GST_DEBUG_OBJECT (comp->parent, "Allocated buffers for port %u: %s (0x%08x)",
port->index, gst_omx_error_to_string (err), err);
error:
{
- /* Need to unlock the port lock here because
- * set_last_error() needs all port locks.
- * This is safe here because we're just going
- * to error out anyway */
- gst_omx_rec_mutex_unlock (&port->port_lock);
+ g_mutex_unlock (&comp->lock);
gst_omx_component_set_last_error (comp, err);
- gst_omx_rec_mutex_lock (&port->port_lock);
+ g_mutex_lock (&comp->lock);
goto done;
}
}
+/* NOTE: Uses comp->lock and comp->messages_lock */
OMX_ERRORTYPE
gst_omx_port_allocate_buffers (GstOMXPort * port)
{
g_return_val_if_fail (port != NULL, OMX_ErrorUndefined);
- gst_omx_rec_mutex_lock_for_recursion (&port->port_lock);
+ g_mutex_lock (&port->comp->lock);
err = gst_omx_port_allocate_buffers_unlocked (port);
- gst_omx_rec_mutex_unlock_for_recursion (&port->port_lock);
+ g_mutex_unlock (&port->comp->lock);
return err;
}
-/* Must be called while holding port->lock */
+/* NOTE: Must be called while holding comp->lock, uses comp->messages_lock */
static OMX_ERRORTYPE
gst_omx_port_deallocate_buffers_unlocked (GstOMXPort * port)
{
GST_DEBUG_OBJECT (comp->parent, "Deallocating buffers of port %u",
port->index);
+ gst_omx_component_handle_messages (port->comp);
+
if (!port->buffers) {
GST_DEBUG_OBJECT (comp->parent, "No buffers allocated for port %u",
port->index);
goto done;
}
- if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
+ if ((err = comp->last_error) != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent, "Component in error state: %s (0x%08x)",
gst_omx_error_to_string (err), err);
/* We still try to deallocate all buffers */
GST_DEBUG_OBJECT (comp->parent, "Deallocating buffer %p (%p)", buf,
buf->omx_buf->pBuffer);
- gst_omx_rec_mutex_begin_recursion (&port->port_lock);
tmp = OMX_FreeBuffer (comp->handle, port->index, buf->omx_buf);
- gst_omx_rec_mutex_end_recursion (&port->port_lock);
if (tmp != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent,
}
g_slice_free (GstOMXBuffer, buf);
}
-
- g_queue_clear (port->pending_buffers);
-#if GLIB_CHECK_VERSION(2,22,0)
+ g_queue_clear (&port->pending_buffers);
g_ptr_array_unref (port->buffers);
-#else
- g_ptr_array_free (port->buffers, TRUE);
-#endif
port->buffers = NULL;
+ gst_omx_component_handle_messages (comp);
+
done:
GST_DEBUG_OBJECT (comp->parent, "Deallocated buffers of port %u: %s (0x%08x)",
port->index, gst_omx_error_to_string (err), err);
return err;
}
+/* NOTE: Uses comp->lock and comp->messages_lock */
OMX_ERRORTYPE
gst_omx_port_deallocate_buffers (GstOMXPort * port)
{
g_return_val_if_fail (port != NULL, OMX_ErrorUndefined);
- gst_omx_rec_mutex_lock_for_recursion (&port->port_lock);
+ g_mutex_lock (&port->comp->lock);
err = gst_omx_port_deallocate_buffers_unlocked (port);
- gst_omx_rec_mutex_unlock_for_recursion (&port->port_lock);
+ g_mutex_unlock (&port->comp->lock);
return err;
}
-/* Must be called while holding port->lock */
+/* NOTE: Must be called while holding comp->lock, uses comp->messages_lock */
static OMX_ERRORTYPE
gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled)
{
comp = port->comp;
- if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
+ gst_omx_component_handle_messages (comp);
+
+ if ((err = comp->last_error) != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent, "Component in error state: %s (0x%08x)",
gst_omx_error_to_string (err), err);
goto done;
port->flushing = TRUE;
}
- gst_omx_rec_mutex_begin_recursion (&port->port_lock);
if (enabled)
err =
OMX_SendCommand (comp->handle, OMX_CommandPortEnable, port->index,
err =
OMX_SendCommand (comp->handle, OMX_CommandPortDisable,
port->index, NULL);
- gst_omx_rec_mutex_end_recursion (&port->port_lock);
if (err != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent,
goto error;
}
- if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
+ if ((err = comp->last_error) != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent, "Component in error state: %s (0x%08x)",
gst_omx_error_to_string (err), err);
goto done;
/* First wait until all buffers are released by the port */
signalled = TRUE;
last_error = OMX_ErrorNone;
+ gst_omx_component_handle_messages (comp);
while (signalled && last_error == OMX_ErrorNone && (port->buffers
- && port->buffers->len > g_queue_get_length (port->pending_buffers))) {
- signalled =
- g_cond_wait_until (&port->port_cond, &port->port_lock.lock, wait_until);
- last_error = gst_omx_component_get_last_error (comp);
+ && port->buffers->len >
+ g_queue_get_length (&port->pending_buffers))) {
+ g_mutex_lock (&comp->messages_lock);
+ g_mutex_unlock (&comp->lock);
+ if (!g_queue_is_empty (&comp->messages))
+ signalled = TRUE;
+ else
+ signalled =
+ g_cond_wait_until (&comp->messages_cond, &comp->messages_lock,
+ wait_until);
+ g_mutex_unlock (&comp->messages_lock);
+ g_mutex_lock (&comp->lock);
+ if (signalled)
+ gst_omx_component_handle_messages (comp);
+ last_error = comp->last_error;
}
if (last_error != OMX_ErrorNone) {
last_error = OMX_ErrorNone;
gst_omx_component_get_parameter (comp, OMX_IndexParamPortDefinition,
&port->port_def);
+ gst_omx_component_handle_messages (comp);
while (signalled && last_error == OMX_ErrorNone
&& (! !port->port_def.bEnabled != ! !enabled || !port->enabled_changed)) {
- signalled =
- g_cond_wait_until (&port->port_cond, &port->port_lock.lock, wait_until);
- last_error = gst_omx_component_get_last_error (comp);
+ g_mutex_lock (&comp->messages_lock);
+ g_mutex_unlock (&comp->lock);
+ if (!g_queue_is_empty (&comp->messages))
+ signalled = TRUE;
+ else
+ signalled =
+ g_cond_wait_until (&comp->messages_cond, &comp->messages_lock,
+ wait_until);
+ g_mutex_unlock (&comp->messages_lock);
+ g_mutex_lock (&comp->lock);
+ if (signalled)
+ gst_omx_component_handle_messages (comp);
+ last_error = comp->last_error;
gst_omx_component_get_parameter (comp, OMX_IndexParamPortDefinition,
&port->port_def);
}
+ port->enabled_changed = FALSE;
if (!signalled) {
GST_ERROR_OBJECT (comp->parent,
GstOMXBuffer *buf;
/* Enqueue all buffers for the component to fill */
- while ((buf = g_queue_pop_head (port->pending_buffers))) {
+ while ((buf = g_queue_pop_head (&port->pending_buffers))) {
if (!buf)
continue;
*/
buf->omx_buf->nFlags = 0;
- gst_omx_rec_mutex_begin_recursion (&port->port_lock);
err = OMX_FillThisBuffer (comp->handle, buf->omx_buf);
- gst_omx_rec_mutex_end_recursion (&port->port_lock);
if (err != OMX_ErrorNone) {
GST_ERROR_OBJECT (comp->parent,
}
}
+ gst_omx_component_handle_messages (comp);
+
done:
GST_DEBUG_OBJECT (comp->parent, "Port %u is %s%s: %s (0x%08x)", port->index,
(err == OMX_ErrorNone ? "" : "not "),
error:
{
- /* Need to unlock the port lock here because
- * set_last_error() needs all port locks.
- * This is safe here because we're just going
- * to error out anyway */
- gst_omx_rec_mutex_unlock (&port->port_lock);
+ g_mutex_unlock (&comp->lock);
gst_omx_component_set_last_error (comp, err);
- gst_omx_rec_mutex_lock (&port->port_lock);
+ g_mutex_lock (&comp->lock);
goto done;
}
}
+/* NOTE: Uses comp->lock and comp->messages_lock */
OMX_ERRORTYPE
gst_omx_port_set_enabled (GstOMXPort * port, gboolean enabled)
{
g_return_val_if_fail (port != NULL, OMX_ErrorUndefined);
- gst_omx_rec_mutex_lock_for_recursion (&port->port_lock);
+ g_mutex_lock (&port->comp->lock);
err = gst_omx_port_set_enabled_unlocked (port, enabled);
- gst_omx_rec_mutex_unlock_for_recursion (&port->port_lock);
+ g_mutex_unlock (&port->comp->lock);
return err;
}
comp = port->comp;
- gst_omx_rec_mutex_lock (&port->port_lock);
gst_omx_component_get_parameter (comp, OMX_IndexParamPortDefinition,
&port->port_def);
- enabled = port->port_def.bEnabled;
- gst_omx_rec_mutex_unlock (&port->port_lock);
+ enabled = ! !port->port_def.bEnabled;
GST_DEBUG_OBJECT (comp->parent, "Port %u is enabled: %d", port->index,
enabled);
return enabled;
}
+/* NOTE: Uses comp->lock and comp->messages_lock */
OMX_ERRORTYPE
gst_omx_port_reconfigure (GstOMXPort * port)
{
comp = port->comp;
+ g_mutex_lock (&comp->lock);
GST_DEBUG_OBJECT (comp->parent, "Reconfiguring port %u", port->index);
- gst_omx_rec_mutex_lock_for_recursion (&port->port_lock);
+ gst_omx_component_handle_messages (comp);
if (!port->settings_changed)
goto done;
- if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone)
+ if ((err = comp->last_error) != OMX_ErrorNone)
goto done;
/* Disable and enable the port. This already takes
if (port->port_def.eDir == OMX_DirOutput) {
GList *l;
- gst_omx_rec_mutex_lock (&comp->state_lock);
for (l = comp->pending_reconfigure_outports; l; l = l->next) {
if (l->data == (gpointer) port) {
comp->pending_reconfigure_outports =
}
if (!comp->pending_reconfigure_outports) {
g_atomic_int_set (&comp->have_pending_reconfigure_outports, 0);
- g_cond_broadcast (&comp->state_cond);
+ g_mutex_lock (&comp->messages_lock);
+ g_cond_broadcast (&comp->messages_cond);
+ g_mutex_unlock (&comp->messages_lock);
}
- gst_omx_rec_mutex_unlock (&comp->state_lock);
}
done:
GST_DEBUG_OBJECT (comp->parent, "Reconfigured port %u: %s (0x%08x)",
port->index, gst_omx_error_to_string (err), err);
- gst_omx_rec_mutex_unlock_for_recursion (&port->port_lock);
+ g_mutex_unlock (&comp->lock);
return err;
}
+/* NOTE: Uses comp->lock and comp->messages_lock */
OMX_ERRORTYPE
gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start)
{
comp = port->comp;
+ g_mutex_lock (&comp->lock);
+
GST_DEBUG_OBJECT (comp->parent, "Manual reconfigure of port %u %s",
port->index, (start ? "start" : "stsop"));
- gst_omx_rec_mutex_lock (&port->port_lock);
+ gst_omx_component_handle_messages (comp);
- if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone)
+ if ((err = comp->last_error) != OMX_ErrorNone)
goto done;
if (start)
GList *l;
if (start) {
- gst_omx_rec_mutex_lock (&comp->state_lock);
for (l = comp->pending_reconfigure_outports; l; l = l->next) {
if (l->data == (gpointer) port)
break;
g_list_prepend (comp->pending_reconfigure_outports, port);
g_atomic_int_set (&comp->have_pending_reconfigure_outports, 1);
}
- gst_omx_rec_mutex_unlock (&comp->state_lock);
} else {
- gst_omx_rec_mutex_lock (&comp->state_lock);
for (l = comp->pending_reconfigure_outports; l; l = l->next) {
if (l->data == (gpointer) port) {
comp->pending_reconfigure_outports =
}
if (!comp->pending_reconfigure_outports) {
g_atomic_int_set (&comp->have_pending_reconfigure_outports, 0);
- g_cond_broadcast (&comp->state_cond);
+ g_mutex_lock (&comp->messages_lock);
+ g_cond_broadcast (&comp->messages_cond);
+ g_mutex_unlock (&comp->messages_lock);
}
- gst_omx_rec_mutex_unlock (&comp->state_lock);
}
}
done:
- gst_omx_rec_mutex_unlock (&port->port_lock);
-
GST_DEBUG_OBJECT (comp->parent, "Manual reconfigure of port %u: %s (0x%08x)",
port->index, gst_omx_error_to_string (err), err);
+ g_mutex_unlock (&comp->lock);
+
return err;
}