omx: Implement new approach for locking that should solve all deadlocks on RPi
authorSebastian Dröge <sebastian.droege@collabora.co.uk>
Thu, 10 Jan 2013 13:44:33 +0000 (14:44 +0100)
committerSebastian Dröge <sebastian.droege@collabora.co.uk>
Mon, 14 Jan 2013 09:36:32 +0000 (10:36 +0100)
No mutex is locked while calling any OpenMAX functions anymore
and everything from the OpenMAX callbacks is inserted into a message
queue and handled from outside the callbacks.

Also there's only a single mutex and condition variable per component
now for handling anything from OpenMAX callbacks and a single mutex
for keeping our component/port state sane.

omx/Makefile.am
omx/gstomx.c
omx/gstomx.h
omx/gstomxaudioenc.c
omx/gstomxrecmutex.c [deleted file]
omx/gstomxrecmutex.h [deleted file]

index 0296128..181ce0e 100644 (file)
@@ -12,8 +12,7 @@ libgstopenmax_la_SOURCES = \
        gstomxmpeg4videoenc.c \
        gstomxh264enc.c \
        gstomxh263enc.c \
-       gstomxaacenc.c \
-       gstomxrecmutex.c
+       gstomxaacenc.c
 
 noinst_HEADERS = \
        gstomx.h \
@@ -27,8 +26,7 @@ noinst_HEADERS = \
        gstomxmpeg4videoenc.h \
        gstomxh264enc.h \
        gstomxh263enc.h \
-       gstomxaacenc.h \
-       gstomxrecmutex.h
+       gstomxaacenc.h
 
 libgstopenmax_la_CFLAGS = \
        -DGST_USE_UNSTABLE_API=1 \
index 1d65f7c..8c30fec 100644 (file)
@@ -1,6 +1,8 @@
 /*
  * Copyright (C) 2011, Hewlett-Packard Development Company, L.P.
  *   Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>, Collabora Ltd.
+ * Copyright (C) 2013, Collabora Ltd.
+ *   Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -174,6 +176,187 @@ gst_omx_core_release (GstOMXCore * core)
   G_UNLOCK (core_handles);
 }
 
+/* NOTE: comp->messages_lock will be used */
+static void
+gst_omx_component_flush_messages (GstOMXComponent * comp)
+{
+  GstOMXMessage *msg;
+
+  g_mutex_lock (&comp->messages_lock);
+  while ((msg = g_queue_pop_head (&comp->messages))) {
+    g_slice_free (GstOMXMessage, msg);
+  }
+  g_mutex_unlock (&comp->messages_lock);
+}
+
+/* NOTE: Call with comp->lock, comp->messages_lock will be used */
+static void
+gst_omx_component_handle_messages (GstOMXComponent * comp)
+{
+  GstOMXMessage *msg;
+
+  g_mutex_lock (&comp->messages_lock);
+
+  while ((msg = g_queue_pop_head (&comp->messages))) {
+    switch (msg->type) {
+      case GST_OMX_MESSAGE_STATE_SET:{
+        GST_DEBUG_OBJECT (comp->parent, "State change to %d finished",
+            msg->content.state_set.state);
+        comp->state = msg->content.state_set.state;
+        if (comp->state == comp->pending_state)
+          comp->pending_state = OMX_StateInvalid;
+        break;
+      }
+      case GST_OMX_MESSAGE_FLUSH:{
+        GstOMXPort *port = NULL;
+        OMX_U32 index = msg->content.flush.port;
+
+        port = gst_omx_component_get_port (comp, index);
+        if (!port)
+          break;
+
+        GST_DEBUG_OBJECT (comp->parent, "Port %u flushed", port->index);
+
+        if (port->flushing) {
+          port->flushed = TRUE;
+        } else {
+          GST_ERROR_OBJECT (comp->parent, "Port %u was not flushing",
+              port->index);
+        }
+
+        break;
+      }
+      case GST_OMX_MESSAGE_ERROR:{
+        OMX_ERRORTYPE error = msg->content.error.error;
+
+        if (error == OMX_ErrorNone)
+          break;
+
+        GST_ERROR_OBJECT (comp->parent, "Got error: %s (0x%08x)",
+            gst_omx_error_to_string (error), error);
+
+        /* We only set the first error ever from which
+         * we can't recover anymore.
+         */
+        if (comp->last_error == OMX_ErrorNone)
+          comp->last_error = error;
+        g_cond_broadcast (&comp->messages_cond);
+
+        break;
+      }
+      case GST_OMX_MESSAGE_PORT_ENABLE:{
+        GstOMXPort *port = NULL;
+        OMX_U32 index = msg->content.port_enable.port;
+        OMX_BOOL enable = msg->content.port_enable.enable;
+
+        port = gst_omx_component_get_port (comp, index);
+        if (!port)
+          break;
+
+        GST_DEBUG_OBJECT (comp->parent, "Port %u %s", port->index,
+            (enable ? "enabled" : "disabled"));
+
+        port->enabled_changed = TRUE;
+        break;
+      }
+      case GST_OMX_MESSAGE_PORT_SETTINGS_CHANGED:{
+        gint i, n;
+        OMX_U32 index = msg->content.port_settings_changed.port;
+        GList *outports = NULL, *l, *k;
+
+        GST_DEBUG_OBJECT (comp->parent, "Settings changed (port %u)", index);
+
+        /* FIXME: This probably can be done better */
+
+        /* Now update the ports' states */
+        n = (comp->ports ? comp->ports->len : 0);
+        for (i = 0; i < n; i++) {
+          GstOMXPort *port = g_ptr_array_index (comp->ports, i);
+
+          if (index == OMX_ALL || index == port->index) {
+            port->settings_cookie++;
+            if (port->port_def.eDir == OMX_DirOutput)
+              outports = g_list_prepend (outports, port);
+          }
+        }
+
+        for (k = outports; k; k = k->next) {
+          gboolean found = FALSE;
+
+          for (l = comp->pending_reconfigure_outports; l; l = l->next) {
+            if (l->data == k->data) {
+              found = TRUE;
+              break;
+            }
+          }
+
+          if (!found)
+            comp->pending_reconfigure_outports =
+                g_list_prepend (comp->pending_reconfigure_outports, k->data);
+        }
+
+        if (comp->pending_reconfigure_outports)
+          g_atomic_int_set (&comp->have_pending_reconfigure_outports, 1);
+
+        g_list_free (outports);
+
+        break;
+      }
+      case GST_OMX_MESSAGE_BUFFER_DONE:{
+        GstOMXBuffer *buf = msg->content.buffer_done.buffer->pAppPrivate;
+        GstOMXPort *port;
+        GstOMXComponent *comp;
+
+        port = buf->port;
+        comp = port->comp;
+
+        if (msg->content.buffer_done.empty) {
+          /* Input buffer is empty again and can be used to contain new input */
+          GST_DEBUG_OBJECT (comp->parent, "Port %u emptied buffer %p (%p)",
+              port->index, buf, buf->omx_buf->pBuffer);
+
+          /* XXX: Some OMX implementations don't reset nOffset
+           * when the complete buffer is emptied but instead
+           * only reset nFilledLen. We reset nOffset to 0
+           * if nFilledLen == 0, which is safe to do because
+           * the offset *must* be 0 if the buffer is not
+           * filled at all.
+           *
+           * Seen in QCOM's OMX implementation.
+           */
+          if (buf->omx_buf->nFilledLen == 0)
+            buf->omx_buf->nOffset = 0;
+
+          /* Reset all flags, some implementations don't
+           * reset them themselves and the flags are not
+           * valid anymore after the buffer was consumed
+           */
+          buf->omx_buf->nFlags = 0;
+        } else {
+          /* Output buffer contains output now or
+           * the port was flushed */
+          GST_DEBUG_OBJECT (comp->parent, "Port %u filled buffer %p (%p)",
+              port->index, buf, buf->omx_buf->pBuffer);
+        }
+
+        buf->used = FALSE;
+
+        g_queue_push_tail (&port->pending_buffers, buf);
+
+        break;
+      }
+      default:{
+        g_assert_not_reached ();
+        break;
+      }
+    }
+
+    g_slice_free (GstOMXMessage, msg);
+  }
+
+  g_mutex_unlock (&comp->messages_lock);
+}
+
 static OMX_ERRORTYPE
 EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent,
     OMX_U32 nData1, OMX_U32 nData2, OMX_PTR pEventData)
@@ -189,64 +372,49 @@ EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent,
 
       switch (cmd) {
         case OMX_CommandStateSet:{
-          /* Notify everything that waits for
-           * a state change to be finished */
+          GstOMXMessage *msg = g_slice_new (GstOMXMessage);
+
+          msg->type = GST_OMX_MESSAGE_STATE_SET;
+          msg->content.state_set.state = nData2;
+
           GST_DEBUG_OBJECT (comp->parent, "State change to %d finished",
-              nData2);
-          gst_omx_rec_mutex_recursive_lock (&comp->state_lock);
-          comp->state = (OMX_STATETYPE) nData2;
-          if (comp->state == comp->pending_state)
-            comp->pending_state = OMX_StateInvalid;
-          g_cond_broadcast (&comp->state_cond);
-          gst_omx_rec_mutex_recursive_unlock (&comp->state_lock);
+              msg->content.state_set.state);
+
+          g_mutex_lock (&comp->messages_lock);
+          g_queue_push_tail (&comp->messages, msg);
+          g_cond_broadcast (&comp->messages_cond);
+          g_mutex_unlock (&comp->messages_lock);
           break;
         }
         case OMX_CommandFlush:{
-          GstOMXPort *port = NULL;
-          OMX_U32 index = nData2;
-
-          port = gst_omx_component_get_port (comp, index);
-          if (!port)
-            break;
+          GstOMXMessage *msg = g_slice_new (GstOMXMessage);
 
-          GST_DEBUG_OBJECT (comp->parent, "Port %u flushed", port->index);
+          msg->type = GST_OMX_MESSAGE_FLUSH;
+          msg->content.flush.port = nData2;
+          GST_DEBUG_OBJECT (comp->parent, "Port %u flushed",
+              msg->content.flush.port);
 
-          /* Now notify gst_omx_port_set_flushing()
-           * that the port is really flushed now and
-           * we can continue
-           */
-          gst_omx_rec_mutex_recursive_lock (&port->port_lock);
-          /* FIXME: If this is ever called when the port
-           * was not set to flushing something went
-           * wrong but it happens for some reason.
-           */
-          if (port->flushing) {
-            port->flushed = TRUE;
-            g_cond_broadcast (&port->port_cond);
-          } else {
-            GST_ERROR_OBJECT (comp->parent, "Port %u was not flushing",
-                port->index);
-          }
-          gst_omx_rec_mutex_recursive_unlock (&port->port_lock);
+          g_mutex_lock (&comp->messages_lock);
+          g_queue_push_tail (&comp->messages, msg);
+          g_cond_broadcast (&comp->messages_cond);
+          g_mutex_unlock (&comp->messages_lock);
           break;
         }
         case OMX_CommandPortEnable:
         case OMX_CommandPortDisable:{
-          GstOMXPort *port = NULL;
-          OMX_U32 index = nData2;
-
-          port = gst_omx_component_get_port (comp, index);
-          if (!port)
-            break;
-
-          GST_DEBUG_OBJECT (comp->parent, "Port %u %s", port->index,
-              (cmd == OMX_CommandPortEnable ? "enabled" : "disabled"));
-
-          gst_omx_rec_mutex_recursive_lock (&port->port_lock);
-          port->enabled_changed = TRUE;
-          g_cond_broadcast (&port->port_cond);
-          gst_omx_rec_mutex_recursive_unlock (&port->port_lock);
-
+          GstOMXMessage *msg = g_slice_new (GstOMXMessage);
+
+          msg->type = GST_OMX_MESSAGE_PORT_ENABLE;
+          msg->content.port_enable.port = nData2;
+          msg->content.port_enable.enable = (cmd == OMX_CommandPortEnable);
+          GST_DEBUG_OBJECT (comp->parent, "Port %u %s",
+              msg->content.port_enable.port,
+              (msg->content.port_enable.enable ? "enabled" : "disabled"));
+
+          g_mutex_lock (&comp->messages_lock);
+          g_queue_push_tail (&comp->messages, msg);
+          g_cond_broadcast (&comp->messages_cond);
+          g_mutex_unlock (&comp->messages_lock);
           break;
         }
         default:
@@ -256,76 +424,54 @@ EventHandler (OMX_HANDLETYPE hComponent, OMX_PTR pAppData, OMX_EVENTTYPE eEvent,
     }
     case OMX_EventError:
     {
-      OMX_ERRORTYPE err = nData1;
+      GstOMXMessage *msg;
 
-      if (err == OMX_ErrorNone)
+      /* Yes, this really happens... */
+      if (nData1 == OMX_ErrorNone)
         break;
 
-      GST_ERROR_OBJECT (comp->parent, "Got error: %s (0x%08x)",
-          gst_omx_error_to_string (err), err);
+      msg = g_slice_new (GstOMXMessage);
 
-      /* Error events are always fatal */
-      gst_omx_component_set_last_error (comp, err);
+      msg->type = GST_OMX_MESSAGE_ERROR;
+      msg->content.error.error = nData1;
+      GST_ERROR_OBJECT (comp->parent, "Got error: %s (0x%08x)",
+          gst_omx_error_to_string (msg->content.error.error),
+          msg->content.error.error);
 
+      g_mutex_lock (&comp->messages_lock);
+      g_queue_push_tail (&comp->messages, msg);
+      g_cond_broadcast (&comp->messages_cond);
+      g_mutex_unlock (&comp->messages_lock);
       break;
     }
     case OMX_EventPortSettingsChanged:
     {
-      gint i, n;
-      guint32 port_index;
-      GList *outports = NULL, *l, *k;
+      GstOMXMessage *msg = g_slice_new (GstOMXMessage);
+      OMX_U32 index;
 
       if (!(comp->hacks &
               GST_OMX_HACK_EVENT_PORT_SETTINGS_CHANGED_NDATA_PARAMETER_SWAP)) {
-        port_index = nData1;
+        index = nData1;
       } else {
-        port_index = nData2;
+        index = nData2;
       }
 
-      if (port_index == 0
+
+      if (index == 0
           && (comp->hacks &
               GST_OMX_HACK_EVENT_PORT_SETTINGS_CHANGED_PORT_0_TO_1))
-        port_index = 1;
+        index = 1;
 
-      GST_DEBUG_OBJECT (comp->parent, "Settings changed (port index: %d)",
-          port_index);
-
-      /* Now update the ports' states */
-      n = (comp->ports ? comp->ports->len : 0);
-      for (i = 0; i < n; i++) {
-        GstOMXPort *port = g_ptr_array_index (comp->ports, i);
-
-        gst_omx_rec_mutex_recursive_lock (&port->port_lock);
-        if (port_index == OMX_ALL || port_index == port->index) {
-          port->settings_cookie++;
-          if (port->port_def.eDir == OMX_DirOutput)
-            outports = g_list_prepend (outports, port);
-          g_cond_broadcast (&port->port_cond);
-        }
-        gst_omx_rec_mutex_recursive_unlock (&port->port_lock);
-      }
-
-      gst_omx_rec_mutex_recursive_lock (&comp->state_lock);
-      for (k = outports; k; k = k->next) {
-        gboolean found = FALSE;
-
-        for (l = comp->pending_reconfigure_outports; l; l = l->next) {
-          if (l->data == k->data) {
-            found = TRUE;
-            break;
-          }
-        }
 
-        if (!found)
-          comp->pending_reconfigure_outports =
-              g_list_prepend (comp->pending_reconfigure_outports, k->data);
-      }
-
-      if (comp->pending_reconfigure_outports)
-        g_atomic_int_set (&comp->have_pending_reconfigure_outports, 1);
-      gst_omx_rec_mutex_recursive_unlock (&comp->state_lock);
+      msg->type = GST_OMX_MESSAGE_PORT_SETTINGS_CHANGED;
+      msg->content.port_settings_changed.port = index;
+      GST_DEBUG_OBJECT (comp->parent, "Settings changed (port index: %d)",
+          msg->content.port_settings_changed.port);
 
-      g_list_free (outports);
+      g_mutex_lock (&comp->messages_lock);
+      g_queue_push_tail (&comp->messages, msg);
+      g_cond_broadcast (&comp->messages_cond);
+      g_mutex_unlock (&comp->messages_lock);
 
       break;
     }
@@ -342,48 +488,34 @@ static OMX_ERRORTYPE
 EmptyBufferDone (OMX_HANDLETYPE hComponent, OMX_PTR pAppData,
     OMX_BUFFERHEADERTYPE * pBuffer)
 {
-  GstOMXBuffer *buf = pBuffer->pAppPrivate;
-  GstOMXPort *port;
+  GstOMXBuffer *buf;
   GstOMXComponent *comp;
+  GstOMXMessage *msg;
 
-  if (buf == NULL) {
+  buf = pBuffer->pAppPrivate;
+  if (!buf) {
     GST_ERROR ("Have unknown or deallocated buffer %p", pBuffer);
     return OMX_ErrorNone;
   }
 
-  port = buf->port;
-  comp = port->comp;
-
   g_assert (buf->omx_buf == pBuffer);
 
-  /* Input buffer is empty again and can
-   * be used to contain new input */
-  gst_omx_rec_mutex_recursive_lock (&port->port_lock);
-  GST_DEBUG_OBJECT (comp->parent, "Port %u emptied buffer %p (%p)",
-      port->index, buf, buf->omx_buf->pBuffer);
-  buf->used = FALSE;
-
-  /* XXX: Some OMX implementations don't reset nOffset
-   * when the complete buffer is emptied but instead
-   * only reset nFilledLen. We reset nOffset to 0
-   * if nFilledLen == 0, which is safe to do because
-   * the offset *must* be 0 if the buffer is not
-   * filled at all.
-   *
-   * Seen in QCOM's OMX implementation.
-   */
-  if (buf->omx_buf->nFilledLen == 0)
-    buf->omx_buf->nOffset = 0;
+  comp = buf->port->comp;
 
-  /* Reset all flags, some implementations don't
-   * reset them themselves and the flags are not
-   * valid anymore after the buffer was consumed
-   */
-  buf->omx_buf->nFlags = 0;
+  msg = g_slice_new (GstOMXMessage);
+  msg->type = GST_OMX_MESSAGE_BUFFER_DONE;
+  msg->content.buffer_done.component = hComponent;
+  msg->content.buffer_done.app_data = pAppData;
+  msg->content.buffer_done.buffer = pBuffer;
+  msg->content.buffer_done.empty = OMX_TRUE;
+
+  GST_DEBUG_OBJECT (comp->parent, "Port %u emptied buffer %p (%p)",
+      buf->port->index, buf, buf->omx_buf->pBuffer);
 
-  g_queue_push_tail (port->pending_buffers, buf);
-  g_cond_broadcast (&port->port_cond);
-  gst_omx_rec_mutex_recursive_unlock (&port->port_lock);
+  g_mutex_lock (&comp->messages_lock);
+  g_queue_push_tail (&comp->messages, msg);
+  g_cond_broadcast (&comp->messages_cond);
+  g_mutex_unlock (&comp->messages_lock);
 
   return OMX_ErrorNone;
 }
@@ -392,29 +524,34 @@ static OMX_ERRORTYPE
 FillBufferDone (OMX_HANDLETYPE hComponent, OMX_PTR pAppData,
     OMX_BUFFERHEADERTYPE * pBuffer)
 {
-  GstOMXBuffer *buf = pBuffer->pAppPrivate;
-  GstOMXPort *port;
+  GstOMXBuffer *buf;
   GstOMXComponent *comp;
+  GstOMXMessage *msg;
 
-  if (buf == NULL) {
+  buf = pBuffer->pAppPrivate;
+  if (!buf) {
     GST_ERROR ("Have unknown or deallocated buffer %p", pBuffer);
     return OMX_ErrorNone;
   }
 
-  port = buf->port;
-  comp = port->comp;
-
   g_assert (buf->omx_buf == pBuffer);
 
-  /* Output buffer contains output now or
-   * the port was flushed */
-  gst_omx_rec_mutex_recursive_lock (&port->port_lock);
-  GST_DEBUG_OBJECT (comp->parent, "Port %u filled buffer %p (%p)", port->index,
-      buf, buf->omx_buf->pBuffer);
-  buf->used = FALSE;
-  g_queue_push_tail (port->pending_buffers, buf);
-  g_cond_broadcast (&port->port_cond);
-  gst_omx_rec_mutex_recursive_unlock (&port->port_lock);
+  comp = buf->port->comp;
+
+  msg = g_slice_new (GstOMXMessage);
+  msg->type = GST_OMX_MESSAGE_BUFFER_DONE;
+  msg->content.buffer_done.component = hComponent;
+  msg->content.buffer_done.app_data = pAppData;
+  msg->content.buffer_done.buffer = pBuffer;
+  msg->content.buffer_done.empty = OMX_FALSE;
+
+  GST_DEBUG_OBJECT (comp->parent, "Port %u filled buffer %p (%p)",
+      buf->port->index, buf, buf->omx_buf->pBuffer);
+
+  g_mutex_lock (&comp->messages_lock);
+  g_queue_push_tail (&comp->messages, msg);
+  g_cond_broadcast (&comp->messages_cond);
+  g_mutex_unlock (&comp->messages_lock);
 
   return OMX_ErrorNone;
 }
@@ -422,6 +559,7 @@ FillBufferDone (OMX_HANDLETYPE hComponent, OMX_PTR pAppData,
 static OMX_CALLBACKTYPE callbacks =
     { EventHandler, EmptyBufferDone, FillBufferDone };
 
+/* NOTE: Uses comp->lock and comp->messages_lock */
 GstOMXComponent *
 gst_omx_component_new (GstObject * parent, const GstOMXClassData * cdata)
 {
@@ -457,8 +595,11 @@ gst_omx_component_new (GstObject * parent, const GstOMXClassData * cdata)
   comp->n_in_ports = 0;
   comp->n_out_ports = 0;
 
-  gst_omx_rec_mutex_init (&comp->state_lock);
-  g_cond_init (&comp->state_cond);
+  g_mutex_init (&comp->lock);
+  g_mutex_init (&comp->messages_lock);
+  g_cond_init (&comp->messages_cond);
+
+  g_queue_init (&comp->messages);
   comp->pending_state = OMX_StateInvalid;
   comp->last_error = OMX_ErrorNone;
 
@@ -486,9 +627,14 @@ gst_omx_component_new (GstObject * parent, const GstOMXClassData * cdata)
 
   OMX_GetState (comp->handle, &comp->state);
 
+  g_mutex_lock (&comp->lock);
+  gst_omx_component_handle_messages (comp);
+  g_mutex_unlock (&comp->lock);
+
   return comp;
 }
 
+/* NOTE: Uses comp->messages_lock */
 void
 gst_omx_component_free (GstOMXComponent * comp)
 {
@@ -504,32 +650,30 @@ gst_omx_component_free (GstOMXComponent * comp)
       GstOMXPort *port = g_ptr_array_index (comp->ports, i);
 
       gst_omx_port_deallocate_buffers (port);
-
-      gst_omx_rec_mutex_clear (&port->port_lock);
-      g_cond_clear (&port->port_cond);
-      g_queue_free (port->pending_buffers);
+      g_assert (port->buffers == NULL);
+      g_assert (g_queue_get_length (&port->pending_buffers) == 0);
 
       g_slice_free (GstOMXPort, port);
     }
-#if GLIB_CHECK_VERSION(2,22,0)
     g_ptr_array_unref (comp->ports);
-#else
-    g_ptr_array_free (comp->ports, TRUE);
-#endif
     comp->ports = NULL;
   }
 
   comp->core->free_handle (comp->handle);
   gst_omx_core_release (comp->core);
 
-  g_cond_clear (&comp->state_cond);
-  gst_omx_rec_mutex_clear (&comp->state_lock);
+  gst_omx_component_flush_messages (comp);
+
+  g_cond_clear (&comp->messages_cond);
+  g_mutex_clear (&comp->messages_lock);
+  g_mutex_clear (&comp->lock);
 
   gst_object_unref (comp->parent);
 
   g_slice_free (GstOMXComponent, comp);
 }
 
+/* NOTE: Uses comp->lock and comp->messages_lock */
 OMX_ERRORTYPE
 gst_omx_component_set_state (GstOMXComponent * comp, OMX_STATETYPE state)
 {
@@ -538,10 +682,14 @@ gst_omx_component_set_state (GstOMXComponent * comp, OMX_STATETYPE state)
 
   g_return_val_if_fail (comp != NULL, OMX_ErrorUndefined);
 
-  gst_omx_rec_mutex_lock_for_recursion (&comp->state_lock);
+  g_mutex_lock (&comp->lock);
+
+  gst_omx_component_handle_messages (comp);
+
   old_state = comp->state;
   GST_DEBUG_OBJECT (comp->parent, "Setting state from %d to %d", old_state,
       state);
+
   if ((err = comp->last_error) != OMX_ErrorNone && state > old_state) {
     GST_ERROR_OBJECT (comp->parent, "Component in error state: %s (0x%08x)",
         gst_omx_error_to_string (err), err);
@@ -561,16 +709,18 @@ gst_omx_component_set_state (GstOMXComponent * comp, OMX_STATETYPE state)
     g_list_free (comp->pending_reconfigure_outports);
     comp->pending_reconfigure_outports = NULL;
     /* Notify all inports that are still waiting */
-    g_cond_broadcast (&comp->state_cond);
+    g_mutex_lock (&comp->messages_lock);
+    g_cond_broadcast (&comp->messages_cond);
+    g_mutex_unlock (&comp->messages_lock);
   }
 
-  gst_omx_rec_mutex_begin_recursion (&comp->state_lock);
   err = OMX_SendCommand (comp->handle, OMX_CommandStateSet, state, NULL);
-  gst_omx_rec_mutex_end_recursion (&comp->state_lock);
   /* No need to check if anything has changed here */
 
 done:
-  gst_omx_rec_mutex_unlock_for_recursion (&comp->state_lock);
+
+  gst_omx_component_handle_messages (comp);
+  g_mutex_unlock (&comp->lock);
 
   if (err != OMX_ErrorNone) {
     GST_ERROR_OBJECT (comp->parent,
@@ -581,6 +731,7 @@ done:
   return err;
 }
 
+/* NOTE: Uses comp->lock and comp->messages_lock */
 OMX_STATETYPE
 gst_omx_component_get_state (GstOMXComponent * comp, GstClockTime timeout)
 {
@@ -592,7 +743,10 @@ gst_omx_component_get_state (GstOMXComponent * comp, GstClockTime timeout)
 
   GST_DEBUG_OBJECT (comp->parent, "Getting state");
 
-  gst_omx_rec_mutex_lock (&comp->state_lock);
+  g_mutex_lock (&comp->lock);
+
+  gst_omx_component_handle_messages (comp);
+
   ret = comp->state;
   if (comp->pending_state == OMX_StateInvalid)
     goto done;
@@ -611,22 +765,32 @@ gst_omx_component_get_state (GstOMXComponent * comp, GstClockTime timeout)
       goto done;
 
     wait_until = g_get_monotonic_time () + add;
-    GST_DEBUG_OBJECT (comp->parent, "Waiting for %ld us", add);
+    GST_DEBUG_OBJECT (comp->parent, "Waiting for %" G_GINT64_FORMAT "us", add);
   } else {
     GST_DEBUG_OBJECT (comp->parent, "Waiting for signal");
   }
 
-  do {
+  gst_omx_component_handle_messages (comp);
+  while (signalled && comp->last_error == OMX_ErrorNone
+      && comp->pending_state != OMX_StateInvalid) {
+    g_mutex_lock (&comp->messages_lock);
+    g_mutex_unlock (&comp->lock);
+    if (!g_queue_is_empty (&comp->messages)) {
+      signalled = TRUE;
+    }
     if (wait_until == -1) {
-      g_cond_wait (&comp->state_cond, &comp->state_lock.lock);
+      g_cond_wait (&comp->messages_cond, &comp->messages_lock);
       signalled = TRUE;
     } else {
       signalled =
-          g_cond_wait_until (&comp->state_cond, &comp->state_lock.lock,
+          g_cond_wait_until (&comp->messages_cond, &comp->messages_lock,
           wait_until);
     }
-  } while (signalled && comp->last_error == OMX_ErrorNone
-      && comp->pending_state != OMX_StateInvalid);
+    g_mutex_unlock (&comp->messages_lock);
+    g_mutex_lock (&comp->lock);
+    if (signalled)
+      gst_omx_component_handle_messages (comp);
+  };
 
   if (signalled) {
     if (comp->last_error != OMX_ErrorNone) {
@@ -647,7 +811,7 @@ gst_omx_component_get_state (GstOMXComponent * comp, GstClockTime timeout)
   }
 
 done:
-  gst_omx_rec_mutex_unlock (&comp->state_lock);
+  g_mutex_unlock (&comp->lock);
 
   /* If we waited and timed out this component is unusable now */
   if (!signalled)
@@ -694,9 +858,7 @@ gst_omx_component_add_port (GstOMXComponent * comp, guint32 index)
 
   port->port_def = port_def;
 
-  gst_omx_rec_mutex_init (&port->port_lock);
-  g_cond_init (&port->port_cond);
-  port->pending_buffers = g_queue_new ();
+  g_queue_init (&port->pending_buffers);
   port->flushing = TRUE;
   port->flushed = FALSE;
   port->settings_changed = FALSE;
@@ -717,12 +879,6 @@ gst_omx_component_get_port (GstOMXComponent * comp, guint32 index)
 {
   gint i, n;
 
-  /* No need for locking here because the
-   * ports are all added directly after
-   * creating the component and are removed
-   * when the component is destroyed.
-   */
-
   n = comp->ports->len;
   for (i = 0; i < n; i++) {
     GstOMXPort *tmp = g_ptr_array_index (comp->ports, i);
@@ -754,43 +910,31 @@ gst_omx_component_trigger_settings_changed (GstOMXComponent * comp,
   }
 }
 
-/* NOTE: This takes comp->state_lock *and* *all* port->port_lock! */
+/* NOTE: Uses comp->lock and comp->messages_lock */
 void
 gst_omx_component_set_last_error (GstOMXComponent * comp, OMX_ERRORTYPE err)
 {
-  gint i, n;
-
   g_return_if_fail (comp != NULL);
 
   if (err == OMX_ErrorNone)
     return;
 
+  g_mutex_lock (&comp->lock);
   GST_ERROR_OBJECT (comp->parent, "Setting last error: %s (0x%08x)",
       gst_omx_error_to_string (err), err);
-  gst_omx_rec_mutex_recursive_lock (&comp->state_lock);
   /* We only set the first error ever from which
    * we can't recover anymore.
    */
   if (comp->last_error == OMX_ErrorNone)
     comp->last_error = err;
-  g_cond_broadcast (&comp->state_cond);
-  gst_omx_rec_mutex_recursive_unlock (&comp->state_lock);
-
-  /* Now notify all ports, no locking needed
-   * here because the ports are allocated in the
-   * very beginning and never change again until
-   * component destruction.
-   */
-  n = (comp->ports ? comp->ports->len : 0);
-  for (i = 0; i < n; i++) {
-    GstOMXPort *tmp = g_ptr_array_index (comp->ports, i);
+  g_mutex_unlock (&comp->lock);
 
-    gst_omx_rec_mutex_recursive_lock (&tmp->port_lock);
-    g_cond_broadcast (&tmp->port_cond);
-    gst_omx_rec_mutex_recursive_unlock (&tmp->port_lock);
-  }
+  g_mutex_lock (&comp->messages_lock);
+  g_cond_broadcast (&comp->messages_cond);
+  g_mutex_unlock (&comp->messages_lock);
 }
 
+/* NOTE: Uses comp->lock and comp->messages_lock */
 OMX_ERRORTYPE
 gst_omx_component_get_last_error (GstOMXComponent * comp)
 {
@@ -798,9 +942,10 @@ gst_omx_component_get_last_error (GstOMXComponent * comp)
 
   g_return_val_if_fail (comp != NULL, OMX_ErrorUndefined);
 
-  gst_omx_rec_mutex_lock (&comp->state_lock);
+  g_mutex_lock (&comp->lock);
+  gst_omx_component_handle_messages (comp);
   err = comp->last_error;
-  gst_omx_rec_mutex_unlock (&comp->state_lock);
+  g_mutex_unlock (&comp->lock);
 
   GST_DEBUG_OBJECT (comp->parent, "Returning last error: %s (0x%08x)",
       gst_omx_error_to_string (err), err);
@@ -816,6 +961,7 @@ gst_omx_component_get_last_error_string (GstOMXComponent * comp)
   return gst_omx_error_to_string (gst_omx_component_get_last_error (comp));
 }
 
+/* comp->lock must be unlocked while calling this */
 OMX_ERRORTYPE
 gst_omx_component_get_parameter (GstOMXComponent * comp, OMX_INDEXTYPE index,
     gpointer param)
@@ -833,6 +979,7 @@ gst_omx_component_get_parameter (GstOMXComponent * comp, OMX_INDEXTYPE index,
   return err;
 }
 
+/* comp->lock must be unlocked while calling this */
 OMX_ERRORTYPE
 gst_omx_component_set_parameter (GstOMXComponent * comp, OMX_INDEXTYPE index,
     gpointer param)
@@ -850,6 +997,7 @@ gst_omx_component_set_parameter (GstOMXComponent * comp, OMX_INDEXTYPE index,
   return err;
 }
 
+/* comp->lock must be unlocked while calling this */
 OMX_ERRORTYPE
 gst_omx_component_get_config (GstOMXComponent * comp, OMX_INDEXTYPE index,
     gpointer config)
@@ -868,6 +1016,7 @@ gst_omx_component_get_config (GstOMXComponent * comp, OMX_INDEXTYPE index,
   return err;
 }
 
+/* comp->lock must be unlocked while calling this */
 OMX_ERRORTYPE
 gst_omx_component_set_config (GstOMXComponent * comp, OMX_INDEXTYPE index,
     gpointer config)
@@ -914,7 +1063,6 @@ gst_omx_port_update_port_definition (GstOMXPort * port,
 
   comp = port->comp;
 
-  gst_omx_rec_mutex_lock (&port->port_lock);
   if (port_def)
     err =
         gst_omx_component_set_parameter (comp, OMX_IndexParamPortDefinition,
@@ -925,11 +1073,10 @@ gst_omx_port_update_port_definition (GstOMXPort * port,
   GST_DEBUG_OBJECT (comp->parent, "Updated port %u definition: %s (0x%08x)",
       port->index, gst_omx_error_to_string (err), err);
 
-  gst_omx_rec_mutex_unlock (&port->port_lock);
-
   return (err == OMX_ErrorNone);
 }
 
+/* NOTE: Uses comp->lock and comp->messages_lock */
 GstOMXAcquireBufferReturn
 gst_omx_port_acquire_buffer (GstOMXPort * port, GstOMXBuffer ** buf)
 {
@@ -945,14 +1092,14 @@ gst_omx_port_acquire_buffer (GstOMXPort * port, GstOMXBuffer ** buf)
 
   comp = port->comp;
 
+  g_mutex_lock (&comp->lock);
   GST_DEBUG_OBJECT (comp->parent, "Acquiring buffer from port %u", port->index);
 
-  gst_omx_rec_mutex_lock (&port->port_lock);
-
 retry:
+  gst_omx_component_handle_messages (comp);
 
   /* Check if the component is in an error state */
-  if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
+  if ((err = comp->last_error) != OMX_ErrorNone) {
     GST_ERROR_OBJECT (comp->parent, "Component is in error state: %s",
         gst_omx_error_to_string (err));
     ret = GST_OMX_ACQUIRE_BUFFER_ERROR;
@@ -972,16 +1119,19 @@ retry:
    */
   if (port->port_def.eDir == OMX_DirInput) {
     if (g_atomic_int_get (&comp->have_pending_reconfigure_outports)) {
-      gst_omx_rec_mutex_unlock (&port->port_lock);
-      gst_omx_rec_mutex_lock (&comp->state_lock);
+      gst_omx_component_handle_messages (comp);
       while (g_atomic_int_get (&comp->have_pending_reconfigure_outports) &&
           (err = comp->last_error) == OMX_ErrorNone && !port->flushing) {
         GST_DEBUG_OBJECT (comp->parent,
             "Waiting for output ports to reconfigure");
-        g_cond_wait (&comp->state_cond, &comp->state_lock.lock);
+        g_mutex_lock (&comp->messages_lock);
+        g_mutex_unlock (&comp->lock);
+        if (g_queue_is_empty (&comp->messages))
+          g_cond_wait (&comp->messages_cond, &comp->messages_lock);
+        g_mutex_unlock (&comp->messages_lock);
+        g_mutex_lock (&comp->lock);
+        gst_omx_component_handle_messages (comp);
       }
-      gst_omx_rec_mutex_unlock (&comp->state_lock);
-      gst_omx_rec_mutex_lock (&port->port_lock);
       goto retry;
     }
 
@@ -1001,11 +1151,11 @@ retry:
    * we have to drop them... */
   if (port->port_def.eDir == OMX_DirOutput &&
       port->settings_cookie != port->configured_settings_cookie) {
-    if (!g_queue_is_empty (port->pending_buffers)) {
+    if (!g_queue_is_empty (&port->pending_buffers)) {
       GST_DEBUG_OBJECT (comp->parent,
           "Output port %u needs reconfiguration but has buffers pending",
           port->index);
-      _buf = g_queue_pop_head (port->pending_buffers);
+      _buf = g_queue_pop_head (&port->pending_buffers);
 
       ret = GST_OMX_ACQUIRE_BUFFER_OK;
       goto done;
@@ -1034,21 +1184,31 @@ retry:
    * arrives, an error happens, the port is flushing
    * or the port needs to be reconfigured.
    */
-  if (g_queue_is_empty (port->pending_buffers)) {
+  gst_omx_component_handle_messages (comp);
+  if (g_queue_is_empty (&port->pending_buffers)) {
     GST_DEBUG_OBJECT (comp->parent, "Queue of port %u is empty", port->index);
-    g_cond_wait (&port->port_cond, &port->port_lock.lock);
+    g_mutex_lock (&comp->messages_lock);
+    g_mutex_unlock (&comp->lock);
+    if (g_queue_is_empty (&comp->messages))
+      g_cond_wait (&comp->messages_cond, &comp->messages_lock);
+    g_mutex_unlock (&comp->messages_lock);
+    g_mutex_lock (&comp->lock);
+    gst_omx_component_handle_messages (comp);
+
+    /* And now check everything again and maybe get a buffer */
+    goto retry;
   } else {
     GST_DEBUG_OBJECT (comp->parent, "Port %u has pending buffers", port->index);
-    _buf = g_queue_pop_head (port->pending_buffers);
+    _buf = g_queue_pop_head (&port->pending_buffers);
     ret = GST_OMX_ACQUIRE_BUFFER_OK;
     goto done;
   }
 
-  /* And now check everything again and maybe get a buffer */
+  g_assert_not_reached ();
   goto retry;
 
 done:
-  gst_omx_rec_mutex_unlock (&port->port_lock);
+  g_mutex_unlock (&comp->lock);
 
   if (_buf) {
     g_assert (_buf == _buf->omx_buf->pAppPrivate);
@@ -1061,6 +1221,7 @@ done:
   return ret;
 }
 
+/* NOTE: Uses comp->lock and comp->messages_lock */
 OMX_ERRORTYPE
 gst_omx_port_release_buffer (GstOMXPort * port, GstOMXBuffer * buf)
 {
@@ -1073,10 +1234,12 @@ gst_omx_port_release_buffer (GstOMXPort * port, GstOMXBuffer * buf)
 
   comp = port->comp;
 
+  g_mutex_lock (&comp->lock);
+
   GST_DEBUG_OBJECT (comp->parent, "Releasing buffer %p (%p) to port %u",
       buf, buf->omx_buf->pBuffer, port->index);
 
-  gst_omx_rec_mutex_lock_for_recursion (&port->port_lock);
+  gst_omx_component_handle_messages (comp);
 
   if (port->port_def.eDir == OMX_DirInput) {
     /* Reset all flags, some implementations don't
@@ -1086,19 +1249,23 @@ gst_omx_port_release_buffer (GstOMXPort * port, GstOMXBuffer * buf)
     buf->omx_buf->nFlags = 0;
   }
 
-  if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
+  if ((err = comp->last_error) != OMX_ErrorNone) {
     GST_ERROR_OBJECT (comp->parent, "Component is in error state: %s (0x%08x)",
         gst_omx_error_to_string (err), err);
-    g_queue_push_tail (port->pending_buffers, buf);
-    g_cond_broadcast (&port->port_cond);
+    g_queue_push_tail (&port->pending_buffers, buf);
+    g_mutex_lock (&comp->messages_lock);
+    g_cond_broadcast (&comp->messages_cond);
+    g_mutex_lock (&comp->messages_lock);
     goto done;
   }
 
   if (port->flushing) {
     GST_DEBUG_OBJECT (comp->parent, "Port %u is flushing, not releasing buffer",
         port->index);
-    g_queue_push_tail (port->pending_buffers, buf);
-    g_cond_broadcast (&port->port_cond);
+    g_queue_push_tail (&port->pending_buffers, buf);
+    g_mutex_lock (&comp->messages_lock);
+    g_cond_broadcast (&comp->messages_cond);
+    g_mutex_unlock (&comp->messages_lock);
     goto done;
   }
 
@@ -1108,23 +1275,25 @@ gst_omx_port_release_buffer (GstOMXPort * port, GstOMXBuffer * buf)
 
   buf->used = TRUE;
 
-  gst_omx_rec_mutex_begin_recursion (&port->port_lock);
   if (port->port_def.eDir == OMX_DirInput) {
     err = OMX_EmptyThisBuffer (comp->handle, buf->omx_buf);
   } else {
     err = OMX_FillThisBuffer (comp->handle, buf->omx_buf);
   }
-  gst_omx_rec_mutex_end_recursion (&port->port_lock);
-
   GST_DEBUG_OBJECT (comp->parent, "Released buffer %p to port %u: %s (0x%08x)",
       buf, port->index, gst_omx_error_to_string (err), err);
 
 done:
-  gst_omx_rec_mutex_unlock_for_recursion (&port->port_lock);
+  gst_omx_component_handle_messages (comp);
+  g_mutex_unlock (&comp->lock);
+
+  if (err != OMX_ErrorNone)
+    gst_omx_component_set_last_error (comp, err);
 
   return err;
 }
 
+/* NOTE: Uses comp->lock and comp->messages_lock */
 OMX_ERRORTYPE
 gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
 {
@@ -1134,33 +1303,34 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
   g_return_val_if_fail (port != NULL, OMX_ErrorUndefined);
 
   comp = port->comp;
+
+  g_mutex_lock (&comp->lock);
+
   GST_DEBUG_OBJECT (comp->parent, "Setting port %d to %sflushing",
       port->index, (flush ? "" : "not "));
 
-  gst_omx_rec_mutex_lock_for_recursion (&port->port_lock);
+  gst_omx_component_handle_messages (comp);
+
   if (! !flush == ! !port->flushing) {
     GST_DEBUG_OBJECT (comp->parent, "Port %u was %sflushing already",
         port->index, (flush ? "" : "not "));
     goto done;
   }
 
-  if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
+  if ((err = comp->last_error) != OMX_ErrorNone) {
     GST_ERROR_OBJECT (comp->parent, "Component is in error state: %s (0x%08x)",
         gst_omx_error_to_string (err), err);
     goto done;
   }
 
-  gst_omx_rec_mutex_lock (&comp->state_lock);
   if (comp->state != OMX_StateIdle && comp->state != OMX_StateExecuting) {
 
     GST_DEBUG_OBJECT (comp->parent, "Component is in wrong state: %d",
         comp->state);
     err = OMX_ErrorUndefined;
 
-    gst_omx_rec_mutex_unlock (&comp->state_lock);
     goto done;
   }
-  gst_omx_rec_mutex_unlock (&comp->state_lock);
 
   port->flushing = flush;
   if (flush) {
@@ -1168,25 +1338,14 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
     gboolean signalled;
     OMX_ERRORTYPE last_error;
 
-    g_cond_broadcast (&port->port_cond);
-
-    /* We also need to signal the state cond because
-     * an input port might wait on this for the output
-     * ports to reconfigure. This will not confuse
-     * other waiters on the state cond because they will
-     * additionally check if the condition they're waiting
-     * for is true after waking up.
-     */
-    gst_omx_rec_mutex_lock (&comp->state_lock);
-    g_cond_broadcast (&comp->state_cond);
-    gst_omx_rec_mutex_unlock (&comp->state_lock);
+    g_mutex_lock (&comp->messages_lock);
+    g_cond_broadcast (&comp->messages_cond);
+    g_mutex_unlock (&comp->messages_lock);
 
     /* Now flush the port */
     port->flushed = FALSE;
 
-    gst_omx_rec_mutex_begin_recursion (&port->port_lock);
     err = OMX_SendCommand (comp->handle, OMX_CommandFlush, port->index, NULL);
-    gst_omx_rec_mutex_end_recursion (&port->port_lock);
 
     if (err != OMX_ErrorNone) {
       GST_ERROR_OBJECT (comp->parent,
@@ -1195,7 +1354,7 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
       goto done;
     }
 
-    if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
+    if ((err = comp->last_error) != OMX_ErrorNone) {
       GST_ERROR_OBJECT (comp->parent,
           "Component is in error state: %s (0x%08x)",
           gst_omx_error_to_string (err), err);
@@ -1215,13 +1374,24 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
      * the flush command completed */
     signalled = TRUE;
     last_error = OMX_ErrorNone;
+    gst_omx_component_handle_messages (comp);
     while (signalled && last_error == OMX_ErrorNone && !port->flushed
-        && port->buffers->len > g_queue_get_length (port->pending_buffers)) {
-      signalled =
-          g_cond_wait_until (&comp->state_cond, &comp->state_lock.lock,
-          wait_until);
-
-      last_error = gst_omx_component_get_last_error (comp);
+        && port->buffers->len > g_queue_get_length (&port->pending_buffers)) {
+      g_mutex_lock (&comp->messages_lock);
+      g_mutex_unlock (&comp->lock);
+      if (!g_queue_is_empty (&comp->messages))
+        signalled = TRUE;
+      else
+        signalled =
+            g_cond_wait_until (&comp->messages_cond, &comp->messages_lock,
+            wait_until);
+      g_mutex_unlock (&comp->messages_lock);
+      g_mutex_lock (&comp->lock);
+
+      if (signalled)
+        gst_omx_component_handle_messages (comp);
+
+      last_error = comp->last_error;
     }
     port->flushed = FALSE;
 
@@ -1243,7 +1413,7 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
       GstOMXBuffer *buf;
 
       /* Enqueue all buffers for the component to fill */
-      while ((buf = g_queue_pop_head (port->pending_buffers))) {
+      while ((buf = g_queue_pop_head (&port->pending_buffers))) {
         if (!buf)
           continue;
 
@@ -1255,9 +1425,7 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
          */
         buf->omx_buf->nFlags = 0;
 
-        gst_omx_rec_mutex_begin_recursion (&port->port_lock);
         err = OMX_FillThisBuffer (comp->handle, buf->omx_buf);
-        gst_omx_rec_mutex_end_recursion (&port->port_lock);
 
         if (err != OMX_ErrorNone) {
           GST_ERROR_OBJECT (comp->parent,
@@ -1275,23 +1443,21 @@ gst_omx_port_set_flushing (GstOMXPort * port, gboolean flush)
 done:
   GST_DEBUG_OBJECT (comp->parent, "Set port %u to %sflushing: %s (0x%08x)",
       port->index, (flush ? "" : "not "), gst_omx_error_to_string (err), err);
-  gst_omx_rec_mutex_unlock_for_recursion (&port->port_lock);
+  gst_omx_component_handle_messages (comp);
+  g_mutex_unlock (&comp->lock);
 
   return err;
 
 error:
   {
-    /* Need to unlock the port lock here because
-     * set_last_error() needs all port locks.
-     * This is safe here because we're just going
-     * to error out anyway */
-    gst_omx_rec_mutex_unlock (&port->port_lock);
+    g_mutex_unlock (&comp->lock);
     gst_omx_component_set_last_error (comp, err);
-    gst_omx_rec_mutex_lock (&port->port_lock);
+    g_mutex_lock (&comp->lock);
     goto done;
   }
 }
 
+/* NOTE: Uses comp->lock and comp->messages_lock */
 gboolean
 gst_omx_port_is_flushing (GstOMXPort * port)
 {
@@ -1302,9 +1468,10 @@ gst_omx_port_is_flushing (GstOMXPort * port)
 
   comp = port->comp;
 
-  gst_omx_rec_mutex_lock (&port->port_lock);
+  g_mutex_lock (&comp->lock);
+  gst_omx_component_handle_messages (port->comp);
   flushing = port->flushing;
-  gst_omx_rec_mutex_unlock (&port->port_lock);
+  g_mutex_unlock (&comp->lock);
 
   GST_DEBUG_OBJECT (comp->parent, "Port %u is flushing: %d", port->index,
       flushing);
@@ -1312,7 +1479,7 @@ gst_omx_port_is_flushing (GstOMXPort * port)
   return flushing;
 }
 
-/* Must be called while holding port->lock */
+/* NOTE: Must be called while holding comp->lock, uses comp->messages_lock */
 static OMX_ERRORTYPE
 gst_omx_port_allocate_buffers_unlocked (GstOMXPort * port)
 {
@@ -1324,7 +1491,8 @@ gst_omx_port_allocate_buffers_unlocked (GstOMXPort * port)
 
   comp = port->comp;
 
-  if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
+  gst_omx_component_handle_messages (port->comp);
+  if ((err = comp->last_error) != OMX_ErrorNone) {
     GST_ERROR_OBJECT (comp->parent, "Component in error state: %s (0x%08x)",
         gst_omx_error_to_string (err), err);
     goto done;
@@ -1373,11 +1541,9 @@ gst_omx_port_allocate_buffers_unlocked (GstOMXPort * port)
     buf->settings_cookie = port->settings_cookie;
     g_ptr_array_add (port->buffers, buf);
 
-    gst_omx_rec_mutex_begin_recursion (&port->port_lock);
     err =
         OMX_AllocateBuffer (comp->handle, &buf->omx_buf, port->index, buf,
         port->port_def.nBufferSize);
-    gst_omx_rec_mutex_end_recursion (&port->port_lock);
 
     if (err != OMX_ErrorNone) {
       GST_ERROR_OBJECT (comp->parent,
@@ -1392,9 +1558,11 @@ gst_omx_port_allocate_buffers_unlocked (GstOMXPort * port)
     g_assert (buf->omx_buf->pAppPrivate == buf);
 
     /* In the beginning all buffers are not owned by the component */
-    g_queue_push_tail (port->pending_buffers, buf);
+    g_queue_push_tail (&port->pending_buffers, buf);
   }
 
+  gst_omx_component_handle_messages (comp);
+
 done:
   GST_DEBUG_OBJECT (comp->parent, "Allocated buffers for port %u: %s (0x%08x)",
       port->index, gst_omx_error_to_string (err), err);
@@ -1403,17 +1571,14 @@ done:
 
 error:
   {
-    /* Need to unlock the port lock here because
-     * set_last_error() needs all port locks.
-     * This is safe here because we're just going
-     * to error out anyway */
-    gst_omx_rec_mutex_unlock (&port->port_lock);
+    g_mutex_unlock (&comp->lock);
     gst_omx_component_set_last_error (comp, err);
-    gst_omx_rec_mutex_lock (&port->port_lock);
+    g_mutex_lock (&comp->lock);
     goto done;
   }
 }
 
+/* NOTE: Uses comp->lock and comp->messages_lock */
 OMX_ERRORTYPE
 gst_omx_port_allocate_buffers (GstOMXPort * port)
 {
@@ -1421,14 +1586,14 @@ gst_omx_port_allocate_buffers (GstOMXPort * port)
 
   g_return_val_if_fail (port != NULL, OMX_ErrorUndefined);
 
-  gst_omx_rec_mutex_lock_for_recursion (&port->port_lock);
+  g_mutex_lock (&port->comp->lock);
   err = gst_omx_port_allocate_buffers_unlocked (port);
-  gst_omx_rec_mutex_unlock_for_recursion (&port->port_lock);
+  g_mutex_unlock (&port->comp->lock);
 
   return err;
 }
 
-/* Must be called while holding port->lock */
+/* NOTE: Must be called while holding comp->lock, uses comp->messages_lock */
 static OMX_ERRORTYPE
 gst_omx_port_deallocate_buffers_unlocked (GstOMXPort * port)
 {
@@ -1441,13 +1606,15 @@ gst_omx_port_deallocate_buffers_unlocked (GstOMXPort * port)
   GST_DEBUG_OBJECT (comp->parent, "Deallocating buffers of port %u",
       port->index);
 
+  gst_omx_component_handle_messages (port->comp);
+
   if (!port->buffers) {
     GST_DEBUG_OBJECT (comp->parent, "No buffers allocated for port %u",
         port->index);
     goto done;
   }
 
-  if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
+  if ((err = comp->last_error) != OMX_ErrorNone) {
     GST_ERROR_OBJECT (comp->parent, "Component in error state: %s (0x%08x)",
         gst_omx_error_to_string (err), err);
     /* We still try to deallocate all buffers */
@@ -1478,9 +1645,7 @@ gst_omx_port_deallocate_buffers_unlocked (GstOMXPort * port)
       GST_DEBUG_OBJECT (comp->parent, "Deallocating buffer %p (%p)", buf,
           buf->omx_buf->pBuffer);
 
-      gst_omx_rec_mutex_begin_recursion (&port->port_lock);
       tmp = OMX_FreeBuffer (comp->handle, port->index, buf->omx_buf);
-      gst_omx_rec_mutex_end_recursion (&port->port_lock);
 
       if (tmp != OMX_ErrorNone) {
         GST_ERROR_OBJECT (comp->parent,
@@ -1492,15 +1657,12 @@ gst_omx_port_deallocate_buffers_unlocked (GstOMXPort * port)
     }
     g_slice_free (GstOMXBuffer, buf);
   }
-
-  g_queue_clear (port->pending_buffers);
-#if GLIB_CHECK_VERSION(2,22,0)
+  g_queue_clear (&port->pending_buffers);
   g_ptr_array_unref (port->buffers);
-#else
-  g_ptr_array_free (port->buffers, TRUE);
-#endif
   port->buffers = NULL;
 
+  gst_omx_component_handle_messages (comp);
+
 done:
   GST_DEBUG_OBJECT (comp->parent, "Deallocated buffers of port %u: %s (0x%08x)",
       port->index, gst_omx_error_to_string (err), err);
@@ -1508,6 +1670,7 @@ done:
   return err;
 }
 
+/* NOTE: Uses comp->lock and comp->messages_lock */
 OMX_ERRORTYPE
 gst_omx_port_deallocate_buffers (GstOMXPort * port)
 {
@@ -1515,14 +1678,14 @@ gst_omx_port_deallocate_buffers (GstOMXPort * port)
 
   g_return_val_if_fail (port != NULL, OMX_ErrorUndefined);
 
-  gst_omx_rec_mutex_lock_for_recursion (&port->port_lock);
+  g_mutex_lock (&port->comp->lock);
   err = gst_omx_port_deallocate_buffers_unlocked (port);
-  gst_omx_rec_mutex_unlock_for_recursion (&port->port_lock);
+  g_mutex_unlock (&port->comp->lock);
 
   return err;
 }
 
-/* Must be called while holding port->lock */
+/* NOTE: Must be called while holding comp->lock, uses comp->messages_lock */
 static OMX_ERRORTYPE
 gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled)
 {
@@ -1534,7 +1697,9 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled)
 
   comp = port->comp;
 
-  if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
+  gst_omx_component_handle_messages (comp);
+
+  if ((err = comp->last_error) != OMX_ErrorNone) {
     GST_ERROR_OBJECT (comp->parent, "Component in error state: %s (0x%08x)",
         gst_omx_error_to_string (err), err);
     goto done;
@@ -1558,7 +1723,6 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled)
     port->flushing = TRUE;
   }
 
-  gst_omx_rec_mutex_begin_recursion (&port->port_lock);
   if (enabled)
     err =
         OMX_SendCommand (comp->handle, OMX_CommandPortEnable, port->index,
@@ -1567,7 +1731,6 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled)
     err =
         OMX_SendCommand (comp->handle, OMX_CommandPortDisable,
         port->index, NULL);
-  gst_omx_rec_mutex_end_recursion (&port->port_lock);
 
   if (err != OMX_ErrorNone) {
     GST_ERROR_OBJECT (comp->parent,
@@ -1576,7 +1739,7 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled)
     goto error;
   }
 
-  if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone) {
+  if ((err = comp->last_error) != OMX_ErrorNone) {
     GST_ERROR_OBJECT (comp->parent, "Component in error state: %s (0x%08x)",
         gst_omx_error_to_string (err), err);
     goto done;
@@ -1593,11 +1756,23 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled)
   /* First wait until all buffers are released by the port */
   signalled = TRUE;
   last_error = OMX_ErrorNone;
+  gst_omx_component_handle_messages (comp);
   while (signalled && last_error == OMX_ErrorNone && (port->buffers
-          && port->buffers->len > g_queue_get_length (port->pending_buffers))) {
-    signalled =
-        g_cond_wait_until (&port->port_cond, &port->port_lock.lock, wait_until);
-    last_error = gst_omx_component_get_last_error (comp);
+          && port->buffers->len >
+          g_queue_get_length (&port->pending_buffers))) {
+    g_mutex_lock (&comp->messages_lock);
+    g_mutex_unlock (&comp->lock);
+    if (!g_queue_is_empty (&comp->messages))
+      signalled = TRUE;
+    else
+      signalled =
+          g_cond_wait_until (&comp->messages_cond, &comp->messages_lock,
+          wait_until);
+    g_mutex_unlock (&comp->messages_lock);
+    g_mutex_lock (&comp->lock);
+    if (signalled)
+      gst_omx_component_handle_messages (comp);
+    last_error = comp->last_error;
   }
 
   if (last_error != OMX_ErrorNone) {
@@ -1633,14 +1808,26 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled)
   last_error = OMX_ErrorNone;
   gst_omx_component_get_parameter (comp, OMX_IndexParamPortDefinition,
       &port->port_def);
+  gst_omx_component_handle_messages (comp);
   while (signalled && last_error == OMX_ErrorNone
       && (! !port->port_def.bEnabled != ! !enabled || !port->enabled_changed)) {
-    signalled =
-        g_cond_wait_until (&port->port_cond, &port->port_lock.lock, wait_until);
-    last_error = gst_omx_component_get_last_error (comp);
+    g_mutex_lock (&comp->messages_lock);
+    g_mutex_unlock (&comp->lock);
+    if (!g_queue_is_empty (&comp->messages))
+      signalled = TRUE;
+    else
+      signalled =
+          g_cond_wait_until (&comp->messages_cond, &comp->messages_lock,
+          wait_until);
+    g_mutex_unlock (&comp->messages_lock);
+    g_mutex_lock (&comp->lock);
+    if (signalled)
+      gst_omx_component_handle_messages (comp);
+    last_error = comp->last_error;
     gst_omx_component_get_parameter (comp, OMX_IndexParamPortDefinition,
         &port->port_def);
   }
+  port->enabled_changed = FALSE;
 
   if (!signalled) {
     GST_ERROR_OBJECT (comp->parent,
@@ -1665,7 +1852,7 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled)
       GstOMXBuffer *buf;
 
       /* Enqueue all buffers for the component to fill */
-      while ((buf = g_queue_pop_head (port->pending_buffers))) {
+      while ((buf = g_queue_pop_head (&port->pending_buffers))) {
         if (!buf)
           continue;
 
@@ -1677,9 +1864,7 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled)
          */
         buf->omx_buf->nFlags = 0;
 
-        gst_omx_rec_mutex_begin_recursion (&port->port_lock);
         err = OMX_FillThisBuffer (comp->handle, buf->omx_buf);
-        gst_omx_rec_mutex_end_recursion (&port->port_lock);
 
         if (err != OMX_ErrorNone) {
           GST_ERROR_OBJECT (comp->parent,
@@ -1694,6 +1879,8 @@ gst_omx_port_set_enabled_unlocked (GstOMXPort * port, gboolean enabled)
     }
   }
 
+  gst_omx_component_handle_messages (comp);
+
 done:
   GST_DEBUG_OBJECT (comp->parent, "Port %u is %s%s: %s (0x%08x)", port->index,
       (err == OMX_ErrorNone ? "" : "not "),
@@ -1703,17 +1890,14 @@ done:
 
 error:
   {
-    /* Need to unlock the port lock here because
-     * set_last_error() needs all port locks.
-     * This is safe here because we're just going
-     * to error out anyway */
-    gst_omx_rec_mutex_unlock (&port->port_lock);
+    g_mutex_unlock (&comp->lock);
     gst_omx_component_set_last_error (comp, err);
-    gst_omx_rec_mutex_lock (&port->port_lock);
+    g_mutex_lock (&comp->lock);
     goto done;
   }
 }
 
+/* NOTE: Uses comp->lock and comp->messages_lock */
 OMX_ERRORTYPE
 gst_omx_port_set_enabled (GstOMXPort * port, gboolean enabled)
 {
@@ -1721,9 +1905,9 @@ gst_omx_port_set_enabled (GstOMXPort * port, gboolean enabled)
 
   g_return_val_if_fail (port != NULL, OMX_ErrorUndefined);
 
-  gst_omx_rec_mutex_lock_for_recursion (&port->port_lock);
+  g_mutex_lock (&port->comp->lock);
   err = gst_omx_port_set_enabled_unlocked (port, enabled);
-  gst_omx_rec_mutex_unlock_for_recursion (&port->port_lock);
+  g_mutex_unlock (&port->comp->lock);
 
   return err;
 }
@@ -1738,11 +1922,9 @@ gst_omx_port_is_enabled (GstOMXPort * port)
 
   comp = port->comp;
 
-  gst_omx_rec_mutex_lock (&port->port_lock);
   gst_omx_component_get_parameter (comp, OMX_IndexParamPortDefinition,
       &port->port_def);
-  enabled = port->port_def.bEnabled;
-  gst_omx_rec_mutex_unlock (&port->port_lock);
+  enabled = ! !port->port_def.bEnabled;
 
   GST_DEBUG_OBJECT (comp->parent, "Port %u is enabled: %d", port->index,
       enabled);
@@ -1750,6 +1932,7 @@ gst_omx_port_is_enabled (GstOMXPort * port)
   return enabled;
 }
 
+/* NOTE: Uses comp->lock and comp->messages_lock */
 OMX_ERRORTYPE
 gst_omx_port_reconfigure (GstOMXPort * port)
 {
@@ -1760,14 +1943,15 @@ gst_omx_port_reconfigure (GstOMXPort * port)
 
   comp = port->comp;
 
+  g_mutex_lock (&comp->lock);
   GST_DEBUG_OBJECT (comp->parent, "Reconfiguring port %u", port->index);
 
-  gst_omx_rec_mutex_lock_for_recursion (&port->port_lock);
+  gst_omx_component_handle_messages (comp);
 
   if (!port->settings_changed)
     goto done;
 
-  if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone)
+  if ((err = comp->last_error) != OMX_ErrorNone)
     goto done;
 
   /* Disable and enable the port. This already takes
@@ -1790,7 +1974,6 @@ gst_omx_port_reconfigure (GstOMXPort * port)
   if (port->port_def.eDir == OMX_DirOutput) {
     GList *l;
 
-    gst_omx_rec_mutex_lock (&comp->state_lock);
     for (l = comp->pending_reconfigure_outports; l; l = l->next) {
       if (l->data == (gpointer) port) {
         comp->pending_reconfigure_outports =
@@ -1800,20 +1983,22 @@ gst_omx_port_reconfigure (GstOMXPort * port)
     }
     if (!comp->pending_reconfigure_outports) {
       g_atomic_int_set (&comp->have_pending_reconfigure_outports, 0);
-      g_cond_broadcast (&comp->state_cond);
+      g_mutex_lock (&comp->messages_lock);
+      g_cond_broadcast (&comp->messages_cond);
+      g_mutex_unlock (&comp->messages_lock);
     }
-    gst_omx_rec_mutex_unlock (&comp->state_lock);
   }
 
 done:
   GST_DEBUG_OBJECT (comp->parent, "Reconfigured port %u: %s (0x%08x)",
       port->index, gst_omx_error_to_string (err), err);
 
-  gst_omx_rec_mutex_unlock_for_recursion (&port->port_lock);
+  g_mutex_unlock (&comp->lock);
 
   return err;
 }
 
+/* NOTE: Uses comp->lock and comp->messages_lock */
 OMX_ERRORTYPE
 gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start)
 {
@@ -1824,12 +2009,14 @@ gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start)
 
   comp = port->comp;
 
+  g_mutex_lock (&comp->lock);
+
   GST_DEBUG_OBJECT (comp->parent, "Manual reconfigure of port %u %s",
       port->index, (start ? "start" : "stsop"));
 
-  gst_omx_rec_mutex_lock (&port->port_lock);
+  gst_omx_component_handle_messages (comp);
 
-  if ((err = gst_omx_component_get_last_error (comp)) != OMX_ErrorNone)
+  if ((err = comp->last_error) != OMX_ErrorNone)
     goto done;
 
   if (start)
@@ -1841,7 +2028,6 @@ gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start)
     GList *l;
 
     if (start) {
-      gst_omx_rec_mutex_lock (&comp->state_lock);
       for (l = comp->pending_reconfigure_outports; l; l = l->next) {
         if (l->data == (gpointer) port)
           break;
@@ -1852,9 +2038,7 @@ gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start)
             g_list_prepend (comp->pending_reconfigure_outports, port);
         g_atomic_int_set (&comp->have_pending_reconfigure_outports, 1);
       }
-      gst_omx_rec_mutex_unlock (&comp->state_lock);
     } else {
-      gst_omx_rec_mutex_lock (&comp->state_lock);
       for (l = comp->pending_reconfigure_outports; l; l = l->next) {
         if (l->data == (gpointer) port) {
           comp->pending_reconfigure_outports =
@@ -1864,19 +2048,20 @@ gst_omx_port_manual_reconfigure (GstOMXPort * port, gboolean start)
       }
       if (!comp->pending_reconfigure_outports) {
         g_atomic_int_set (&comp->have_pending_reconfigure_outports, 0);
-        g_cond_broadcast (&comp->state_cond);
+        g_mutex_lock (&comp->messages_lock);
+        g_cond_broadcast (&comp->messages_cond);
+        g_mutex_unlock (&comp->messages_lock);
       }
-      gst_omx_rec_mutex_unlock (&comp->state_lock);
     }
   }
 
 
 done:
-  gst_omx_rec_mutex_unlock (&port->port_lock);
-
   GST_DEBUG_OBJECT (comp->parent, "Manual reconfigure of port %u: %s (0x%08x)",
       port->index, gst_omx_error_to_string (err), err);
 
+  g_mutex_unlock (&comp->lock);
+
   return err;
 }
 
index 3beb9e4..2f8347b 100644 (file)
@@ -1,6 +1,8 @@
 /*
  * Copyright (C) 2011, Hewlett-Packard Development Company, L.P.
  *   Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>, Collabora Ltd.
+ * Copyright (C) 2013, Collabora Ltd.
+ *   Author: Sebastian Dröge <sebastian.droege@collabora.co.uk>
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -52,8 +54,6 @@
 #pragma pack()
 #endif
 
-#include "gstomxrecmutex.h"
-
 G_BEGIN_DECLS
 
 #define GST_OMX_INIT_STRUCT(st) G_STMT_START { \
@@ -112,6 +112,7 @@ typedef enum _GstOMXPortDirection GstOMXPortDirection;
 typedef struct _GstOMXComponent GstOMXComponent;
 typedef struct _GstOMXBuffer GstOMXBuffer;
 typedef struct _GstOMXClassData GstOMXClassData;
+typedef struct _GstOMXMessage GstOMXMessage;
 
 typedef enum {
   /* Everything good and the buffer is valid */
@@ -137,8 +138,6 @@ struct _GstOMXCore {
   gint user_count; /* LOCK */
 
   /* OpenMAX core library functions, protected with LOCK */
-  /* FIXME: OpenMAX spec does not specify that this is required
-   * but gst-openmax does it */
   OMX_ERRORTYPE (*init) (void);
   OMX_ERRORTYPE (*deinit) (void);
   OMX_ERRORTYPE (*get_handle) (OMX_HANDLETYPE * handle,
@@ -146,32 +145,51 @@ struct _GstOMXCore {
   OMX_ERRORTYPE (*free_handle) (OMX_HANDLETYPE handle);
 };
 
+typedef enum {
+  GST_OMX_MESSAGE_STATE_SET,
+  GST_OMX_MESSAGE_FLUSH,
+  GST_OMX_MESSAGE_ERROR,
+  GST_OMX_MESSAGE_PORT_ENABLE,
+  GST_OMX_MESSAGE_PORT_SETTINGS_CHANGED,
+  GST_OMX_MESSAGE_BUFFER_DONE,
+} GstOMXMessageType;
+
+struct _GstOMXMessage {
+  GstOMXMessageType type;
+
+  union {
+    struct {
+      OMX_STATETYPE state;
+    } state_set;
+    struct {
+      OMX_U32 port;
+    } flush;
+    struct {
+      OMX_ERRORTYPE error;
+    } error;
+    struct {
+      OMX_U32 port;
+      OMX_BOOL enable;
+    } port_enable;
+    struct {
+      OMX_U32 port;
+    } port_settings_changed;
+    struct {
+      OMX_HANDLETYPE component;
+      OMX_PTR app_data;
+      OMX_BUFFERHEADERTYPE *buffer;
+      OMX_BOOL empty;
+    } buffer_done;
+  } content;
+};
+
 struct _GstOMXPort {
   GstOMXComponent *comp;
   guint32 index;
 
-  /* Protects port_def, buffers, pending_buffers,
-   * settings_changed, flushing, flushed, enabled_changed
-   * and settings_cookie.
-   *
-   * Signalled if pending_buffers gets a
-   * new buffer or flushing/flushed is set
-   * to TRUE or the port is enabled/disabled
-   * or the settings change or an error happens.
-   *
-   * Note: Always check comp->last_error before
-   * waiting and after being signalled!
-   *
-   * Note: flushed==TRUE implies flushing==TRUE!
-   *
-   * Note: This lock must always be taken before
-   * the component's state lock if both are needed!
-   */
-  GstOMXRecMutex port_lock;
-  GCond port_cond;
   OMX_PARAM_PORTDEFINITIONTYPE port_def;
   GPtrArray *buffers; /* Contains GstOMXBuffer* */
-  GQueue *pending_buffers; /* Contains GstOMXBuffer* */
+  GQueue pending_buffers; /* Contains GstOMXBuffer* */
   /* If TRUE we need to get the new caps of this port */
   gboolean settings_changed;
   gboolean flushing;
@@ -193,15 +211,20 @@ struct _GstOMXComponent {
 
   guint64 hacks; /* Flags, GST_OMX_HACK_* */
 
+  /* Added once, never changed. No locks necessary */
   GPtrArray *ports; /* Contains GstOMXPort* */
   gint n_in_ports, n_out_ports;
 
-  /* Protecting state, pending_state, last_error,
-   * pending_reconfigure_outports.
-   * Signalled if one of them changes
-   */
-  GstOMXRecMutex state_lock;
-  GCond state_cond;
+  /* Locking order: lock -> messages_lock
+   *
+   * Never hold lock while waiting for messages_cond
+   * Always check that messages is empty before waiting */
+  GMutex lock;
+
+  GQueue messages; /* Queue of GstOMXMessages */
+  GMutex messages_lock;
+  GCond messages_cond;
+
   OMX_STATETYPE state;
   /* OMX_StateInvalid if no pending state */
   OMX_STATETYPE pending_state;
index 0590a97..6b4a3d5 100644 (file)
@@ -901,10 +901,12 @@ gst_omx_audio_enc_sink_event (GstAudioEncoder * encoder, GstEvent * event)
       GST_WARNING_OBJECT (self, "Component does not support empty EOS buffers");
 
       /* Insert a NULL into the queue to signal EOS */
-      gst_omx_rec_mutex_lock (&self->out_port->port_lock);
-      g_queue_push_tail (self->out_port->pending_buffers, NULL);
-      g_cond_broadcast (&self->out_port->port_cond);
-      gst_omx_rec_mutex_unlock (&self->out_port->port_lock);
+      g_mutex_lock (&self->component->lock);
+      g_queue_push_tail (&self->out_port->pending_buffers, NULL);
+      g_mutex_unlock (&self->component->lock);
+      g_mutex_lock (&self->component->messages_lock);
+      g_cond_broadcast (&self->component->messages_cond);
+      g_mutex_unlock (&self->component->messages_lock);
       return TRUE;
     }
 
diff --git a/omx/gstomxrecmutex.c b/omx/gstomxrecmutex.c
deleted file mode 100644 (file)
index c12b3aa..0000000
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Copyright (C) 2012, Collabora Ltd.
- *   Author: George Kiagiadakis <george.kiagiadakis@collabora.com>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation
- * version 2.1 of the License.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
- *
- */
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include "gstomxrecmutex.h"
-
-void
-gst_omx_rec_mutex_init (GstOMXRecMutex * mutex)
-{
-  g_mutex_init (&mutex->lock);
-  g_mutex_init (&mutex->recursion_lock);
-  g_cond_init (&mutex->recursion_wait_cond);
-  g_atomic_int_set (&mutex->recursion_allowed, FALSE);
-  g_atomic_int_set (&mutex->recursion_pending, FALSE);
-}
-
-void
-gst_omx_rec_mutex_clear (GstOMXRecMutex * mutex)
-{
-  g_mutex_clear (&mutex->lock);
-  g_mutex_clear (&mutex->recursion_lock);
-}
-
-void
-gst_omx_rec_mutex_lock (GstOMXRecMutex * mutex)
-{
-  g_mutex_lock (&mutex->lock);
-}
-
-void
-gst_omx_rec_mutex_unlock (GstOMXRecMutex * mutex)
-{
-  g_mutex_unlock (&mutex->lock);
-}
-
-void
-gst_omx_rec_mutex_lock_for_recursion (GstOMXRecMutex * mutex)
-{
-  gboolean exchanged;
-
-  g_mutex_lock (&mutex->recursion_lock);
-
-  while (g_atomic_int_get (&mutex->recursion_allowed))
-    g_cond_wait (&mutex->recursion_wait_cond, &mutex->recursion_lock);
-
-  g_mutex_lock (&mutex->lock);
-  exchanged =
-      g_atomic_int_compare_and_exchange (&mutex->recursion_pending, FALSE,
-      TRUE);
-  g_assert (exchanged);
-}
-
-void
-gst_omx_rec_mutex_unlock_for_recursion (GstOMXRecMutex * mutex)
-{
-  gboolean exchanged;
-
-  exchanged =
-      g_atomic_int_compare_and_exchange (&mutex->recursion_pending, TRUE,
-      FALSE);
-  g_assert (exchanged);
-  g_mutex_unlock (&mutex->lock);
-  g_cond_broadcast (&mutex->recursion_wait_cond);
-  g_mutex_unlock (&mutex->recursion_lock);
-}
-
-/* must be called with mutex->lock taken */
-void
-gst_omx_rec_mutex_begin_recursion (GstOMXRecMutex * mutex)
-{
-  gboolean exchanged;
-
-  exchanged =
-      g_atomic_int_compare_and_exchange (&mutex->recursion_allowed, FALSE,
-      TRUE);
-  g_assert (exchanged);
-  exchanged =
-      g_atomic_int_compare_and_exchange (&mutex->recursion_pending, TRUE,
-      FALSE);
-  g_assert (exchanged);
-  g_mutex_unlock (&mutex->recursion_lock);
-}
-
-/* must be called with mutex->lock taken */
-void
-gst_omx_rec_mutex_end_recursion (GstOMXRecMutex * mutex)
-{
-  gboolean exchanged;
-
-  g_mutex_lock (&mutex->recursion_lock);
-  exchanged =
-      g_atomic_int_compare_and_exchange (&mutex->recursion_allowed, TRUE,
-      FALSE);
-  g_assert (exchanged);
-  exchanged =
-      g_atomic_int_compare_and_exchange (&mutex->recursion_pending, FALSE,
-      TRUE);
-  g_assert (exchanged);
-}
-
-void
-gst_omx_rec_mutex_recursive_lock (GstOMXRecMutex * mutex)
-{
-  g_mutex_lock (&mutex->recursion_lock);
-  if (!g_atomic_int_get (&mutex->recursion_allowed)) {
-    /* no recursion allowed, lock the proper mutex */
-    g_mutex_lock (&mutex->lock);
-    g_mutex_unlock (&mutex->recursion_lock);
-  }
-}
-
-void
-gst_omx_rec_mutex_recursive_unlock (GstOMXRecMutex * mutex)
-{
-  /* It is safe to check recursion_allowed here because
-   * we hold at least one of the two locks and
-   * either lock protects it from being changed.
-   */
-  if (g_atomic_int_get (&mutex->recursion_allowed)) {
-    g_mutex_unlock (&mutex->recursion_lock);
-  } else {
-    g_mutex_unlock (&mutex->lock);
-  }
-}
diff --git a/omx/gstomxrecmutex.h b/omx/gstomxrecmutex.h
deleted file mode 100644 (file)
index 4645302..0000000
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright (C) 2012, Collabora Ltd.
- *   Author: George Kiagiadakis <george.kiagiadakis@collabora.com>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation
- * version 2.1 of the License.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
- *
- */
-
-#ifndef __GST_OMX_REC_MUTEX_H__
-#define __GST_OMX_REC_MUTEX_H__
-
-#include <glib.h>
-
-G_BEGIN_DECLS
-
-/*
- * This is a recursive mutex implementation that serves a very specific
- * purpose; it is used to allow OpenMAX callbacks to be run in the context
- * of some OpenMAX function call while the calling function is holding the
- * master lock.
- *
- * According to the OpenMAX specification, we have 2 possible ways that
- * callbacks may be called. First, we have out-of-context calls, which means
- * that callbacks are called from a different thread at any point in time.
- * In this case, callbacks must take the appropriate lock to protect the data
- * that they are changing. Second, we have in-context calls, which means
- * that callbacks are called when we call some OpenMAX function, before this
- * function returns. In this case, we need to allow the callback to run
- * without taking any locks that the caller of the OpenMAX function is holding.
- *
- * This can be solved by a recusrive mutex. However, a normal GRecMutex is
- * not enough because it allows being locked multiple times only from
- * the same thread. Unfortunatly, what we see in Broadcom's implementation,
- * for instance, OpenMAX callbacks may be in-context, but from a different
- * thread. This is achieved like this:
- *
- * - OMX_Foo is called
- * - OMX_Foo waits on a condition
- * - The callback is executed in a different thread
- * - When the callback returns, its calling function
- *   signals the condition that OMX_Foo waits on
- * - OMX_Foo wakes up and returns
- *
- * This recursive mutex implementation attempts to fix this problem.
- * This is achieved like this: All functions lock this mutex normally
- * using gst_omx_rec_mutex_lock() / _unlock(). These functions
- * effectively lock the master mutex and they are identical in behavior
- * with g_mutex_lock() / _unlock(). When a function that has already
- * locked this mutex is about to call some OpenMAX function, it must
- * call gst_omx_rec_mutex_begin_recursion() to indicate that recursive
- * locking is now allowed, and similarly, call gst_omx_rec_mutex_end_recursion()
- * after the OpenMAX function has returned to indicate that no recursive
- * locking is allowed from this point on. Callbacks should lock the
- * mutex using gst_omx_rec_mutex_recursive_lock() / _recursive_unlock().
- * These two functions, depending on whether recursion is allowed
- * will take/release either the master lock or the recursion_lock.
- * Effectively, this allows callbacks to run in the context any code between
- * calls to gst_omx_rec_mutex_begin_recursion() / _end_recursion().
- *
- * Note that this doesn't prevent out-of-context callback executions
- * to be run at that point, but due to the fact that _end_recursion()
- * also locks the recursion_lock, it is at least guaranteed that they
- * will have finished their execution before _end_recursion() returns.
- */
-typedef struct _GstOMXRecMutex GstOMXRecMutex;
-
-struct _GstOMXRecMutex {
-  /* The master lock */
-  GMutex lock;
-
-  /* This lock is taken when recursing.
-   * The master lock must always be taken before this one,
-   * by the caller of _begin_recursion().
-   */
-  GMutex recursion_lock;
-
-  /* Indicates whether recursion is allowed.
-   * When it is allowed, _recursive_lock() takes
-   * the recursion_lock instead of the master lock.
-   * This variable is protected by both locks.
-   */
-  volatile gint recursion_allowed;
-
-  /* Indicates whether lock is locked and recursion
-   * will be allowed soon
-   */
-  volatile gint recursion_pending;
-
-  GCond recursion_wait_cond;
-};
-
-void            gst_omx_rec_mutex_init (GstOMXRecMutex * mutex);
-void            gst_omx_rec_mutex_clear (GstOMXRecMutex * mutex);
-
-void            gst_omx_rec_mutex_lock (GstOMXRecMutex * mutex);
-void            gst_omx_rec_mutex_unlock (GstOMXRecMutex * mutex);
-
-void            gst_omx_rec_mutex_lock_for_recursion (GstOMXRecMutex * mutex);
-void            gst_omx_rec_mutex_unlock_for_recursion (GstOMXRecMutex * mutex);
-
-void            gst_omx_rec_mutex_begin_recursion (GstOMXRecMutex * mutex);
-void            gst_omx_rec_mutex_end_recursion (GstOMXRecMutex * mutex);
-
-void            gst_omx_rec_mutex_recursive_lock (GstOMXRecMutex * mutex);
-void            gst_omx_rec_mutex_recursive_unlock (GstOMXRecMutex * mutex);
-
-G_END_DECLS
-
-#endif /* __GST_OMX_REC_MUTEX_H__ */