examples: simplify the thread synchronization code
authorJosep Torra <n770galaxy@gmail.com>
Sat, 28 Sep 2013 11:32:37 +0000 (13:32 +0200)
committerJosep Torra <n770galaxy@gmail.com>
Sat, 28 Sep 2013 12:14:54 +0000 (14:14 +0200)
Make everithing more simple and fix the races conditions remaining in
the previous approaches.

examples/egl/testegl.c

index 7886483..fe6c883 100644 (file)
@@ -89,6 +89,8 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 #define M_PI 3.141592654
 #endif
 
+#define SYNC_BUFFERS TRUE
+
 #define TRACE_VC_MEMORY_ENABLED 0
 
 #if TRACE_VC_MEMORY_ENABLED
@@ -168,7 +170,6 @@ typedef struct
   /* Interthread comunication */
   GAsyncQueue *queue;
   GMutex *queue_lock;
-  GMutex *flow_lock;
   GCond *cond;
   gboolean flushing;
   GstMiniObject *popped_obj;
@@ -976,7 +977,6 @@ init_intercom (APP_STATE_T * state)
   state->queue =
       g_async_queue_new_full ((GDestroyNotify) gst_mini_object_unref);
   state->queue_lock = g_mutex_new ();
-  state->flow_lock = g_mutex_new ();
   state->cond = g_cond_new ();
 }
 
@@ -992,10 +992,6 @@ terminate_intercom (APP_STATE_T * state)
     g_mutex_free (state->queue_lock);
   }
 
-  if (state->flow_lock) {
-    g_mutex_free (state->flow_lock);
-  }
-
   if (state->cond) {
     g_cond_free (state->cond);
   }
@@ -1020,7 +1016,6 @@ flush_start (APP_STATE_T * state)
   g_cond_broadcast (state->cond);
   g_mutex_unlock (state->queue_lock);
 
-  g_mutex_lock (state->flow_lock);
   while ((object = g_async_queue_try_pop (state->queue))) {
     gst_mini_object_unref (object);
   }
@@ -1028,7 +1023,6 @@ flush_start (APP_STATE_T * state)
   flush_internal (state);
   state->popped_obj = NULL;
   g_mutex_unlock (state->queue_lock);
-  g_mutex_unlock (state->flow_lock);
 }
 
 static void
@@ -1100,32 +1094,28 @@ static gboolean
 handle_queued_objects (APP_STATE_T * state)
 {
   GstMiniObject *object = NULL;
-  gboolean done = FALSE;
 
   g_mutex_lock (state->queue_lock);
   if (state->flushing) {
     g_cond_broadcast (state->cond);
-    done = TRUE;
+    goto beach;
   } else if (g_async_queue_length (state->queue) == 0) {
-    done = TRUE;
+    goto beach;
   }
-  g_mutex_unlock (state->queue_lock);
-
-  while (!done && (object = g_async_queue_try_pop (state->queue))) {
 
+  if ((object = g_async_queue_try_pop (state->queue))) {
     if (GST_IS_BUFFER (object)) {
       GstBuffer *buffer = GST_BUFFER_CAST (object);
-      g_mutex_lock (state->queue_lock);
       update_image (state, buffer);
       render_scene (state);
       gst_buffer_unref (buffer);
-      g_mutex_unlock (state->queue_lock);
+      if (!SYNC_BUFFERS) {
+        object = NULL;
+      }
     } else if (GST_IS_QUERY (object)) {
       GstQuery *query = GST_QUERY_CAST (object);
       GstStructure *s = (GstStructure *) gst_query_get_structure (query);
 
-      g_mutex_lock (state->queue_lock);
-
       if (gst_structure_has_name (s, "eglglessink-allocate-eglimage")) {
         GstBuffer *buffer;
         GstVideoFormat format;
@@ -1151,18 +1141,11 @@ handle_queued_objects (APP_STATE_T * state)
       } else {
         g_assert_not_reached ();
       }
-
-      state->popped_obj = object;
-      g_cond_broadcast (state->cond);
-      g_mutex_unlock (state->queue_lock);
-
-      return TRUE;
     } else if (GST_IS_EVENT (object)) {
       GstEvent *event = GST_EVENT_CAST (object);
       g_print ("\nevent %p %s\n", event,
           gst_event_type_get_name (GST_EVENT_TYPE (event)));
 
-      g_mutex_lock (state->queue_lock);
       switch (GST_EVENT_TYPE (event)) {
         case GST_EVENT_EOS:
           flush_internal (state);
@@ -1170,18 +1153,19 @@ handle_queued_objects (APP_STATE_T * state)
         default:
           break;
       }
-      g_mutex_unlock (state->queue_lock);
       gst_event_unref (event);
+      object = NULL;
     }
+  }
 
-    g_mutex_lock (state->queue_lock);
+  if (object) {
     state->popped_obj = object;
     g_cond_broadcast (state->cond);
-    g_mutex_unlock (state->queue_lock);
-    g_mutex_lock (state->flow_lock);
-    g_mutex_unlock (state->flow_lock);
   }
 
+beach:
+  g_mutex_unlock (state->queue_lock);
+
   return FALSE;
 }
 
@@ -1190,14 +1174,13 @@ queue_object (APP_STATE_T * state, GstMiniObject * obj, gboolean synchronous)
 {
   gboolean res = TRUE;
 
-  g_mutex_lock (state->flow_lock);
+  g_mutex_lock (state->queue_lock);
   if (state->flushing) {
     gst_mini_object_unref (obj);
     res = FALSE;
     goto beach;
   }
 
-  g_mutex_lock (state->queue_lock);
   g_async_queue_push (state->queue, obj);
 
   if (synchronous) {
@@ -1206,10 +1189,9 @@ queue_object (APP_STATE_T * state, GstMiniObject * obj, gboolean synchronous)
       g_cond_wait (state->cond, state->queue_lock);
     } while (!state->flushing && state->popped_obj != obj);
   }
-  g_mutex_unlock (state->queue_lock);
 
 beach:
-  g_mutex_unlock (state->flow_lock);
+  g_mutex_unlock (state->queue_lock);
   return res;
 }
 
@@ -1226,7 +1208,8 @@ buffers_cb (GstElement * fakesink, GstBuffer * buffer, GstPad * pad,
     gpointer user_data)
 {
   APP_STATE_T *state = (APP_STATE_T *) user_data;
-  queue_object (state, GST_MINI_OBJECT_CAST (gst_buffer_ref (buffer)), TRUE);
+  queue_object (state, GST_MINI_OBJECT_CAST (gst_buffer_ref (buffer)),
+      SYNC_BUFFERS);
 }
 
 static GstPadProbeReturn