examples: fix a race condition when seeking
authorJosep Torra <n770galaxy@gmail.com>
Fri, 20 Sep 2013 15:09:52 +0000 (17:09 +0200)
committerJosep Torra <n770galaxy@gmail.com>
Fri, 20 Sep 2013 16:50:26 +0000 (18:50 +0200)
Fixes a race condition that caused pipeline deadlock during seeks.

examples/egl/testegl.c

index fa43d88..b20c91e 100644 (file)
@@ -162,11 +162,13 @@ typedef struct
 
   /* GStreamer related resources */
   GstElement *pipeline;
+  GstElement *vsink;
   GstEGLDisplay *gst_display;
 
   /* Interthread comunication */
   GAsyncQueue *queue;
-  GMutex *lock;
+  GMutex *queue_lock;
+  GMutex *flow_lock;
   GCond *cond;
   gboolean flushing;
   GstMiniObject *popped_obj;
@@ -954,14 +956,11 @@ update_image (APP_STATE_T * state, GstBuffer * buffer)
 {
   GstMemory *mem = gst_buffer_peek_memory (buffer, 0);
 
-  g_mutex_lock (state->lock);
   if (state->current_mem) {
     gst_memory_unref (state->current_mem);
   }
   state->current_mem = gst_memory_ref (mem);
 
-  g_mutex_unlock (state->lock);
-
   TRACE_VC_MEMORY_ONCE_FOR_ID ("before glEGLImageTargetTexture2DOES", gid0);
 
   glBindTexture (GL_TEXTURE_2D, state->tex);
@@ -969,8 +968,6 @@ update_image (APP_STATE_T * state, GstBuffer * buffer)
       gst_egl_image_memory_get_image (mem));
 
   TRACE_VC_MEMORY_ONCE_FOR_ID ("after glEGLImageTargetTexture2DOES", gid1);
-
-  render_scene (state);
 }
 
 static void
@@ -978,7 +975,8 @@ init_intercom (APP_STATE_T * state)
 {
   state->queue =
       g_async_queue_new_full ((GDestroyNotify) gst_mini_object_unref);
-  state->lock = g_mutex_new ();
+  state->queue_lock = g_mutex_new ();
+  state->flow_lock = g_mutex_new ();
   state->cond = g_cond_new ();
 }
 
@@ -990,8 +988,12 @@ terminate_intercom (APP_STATE_T * state)
     g_async_queue_unref (state->queue);
   }
 
-  if (state->lock) {
-    g_mutex_free (state->lock);
+  if (state->queue_lock) {
+    g_mutex_free (state->queue_lock);
+  }
+
+  if (state->flow_lock) {
+    g_mutex_free (state->flow_lock);
   }
 
   if (state->cond) {
@@ -1013,33 +1015,47 @@ flush_start (APP_STATE_T * state)
 {
   GstMiniObject *object = NULL;
 
-  g_mutex_lock (state->lock);
+  g_mutex_lock (state->queue_lock);
   state->flushing = TRUE;
   g_cond_broadcast (state->cond);
-  g_mutex_unlock (state->lock);
+  g_mutex_unlock (state->queue_lock);
 
+  g_mutex_lock (state->flow_lock);
   while ((object = g_async_queue_try_pop (state->queue))) {
     gst_mini_object_unref (object);
   }
-
+  g_mutex_lock (state->queue_lock);
   flush_internal (state);
+  state->popped_obj = NULL;
+  g_mutex_unlock (state->queue_lock);
+  g_mutex_unlock (state->flow_lock);
 }
 
 static void
 flush_stop (APP_STATE_T * state)
 {
-  g_mutex_lock (state->lock);
+  GstMiniObject *object = NULL;
+
+  g_mutex_lock (state->queue_lock);
+  while ((object = GST_MINI_OBJECT_CAST (g_async_queue_try_pop (state->queue)))) {
+    gst_mini_object_unref (object);
+  }
+  flush_internal (state);
   state->popped_obj = NULL;
   state->flushing = FALSE;
-  g_mutex_unlock (state->lock);
+  g_mutex_unlock (state->queue_lock);
 }
 
 static void
 pipeline_pause (APP_STATE_T * state)
 {
-  flush_start (state);
   gst_element_set_state (state->pipeline, GST_STATE_PAUSED);
-  flush_stop (state);
+}
+
+static void
+pipeline_play (APP_STATE_T * state)
+{
+  gst_element_set_state (state->pipeline, GST_STATE_PLAYING);
 }
 
 static gint64
@@ -1048,7 +1064,7 @@ pipeline_get_position (APP_STATE_T * state)
   gint64 position = -1;
 
   if (state->pipeline) {
-    gst_element_query_position (state->pipeline, GST_FORMAT_TIME, &position);
+    gst_element_query_position (state->vsink, GST_FORMAT_TIME, &position);
   }
 
   return position;
@@ -1074,7 +1090,7 @@ pipeline_seek (APP_STATE_T * state, gint64 position)
     event = gst_event_new_seek (1.0,
         GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT,
         GST_SEEK_TYPE_SET, position, GST_SEEK_TYPE_SET, GST_CLOCK_TIME_NONE);
-    if (!gst_element_send_event (state->pipeline, event)) {
+    if (!gst_element_send_event (state->vsink, event)) {
       g_print ("seek failed\n");
     }
   }
@@ -1084,32 +1100,33 @@ static gboolean
 handle_queued_objects (APP_STATE_T * state)
 {
   GstMiniObject *object = NULL;
+  gboolean done = FALSE;
 
   if (g_async_queue_length (state->queue) == 0) {
     return FALSE;
   }
 
-  while ((object = g_async_queue_try_pop (state->queue))) {
+  g_mutex_lock (state->queue_lock);
+  if (state->flushing) {
+    g_cond_broadcast (state->cond);
+    done = TRUE;
+  }
+  g_mutex_unlock (state->queue_lock);
 
-    g_mutex_lock (state->lock);
-    if (state->flushing) {
-      state->popped_obj = object;
-      gst_mini_object_unref (object);
-      g_cond_broadcast (state->cond);
-      g_mutex_unlock (state->lock);
-      continue;
-    }
-    g_mutex_unlock (state->lock);
+  while (!done && (object = g_async_queue_try_pop (state->queue))) {
 
     if (GST_IS_BUFFER (object)) {
       GstBuffer *buffer = GST_BUFFER_CAST (object);
+      g_mutex_lock (state->queue_lock);
       update_image (state, buffer);
+      render_scene (state);
       gst_buffer_unref (buffer);
+      g_mutex_unlock (state->queue_lock);
     } else if (GST_IS_QUERY (object)) {
       GstQuery *query = GST_QUERY_CAST (object);
       GstStructure *s = (GstStructure *) gst_query_get_structure (query);
 
-      g_mutex_lock (state->lock);
+      g_mutex_lock (state->queue_lock);
 
       if (gst_structure_has_name (s, "eglglessink-allocate-eglimage")) {
         GstBuffer *buffer;
@@ -1139,7 +1156,7 @@ handle_queued_objects (APP_STATE_T * state)
 
       state->popped_obj = object;
       g_cond_broadcast (state->cond);
-      g_mutex_unlock (state->lock);
+      g_mutex_unlock (state->queue_lock);
 
       return TRUE;
     } else if (GST_IS_EVENT (object)) {
@@ -1147,7 +1164,7 @@ handle_queued_objects (APP_STATE_T * state)
       g_print ("\nevent %p %s\n", event,
           gst_event_type_get_name (GST_EVENT_TYPE (event)));
 
-      g_mutex_lock (state->lock);
+      g_mutex_lock (state->queue_lock);
       switch (GST_EVENT_TYPE (event)) {
         case GST_EVENT_EOS:
           flush_internal (state);
@@ -1155,14 +1172,14 @@ handle_queued_objects (APP_STATE_T * state)
         default:
           break;
       }
-      g_mutex_unlock (state->lock);
+      g_mutex_unlock (state->queue_lock);
       gst_event_unref (event);
     }
 
-    g_mutex_lock (state->lock);
+    g_mutex_lock (state->queue_lock);
     state->popped_obj = object;
     g_cond_broadcast (state->cond);
-    g_mutex_unlock (state->lock);
+    g_mutex_unlock (state->queue_lock);
   }
 
   return FALSE;
@@ -1171,24 +1188,29 @@ handle_queued_objects (APP_STATE_T * state)
 static gboolean
 queue_object (APP_STATE_T * state, GstMiniObject * obj, gboolean synchronous)
 {
-  g_mutex_lock (state->lock);
+  gboolean res = TRUE;
+
+  g_mutex_lock (state->flow_lock);
   if (state->flushing) {
-    g_mutex_unlock (state->lock);
     gst_mini_object_unref (obj);
-    return FALSE;
+    res = FALSE;
+    goto beach;
   }
 
   g_async_queue_push (state->queue, obj);
 
   if (synchronous) {
     /* Waiting for object to be handled */
+    g_mutex_lock (state->queue_lock);
     do {
-      g_cond_wait (state->cond, state->lock);
+      g_cond_wait (state->cond, state->queue_lock);
     } while (!state->flushing && state->popped_obj != obj);
+    g_mutex_unlock (state->queue_lock);
   }
-  g_mutex_unlock (state->lock);
 
-  return TRUE;
+beach:
+  g_mutex_unlock (state->flow_lock);
+  return res;
 }
 
 static void
@@ -1261,9 +1283,9 @@ query_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
         return GST_PAD_PROBE_OK;
       }
 
-      g_mutex_lock (state->lock);
+      g_mutex_lock (state->queue_lock);
       pool = state->pool ? gst_object_ref (state->pool) : NULL;
-      g_mutex_unlock (state->lock);
+      g_mutex_unlock (state->queue_lock);
 
       if (pool) {
         GstCaps *pcaps;
@@ -1373,6 +1395,7 @@ init_playbin_player (APP_STATE_T * state, const gchar * uri)
       "video-sink", vsink, "flags",
       GST_PLAY_FLAG_NATIVE_VIDEO | GST_PLAY_FLAG_AUDIO, NULL);
 
+  state->vsink = gst_object_ref (vsink);
   return TRUE;
 }
 
@@ -1409,6 +1432,7 @@ init_parse_launch_player (APP_STATE_T * state, const gchar * spipeline)
   gst_pad_add_probe (gst_element_get_static_pad (vsink, "sink"),
       GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM, query_cb, state, NULL);
 
+  state->vsink = gst_object_ref (vsink);
   return TRUE;
 }
 
@@ -1502,7 +1526,7 @@ handle_keyboard (GIOChannel * source, GIOCondition cond, APP_STATE_T * state)
         pipeline_pause (state);
         break;
       case 'r':
-        gst_element_set_state (state->pipeline, GST_STATE_PLAYING);
+        pipeline_play (state);
         break;
       case 'l':
         report_position_duration (state);
@@ -1555,10 +1579,10 @@ buffering_cb (GstBus * bus, GstMessage * msg, APP_STATE_T * state)
   gst_message_parse_buffering (msg, &percent);
   g_print ("Buffering %3d%%\r", percent);
   if (percent < 100)
-    gst_element_set_state (state->pipeline, GST_STATE_PAUSED);
+    pipeline_pause (state);
   else {
     g_print ("\n");
-    gst_element_set_state (state->pipeline, GST_STATE_PLAYING);
+    pipeline_play (state);
   }
 }
 
@@ -1774,6 +1798,11 @@ done:
   /* Release pipeline */
   if (state->pipeline) {
     gst_element_set_state (state->pipeline, GST_STATE_NULL);
+    if (state->vsink) {
+      gst_object_unref (state->vsink);
+      state->vsink = NULL;
+    }
+
     gst_object_unref (state->pipeline);
   }