libgm2/libm2iso/RTco.cc (re-implementation) Bugfix for [PR108835]
authorGaius Mulley <gaiusmod2@gmail.com>
Sun, 19 Feb 2023 22:08:31 +0000 (22:08 +0000)
committerGaius Mulley <gaiusmod2@gmail.com>
Sun, 19 Feb 2023 22:08:31 +0000 (22:08 +0000)
This is a re-implementation of RTco.cc which fixes the race hazzard
seen occasionally when running testtransfer and coroutines from the
modula2 testsuite.

libgm2/ChangeLog:

PR testsuite/108835
* libm2iso/RTco.cc: Re-implementation using a single lock
mutex and inlined wait/signal implementation within
transfer.

Signed-off-by: Gaius Mulley <gaiusmod2@gmail.com>
libgm2/libm2iso/RTco.cc

index 0a0f1c03a99a8a73afcc5c24c52b5f1a34c0404d..8b8a4dcea6dd153c4b9ea63964947b95d3f2d027 100644 (file)
@@ -1,4 +1,4 @@
-/* RTco.c provides minimal access to thread primitives.
+/* RTco.cc provides minimal access to thread primitives.
 
 Copyright (C) 2019-2022 Free Software Foundation, Inc.
 Contributed by Gaius Mulley <gaius.mulley@southwales.ac.uk>.
@@ -30,8 +30,19 @@ see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
 #include <sys/select.h>
 #include <stdlib.h>
 #include <m2rts.h>
+#include <cstdio>
 
-// #define TRACEON
+#define EXPORT(FUNC) RTco_ ## FUNC
+#define M2EXPORT(FUNC) _M2_RTco_ ## FUNC
+
+/* This implementation of RTco.cc uses a single lock for mutex across
+   the whole module.  It also forces context switching between threads
+   in transfer by combining an implementation of wait and signal.
+
+   All semaphores are implemented using the same mutex lock and
+   separate condition variables.  */
+
+#undef TRACEON
 
 #define POOL
 #define SEM_POOL 10000
@@ -63,19 +74,22 @@ see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
 #define tprintf(...)
 #endif
 
+
 typedef struct threadCB_s
 {
   void (*proc) (void);
-  int execution;
   pthread_t p;
-  int tid;
+  int tid;   /* The thread id.  */
   unsigned int interruptLevel;
+  __gthread_cond_t run_counter;  /* Used to block the thread and force
+                                   a context switch.  */
+  int value;    /* Count 0 or 1.  */
+  bool waiting; /* Is this thread waiting on the run_counter?  */
 } threadCB;
 
 
 typedef struct threadSem_s
 {
-  __gthread_mutex_t mutex;
   __gthread_cond_t counter;
   int waiting;
   int sem_value;
@@ -87,25 +101,27 @@ static unsigned int nSemaphores = 0;
 static threadSem **semArray = NULL;
 
 /* These are used to lock the above module data structures.  */
-static threadSem lock;
+static __gthread_mutex_t lock;  /* This is the only mutex for
+                                  the whole module.  */
 static int initialized = FALSE;
+static int currentThread = 0;
 
 
-extern "C" int RTco_init (void);
+extern "C" int EXPORT(init) (void);
 
 
 extern "C" void
-_M2_RTco_dep (void)
+M2EXPORT(dep) (void)
 {
 }
 
 extern "C" void
-_M2_RTco_init (int argc, char *argv[], char *envp[])
+M2EXPORT(init) (int argc, char *argv[], char *envp[])
 {
 }
 
 extern "C" void
-_M2_RTco_fini (int argc, char *argv[], char *envp[])
+M2EXPORT(fini) (int argc, char *argv[], char *envp[])
 {
 }
 
@@ -114,7 +130,6 @@ static void
 initSem (threadSem *sem, int value)
 {
   __GTHREAD_COND_INIT_FUNCTION (&sem->counter);
-  __GTHREAD_MUTEX_INIT_FUNCTION (&sem->mutex);
   sem->waiting = FALSE;
   sem->sem_value = value;
 }
@@ -122,43 +137,41 @@ initSem (threadSem *sem, int value)
 static void
 waitSem (threadSem *sem)
 {
-  __gthread_mutex_lock (&sem->mutex);
+  __gthread_mutex_lock (&lock);
   if (sem->sem_value == 0)
     {
       sem->waiting = TRUE;
-      __gthread_cond_wait (&sem->counter, &sem->mutex);
+      __gthread_cond_wait (&sem->counter, &lock);
       sem->waiting = FALSE;
     }
   else
     sem->sem_value--;
-  __gthread_mutex_unlock (&sem->mutex);
+  __gthread_mutex_unlock (&lock);
 }
 
 static void
 signalSem (threadSem *sem)
 {
-  __gthread_mutex_unlock (&sem->mutex);
+  __gthread_mutex_lock (&lock);
   if (sem->waiting)
     __gthread_cond_signal (&sem->counter);
   else
     sem->sem_value++;
-  __gthread_mutex_unlock (&sem->mutex);
+  __gthread_mutex_unlock (&lock);
 }
 
-void stop (void) {}
-
 extern "C" void
-RTco_wait (int sid)
+EXPORT(wait) (int sid)
 {
-  RTco_init ();
+  EXPORT(init) ();
   tprintf ("wait %d\n", sid);
   waitSem (semArray[sid]);
 }
 
 extern "C" void
-RTco_signal (int sid)
+EXPORT(signal) (int sid)
 {
-  RTco_init ();
+  EXPORT(init) ();
   tprintf ("signal %d\n", sid);
   signalSem (semArray[sid]);
 }
@@ -207,90 +220,58 @@ initSemaphore (int value)
 }
 
 extern "C" int
-RTco_initSemaphore (int value)
+EXPORT(initSemaphore) (int value)
 {
   int sid;
 
-  RTco_init ();
-  waitSem (&lock);
+  tprintf ("initSemaphore (%d) called\n", value);
+  EXPORT(init) ();
+  tprintf ("about to access lock\n");
+  __gthread_mutex_lock (&lock);
   sid = initSemaphore (value);
-  signalSem (&lock);
+  __gthread_mutex_unlock (&lock);
   return sid;
 }
 
-/* signalThread signal the semaphore associated with thread tid.  */
-
-extern "C" void
-RTco_signalThread (int tid)
-{
-  int sem;
-  RTco_init ();
-  tprintf ("signalThread %d\n", tid);
-  waitSem (&lock);
-  sem = threadArray[tid].execution;
-  signalSem (&lock);
-  RTco_signal (sem);
-}
-
-/* waitThread wait on the semaphore associated with thread tid.  */
-
-extern "C" void
-RTco_waitThread (int tid)
-{
-  RTco_init ();
-  tprintf ("waitThread %d\n", tid);
-  RTco_wait (threadArray[tid].execution);
-}
-
-extern "C" int
-currentThread (void)
-{
-  int tid;
-
-  for (tid = 0; tid < nThreads; tid++)
-    if (pthread_self () == threadArray[tid].p)
-      return tid;
-  M2RTS_HaltC (__FILE__, __LINE__, __FUNCTION__,
-              "failed to find currentThread");
-}
-
 extern "C" int
-RTco_currentThread (void)
+EXPORT(currentThread) (void)
 {
   int tid;
 
-  RTco_init ();
-  waitSem (&lock);
-  tid = currentThread ();
+  EXPORT(init) ();
+  __gthread_mutex_lock (&lock);
+  tid = currentThread;
   tprintf ("currentThread %d\n", tid);
-  signalSem (&lock);
+  __gthread_mutex_unlock (&lock);
   return tid;
 }
 
 /* currentInterruptLevel returns the interrupt level of the current thread.  */
 
 extern "C" unsigned int
-RTco_currentInterruptLevel (void)
+EXPORT(currentInterruptLevel) (void)
 {
-  RTco_init ();
+  EXPORT(init) ();
+  __gthread_mutex_lock (&lock);
   tprintf ("currentInterruptLevel %d\n",
-           threadArray[RTco_currentThread ()].interruptLevel);
-  return threadArray[RTco_currentThread ()].interruptLevel;
+           threadArray[currentThread].interruptLevel);
+  int level = threadArray[currentThread].interruptLevel;
+  __gthread_mutex_unlock (&lock);
+  return level;
 }
 
 /* turninterrupts returns the old interrupt level and assigns the
    interrupt level to newLevel.  */
 
 extern "C" unsigned int
-RTco_turnInterrupts (unsigned int newLevel)
+EXPORT(turnInterrupts) (unsigned int newLevel)
 {
-  int tid = RTco_currentThread ();
-  unsigned int old = RTco_currentInterruptLevel ();
-
+  EXPORT(init) ();
+  __gthread_mutex_lock (&lock);
+  unsigned int old = threadArray[currentThread].interruptLevel;
   tprintf ("turnInterrupts from %d to %d\n", old, newLevel);
-  waitSem (&lock);
-  threadArray[tid].interruptLevel = newLevel;
-  signalSem (&lock);
+  threadArray[currentThread].interruptLevel = newLevel;
+  __gthread_mutex_unlock (&lock);
   return old;
 }
 
@@ -306,12 +287,30 @@ execThread (void *t)
 {
   threadCB *tp = (threadCB *)t;
 
+  tprintf ("exec thread tid = %d coming to life\n", tp->tid);
+  __gthread_mutex_lock (&lock);
   tprintf ("exec thread tid = %d  function = 0x%p  arg = 0x%p\n", tp->tid,
            tp->proc, t);
-  RTco_waitThread (
-      tp->tid); /* Forcing this thread to block, waiting to be scheduled.  */
-  tprintf ("  exec thread [%d]  function = 0x%p  arg = 0x%p\n", tp->tid,
+  /* Has the thread been signalled?  */
+  if (tp->value == 0)
+    {
+      /* Not been signalled therefore we force ourselves to block.  */
+      tprintf ("%s: forcing thread tid = %d to wait\n",
+              __FUNCTION__, tp->tid);
+      tp->waiting = true;  /* We are waiting.  */
+      __gthread_cond_wait (&tp->run_counter, &lock);
+      tp->waiting = false; /* Running again.  */
+    }
+  else
+    {
+      /* Yes signalled, therefore just take the recorded signal and continue.  */
+      tprintf ("%s: no need for thread tid = %d to wait\n",
+              __FUNCTION__, tp->tid);
+      tp->value--;
+    }
+  tprintf ("  running exec thread [%d]  function = 0x%p  arg = 0x%p\n", tp->tid,
            tp->proc, t);
+  __gthread_mutex_unlock (&lock);
   tp->proc (); /* Now execute user procedure.  */
 #if 0
   M2RTS_CoroutineException ( __FILE__, __LINE__, __COLUMN__, __FUNCTION__, "coroutine finishing");
@@ -356,21 +355,24 @@ initThread (void (*proc) (void), unsigned int stackSize,
 
   threadArray[tid].proc = proc;
   threadArray[tid].tid = tid;
-  threadArray[tid].execution = initSemaphore (0);
+  /* Initialize the thread run_counter used to force a context switch.  */
+  __GTHREAD_COND_INIT_FUNCTION (&threadArray[tid].run_counter);
   threadArray[tid].interruptLevel = interrupt;
+  threadArray[tid].waiting = false;     /* The thread is running.  */
+  threadArray[tid].value = 0;  /* No signal has been seen yet.  */
 
-  /* set thread creation attributes.  */
+  /* Set thread creation attributes.  */
   result = pthread_attr_init (&attr);
   if (result != 0)
     M2RTS_HaltC (__FILE__, __LINE__, __FUNCTION__,
-                "failed to create thread attribute");
+                "failed to create thread attribute");
 
   if (stackSize > 0)
     {
       result = pthread_attr_setstacksize (&attr, stackSize);
       if (result != 0)
         M2RTS_HaltC (__FILE__, __LINE__, __FUNCTION__,
-                    "failed to set stack size attribute");
+                    "failed to set stack size attribute");
     }
 
   tprintf ("initThread [%d]  function = 0x%p  (arg = 0x%p)\n", tid, proc,
@@ -385,85 +387,132 @@ initThread (void (*proc) (void), unsigned int stackSize,
 }
 
 extern "C" int
-RTco_initThread (void (*proc) (void), unsigned int stackSize,
-                 unsigned int interrupt)
+EXPORT(initThread) (void (*proc) (void), unsigned int stackSize,
+                   unsigned int interrupt)
 {
   int tid;
 
-  RTco_init ();
-  waitSem (&lock);
+  EXPORT(init) ();
+  __gthread_mutex_lock (&lock);
   tid = initThread (proc, stackSize, interrupt);
-  signalSem (&lock);
+  __gthread_mutex_unlock (&lock);
   return tid;
 }
 
 /* transfer unlocks thread p2 and locks the current thread.  p1 is
-   updated with the current thread id.  */
+   updated with the current thread id.
+   The implementation of transfer uses a combined wait/signal.  */
 
 extern "C" void
-RTco_transfer (int *p1, int p2)
+EXPORT(transfer) (int *p1, int p2)
 {
-  int tid = currentThread ();
-
-  if (!initialized)
-    M2RTS_HaltC (
-        __FILE__, __LINE__, __FUNCTION__,
-        "cannot transfer to a process before the process has been created");
-  if (tid == p2)
-    {
-      /* error.  */
+  __gthread_mutex_lock (&lock);
+  {
+    if (!initialized)
       M2RTS_HaltC (__FILE__, __LINE__, __FUNCTION__,
-                  "attempting to transfer to ourself");
-    }
-  else
-    {
-      *p1 = tid;
-      tprintf ("start, context switching from: %d to %d\n", tid, p2);
-      RTco_signalThread (p2);
-      RTco_waitThread (tid);
-      tprintf ("end, context back to %d\n", tid);
+                  "cannot transfer to a process before the process has been created");
+    if (currentThread == p2)
+      {
+       /* Error.  */
+       M2RTS_HaltC (__FILE__, __LINE__, __FUNCTION__,
+                    "attempting to transfer to ourself");
     }
+    else
+      {
+       *p1 = currentThread;
+       int old = currentThread;
+       tprintf ("start, context switching from: %d to %d\n", currentThread, p2);
+       /* Perform signal (p2 sem).  Without the mutex lock as we have
+          already obtained it above.  */
+       if (threadArray[p2].waiting)
+         {
+           /* p2 is blocked on the condition variable, release it.  */
+           tprintf ("p1 = %d cond_signal to p2 (%d)\n", currentThread, p2);
+         __gthread_cond_signal (&threadArray[p2].run_counter);
+         tprintf ("after p1 = %d cond_signal to p2 (%d)\n", currentThread, p2);
+         }
+       else
+         {
+           /* p2 hasn't reached the condition variable, so bump value
+              ready for p2 to test.  */
+           tprintf ("no need for thread %d to cond_signal - bump %d value (pre) = %d\n",
+                    currentThread, p2, threadArray[p2].value);
+           threadArray[p2].value++;
+         }
+       /* Perform wait (old sem).  Again without obtaining mutex as
+          we've already claimed it.  */
+       if (threadArray[old].value == 0)
+         {
+           currentThread = p2;
+           /* Record we are about to wait on the condition variable.  */
+           threadArray[old].waiting = true;
+           __gthread_cond_wait (&threadArray[old].run_counter, &lock);
+           threadArray[old].waiting = false;
+           /* We are running again.  */
+           currentThread = old;
+         }
+       else
+         {
+           tprintf ("(currentThread = %d) no need for thread %d to cond_wait - taking value (pre) = %d\n",
+                    currentThread, old, threadArray[old].value);
+           /* No need to block as we have been told a signal has
+               effectively already been recorded.  We remove the signal
+               notification without blocking.  */
+           threadArray[old].value--;
+         }
+       tprintf ("end, context back to %d\n", currentThread);
+       if (currentThread != old)
+         M2RTS_HaltC (__FILE__, __LINE__, __FUNCTION__,
+                      "wrong process id");
+      }
+  }
+  __gthread_mutex_unlock (&lock);
 }
 
 extern "C" int
-RTco_select (int p1, fd_set *p2, fd_set *p3, fd_set *p4, const timespec *p5)
+EXPORT(select) (int p1, fd_set *p2, fd_set *p3, fd_set *p4, const timespec *p5)
 {
-  RTco_init ();
+  EXPORT(init) ();
   tprintf ("[%x]  RTco.select (...)\n", pthread_self ());
   return pselect (p1, p2, p3, p4, p5, NULL);
 }
 
 extern "C" int
-RTco_init (void)
+EXPORT(init) (void)
 {
+  tprintf ("checking init\n");
   if (! initialized)
     {
-      int tid;
+      initialized = TRUE;
 
       tprintf ("RTco initialized\n");
-      initSem (&lock, 0);
+      __GTHREAD_MUTEX_INIT_FUNCTION (&lock);
+      __gthread_mutex_lock (&lock);
       /* Create initial thread container.  */
 #if defined(POOL)
       threadArray = (threadCB *)malloc (sizeof (threadCB) * THREAD_POOL);
       semArray = (threadSem **)malloc (sizeof (threadSem *) * SEM_POOL);
 #endif
-      tid = newThread ();  /* For the current initial thread.  */
-      threadArray[tid].tid = tid;
-      threadArray[tid].execution = initSemaphore (0);
-      threadArray[tid].p = pthread_self ();
-      threadArray[tid].interruptLevel = 0;
-      threadArray[tid].proc
-          = never;  /* This shouldn't happen as we are already running.  */
-      initialized = TRUE;
+      /* Create a thread control block for the main program (or process).  */
+      currentThread = newThread ();  /* For the current initial thread.  */
+      threadArray[currentThread].p = pthread_self ();
+      threadArray[currentThread].tid = currentThread;
+      __GTHREAD_COND_INIT_FUNCTION (&threadArray[currentThread].run_counter);
+      threadArray[currentThread].interruptLevel = 0;
+      /* The line below shouldn't be necessary as we are already running.  */
+      threadArray[currentThread].proc = never;
+      threadArray[currentThread].waiting = false;   /* We are running.  */
+      threadArray[currentThread].value = 0;   /* No signal from anyone yet.  */
       tprintf ("RTco initialized completed\n");
-      signalSem (&lock);
+      __gthread_mutex_unlock (&lock);
     }
   return 0;
 }
 
 extern "C" void __attribute__((__constructor__))
-_M2_RTco_ctor (void)
+M2EXPORT(ctor) (void)
 {
-  M2RTS_RegisterModule ("RTco", _M2_RTco_init, _M2_RTco_fini,
-                       _M2_RTco_dep);
+  M2RTS_RegisterModule ("RTco",
+                       M2EXPORT(init), M2EXPORT(fini),
+                       M2EXPORT(dep));
 }