systemclock: Fix wait/unschedule race
authorStian Selnes <stian@pexip.com>
Wed, 10 Feb 2016 08:09:29 +0000 (09:09 +0100)
committerThiago Santos <thiagoss@osg.samsung.com>
Wed, 10 Feb 2016 11:59:33 +0000 (08:59 -0300)
Fixes a race where an entry is set to BUSY in
gst_system_clock_id_wait_jitter() and is UNSCHEDULED before
gst_system_clock_id_wait_jitter_unlocked() starts processing it. The
wakeup added by gst_system_clock_id_unschedule() must be cleaned up.

Two stress tests are added. One test that triggers the specific issue
described above. The second stresses the code path where a wait is
rescheduled because the poll returned early.

https://bugzilla.gnome.org/show_bug.cgi?id=761586

gst/gstsystemclock.c
tests/check/gst/gstsystemclock.c

index e65c80c..2f152ca 100644 (file)
@@ -702,8 +702,10 @@ gst_system_clock_id_wait_jitter_unlocked (GstClock * clock,
   GstClockReturn status;
 
   status = GET_ENTRY_STATUS (entry);
-  if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED))
+  if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
+    gst_system_clock_cleanup_unscheduled (sysclock, entry);
     return GST_CLOCK_UNSCHEDULED;
+  }
 
   /* need to call the overridden method because we want to sync against the time
    * of the clock, whatever the subclass uses as a clock. */
index a29bd1d..356a6e0 100644 (file)
@@ -752,6 +752,146 @@ GST_START_TEST (test_resolution)
 
 GST_END_TEST;
 
+typedef struct {
+  GThread *thread_wait;
+  GThread *thread_unschedule;
+  GMutex lock;
+  gboolean running;
+  GstClockID id;
+  gboolean unschedule;
+  gint32 time_offset_min;
+  gint32 time_offset_max;
+  gboolean dont_unschedule_positive_offset;
+} WaitUnscheduleData;
+
+static gpointer
+single_shot_wait_thread_func (gpointer data)
+{
+  WaitUnscheduleData *d = data;
+  GstClock *clock = gst_system_clock_obtain ();
+
+  while (d->running) {
+    GstClockTime now;
+    gint offset;
+    GstClockID id;
+
+    now = gst_clock_get_time (clock);
+    offset = g_random_int_range (d->time_offset_min, d->time_offset_max);
+
+    g_mutex_lock (&d->lock);
+    d->unschedule = d->dont_unschedule_positive_offset ? offset < 0 : TRUE;
+    id = d->id = gst_clock_new_single_shot_id (clock, now + (GstClockTime)offset);
+    g_mutex_unlock (&d->lock);
+
+    fail_unless (id != NULL, "Could not create single shot id");
+
+    gst_clock_id_wait (id, NULL);
+
+    g_mutex_lock (&d->lock);
+    gst_clock_id_unref (id);
+    d->id = NULL;
+    g_mutex_unlock (&d->lock);
+  }
+
+  g_object_unref (clock);
+
+  return NULL;
+}
+
+static gpointer
+unschedule_thread_func (gpointer data)
+{
+  WaitUnscheduleData *d = data;
+
+  while (d->running) {
+    g_mutex_lock (&d->lock);
+    if (d->id && d->unschedule) {
+      g_thread_yield ();
+      gst_clock_id_unschedule (d->id);
+    }
+    g_mutex_unlock (&d->lock);
+    g_thread_yield ();
+  }
+
+  return NULL;
+}
+
+GST_START_TEST (test_stress_cleanup_unschedule)
+{
+  WaitUnscheduleData data[50];
+
+  for (gint i = 0; i < G_N_ELEMENTS (data); i++) {
+    WaitUnscheduleData *d = &data[i];
+
+    /* Don't unschedule waits with positive offsets in order to trigger
+     * gst_system_clock_wait_wakeup() */
+    d->dont_unschedule_positive_offset = TRUE;
+    /* Overweight of negative offsets in order to trigger GST_CLOCK_EARLY more
+     * frequently */
+    d->time_offset_min = -GST_MSECOND;
+    d->time_offset_max = GST_MSECOND / 10;
+
+    /* Initialize test */
+    d->id = NULL;
+    d->running = TRUE;
+    g_mutex_init (&d->lock);
+    d->thread_wait = g_thread_new ("wait", single_shot_wait_thread_func, d);
+    d->thread_unschedule = g_thread_new ("unschedule", unschedule_thread_func,
+        d);
+  }
+
+  /* Test duration */
+  g_usleep (G_USEC_PER_SEC);
+
+  /* Stop and free test data */
+  for (gint i = 0; i < G_N_ELEMENTS (data); i++) {
+    WaitUnscheduleData *d = &data[i];
+    d->running = FALSE;
+    g_thread_join (d->thread_wait);
+    g_thread_join (d->thread_unschedule);
+    g_mutex_clear (&d->lock);
+  }
+}
+GST_END_TEST;
+
+
+GST_START_TEST (test_stress_reschedule)
+{
+  WaitUnscheduleData data[50];
+
+  for (gint i = 0; i < G_N_ELEMENTS (data); i++) {
+    WaitUnscheduleData *d = &data[i];
+
+    /* Try to unschedule all waits */
+    d->dont_unschedule_positive_offset = FALSE;
+    /* Small positive offsets in order to have both negative and positive
+     * diffs when a reschedule is needed. */
+    d->time_offset_min = 0;
+    d->time_offset_max = GST_MSECOND;
+
+    d->id = NULL;
+    d->running = TRUE;
+    g_mutex_init (&d->lock);
+    d->thread_wait = g_thread_new ("wait", single_shot_wait_thread_func, d);
+    d->thread_unschedule = g_thread_new ("unschedule", unschedule_thread_func,
+        d);
+  }
+
+  /* Test duration */
+  g_usleep (G_USEC_PER_SEC);
+
+  /* Stop and free test data */
+  for (gint i = 0; i < G_N_ELEMENTS (data); i++) {
+    WaitUnscheduleData *d = &data[i];
+    d->running = FALSE;
+    g_thread_join (d->thread_wait);
+    g_thread_join (d->thread_unschedule);
+    g_mutex_clear (&d->lock);
+  }
+}
+GST_END_TEST;
+
+
 static Suite *
 gst_systemclock_suite (void)
 {
@@ -772,6 +912,8 @@ gst_systemclock_suite (void)
   tcase_add_test (tc_chain, test_async_full);
   tcase_add_test (tc_chain, test_set_default);
   tcase_add_test (tc_chain, test_resolution);
+  tcase_add_test (tc_chain, test_stress_cleanup_unschedule);
+  tcase_add_test (tc_chain, test_stress_reschedule);
 
   return s;
 }