-/* RTco.c provides minimal access to thread primitives.
+/* RTco.cc provides minimal access to thread primitives.
Copyright (C) 2019-2022 Free Software Foundation, Inc.
Contributed by Gaius Mulley <gaius.mulley@southwales.ac.uk>.
#include <sys/select.h>
#include <stdlib.h>
#include <m2rts.h>
+#include <cstdio>
-// #define TRACEON
+#define EXPORT(FUNC) RTco_ ## FUNC
+#define M2EXPORT(FUNC) _M2_RTco_ ## FUNC
+
+/* This implementation of RTco.cc uses a single lock for mutex across
+ the whole module. It also forces context switching between threads
+ in transfer by combining an implementation of wait and signal.
+
+ All semaphores are implemented using the same mutex lock and
+ separate condition variables. */
+
+#undef TRACEON
#define POOL
#define SEM_POOL 10000
#define tprintf(...)
#endif
+
typedef struct threadCB_s
{
void (*proc) (void);
- int execution;
pthread_t p;
- int tid;
+ int tid; /* The thread id. */
unsigned int interruptLevel;
+ __gthread_cond_t run_counter; /* Used to block the thread and force
+ a context switch. */
+ int value; /* Count 0 or 1. */
+ bool waiting; /* Is this thread waiting on the run_counter? */
} threadCB;
typedef struct threadSem_s
{
- __gthread_mutex_t mutex;
__gthread_cond_t counter;
int waiting;
int sem_value;
static threadSem **semArray = NULL;
/* These are used to lock the above module data structures. */
-static threadSem lock;
+static __gthread_mutex_t lock; /* This is the only mutex for
+ the whole module. */
static int initialized = FALSE;
+static int currentThread = 0;
-extern "C" int RTco_init (void);
+extern "C" int EXPORT(init) (void);
extern "C" void
-_M2_RTco_dep (void)
+M2EXPORT(dep) (void)
{
}
extern "C" void
-_M2_RTco_init (int argc, char *argv[], char *envp[])
+M2EXPORT(init) (int argc, char *argv[], char *envp[])
{
}
extern "C" void
-_M2_RTco_fini (int argc, char *argv[], char *envp[])
+M2EXPORT(fini) (int argc, char *argv[], char *envp[])
{
}
initSem (threadSem *sem, int value)
{
__GTHREAD_COND_INIT_FUNCTION (&sem->counter);
- __GTHREAD_MUTEX_INIT_FUNCTION (&sem->mutex);
sem->waiting = FALSE;
sem->sem_value = value;
}
static void
waitSem (threadSem *sem)
{
- __gthread_mutex_lock (&sem->mutex);
+ __gthread_mutex_lock (&lock);
if (sem->sem_value == 0)
{
sem->waiting = TRUE;
- __gthread_cond_wait (&sem->counter, &sem->mutex);
+ __gthread_cond_wait (&sem->counter, &lock);
sem->waiting = FALSE;
}
else
sem->sem_value--;
- __gthread_mutex_unlock (&sem->mutex);
+ __gthread_mutex_unlock (&lock);
}
static void
signalSem (threadSem *sem)
{
- __gthread_mutex_unlock (&sem->mutex);
+ __gthread_mutex_lock (&lock);
if (sem->waiting)
__gthread_cond_signal (&sem->counter);
else
sem->sem_value++;
- __gthread_mutex_unlock (&sem->mutex);
+ __gthread_mutex_unlock (&lock);
}
-void stop (void) {}
-
extern "C" void
-RTco_wait (int sid)
+EXPORT(wait) (int sid)
{
- RTco_init ();
+ EXPORT(init) ();
tprintf ("wait %d\n", sid);
waitSem (semArray[sid]);
}
extern "C" void
-RTco_signal (int sid)
+EXPORT(signal) (int sid)
{
- RTco_init ();
+ EXPORT(init) ();
tprintf ("signal %d\n", sid);
signalSem (semArray[sid]);
}
}
extern "C" int
-RTco_initSemaphore (int value)
+EXPORT(initSemaphore) (int value)
{
int sid;
- RTco_init ();
- waitSem (&lock);
+ tprintf ("initSemaphore (%d) called\n", value);
+ EXPORT(init) ();
+ tprintf ("about to access lock\n");
+ __gthread_mutex_lock (&lock);
sid = initSemaphore (value);
- signalSem (&lock);
+ __gthread_mutex_unlock (&lock);
return sid;
}
-/* signalThread signal the semaphore associated with thread tid. */
-
-extern "C" void
-RTco_signalThread (int tid)
-{
- int sem;
- RTco_init ();
- tprintf ("signalThread %d\n", tid);
- waitSem (&lock);
- sem = threadArray[tid].execution;
- signalSem (&lock);
- RTco_signal (sem);
-}
-
-/* waitThread wait on the semaphore associated with thread tid. */
-
-extern "C" void
-RTco_waitThread (int tid)
-{
- RTco_init ();
- tprintf ("waitThread %d\n", tid);
- RTco_wait (threadArray[tid].execution);
-}
-
-extern "C" int
-currentThread (void)
-{
- int tid;
-
- for (tid = 0; tid < nThreads; tid++)
- if (pthread_self () == threadArray[tid].p)
- return tid;
- M2RTS_HaltC (__FILE__, __LINE__, __FUNCTION__,
- "failed to find currentThread");
-}
-
extern "C" int
-RTco_currentThread (void)
+EXPORT(currentThread) (void)
{
int tid;
- RTco_init ();
- waitSem (&lock);
- tid = currentThread ();
+ EXPORT(init) ();
+ __gthread_mutex_lock (&lock);
+ tid = currentThread;
tprintf ("currentThread %d\n", tid);
- signalSem (&lock);
+ __gthread_mutex_unlock (&lock);
return tid;
}
/* currentInterruptLevel returns the interrupt level of the current thread. */
extern "C" unsigned int
-RTco_currentInterruptLevel (void)
+EXPORT(currentInterruptLevel) (void)
{
- RTco_init ();
+ EXPORT(init) ();
+ __gthread_mutex_lock (&lock);
tprintf ("currentInterruptLevel %d\n",
- threadArray[RTco_currentThread ()].interruptLevel);
- return threadArray[RTco_currentThread ()].interruptLevel;
+ threadArray[currentThread].interruptLevel);
+ int level = threadArray[currentThread].interruptLevel;
+ __gthread_mutex_unlock (&lock);
+ return level;
}
/* turninterrupts returns the old interrupt level and assigns the
interrupt level to newLevel. */
extern "C" unsigned int
-RTco_turnInterrupts (unsigned int newLevel)
+EXPORT(turnInterrupts) (unsigned int newLevel)
{
- int tid = RTco_currentThread ();
- unsigned int old = RTco_currentInterruptLevel ();
-
+ EXPORT(init) ();
+ __gthread_mutex_lock (&lock);
+ unsigned int old = threadArray[currentThread].interruptLevel;
tprintf ("turnInterrupts from %d to %d\n", old, newLevel);
- waitSem (&lock);
- threadArray[tid].interruptLevel = newLevel;
- signalSem (&lock);
+ threadArray[currentThread].interruptLevel = newLevel;
+ __gthread_mutex_unlock (&lock);
return old;
}
{
threadCB *tp = (threadCB *)t;
+ tprintf ("exec thread tid = %d coming to life\n", tp->tid);
+ __gthread_mutex_lock (&lock);
tprintf ("exec thread tid = %d function = 0x%p arg = 0x%p\n", tp->tid,
tp->proc, t);
- RTco_waitThread (
- tp->tid); /* Forcing this thread to block, waiting to be scheduled. */
- tprintf (" exec thread [%d] function = 0x%p arg = 0x%p\n", tp->tid,
+ /* Has the thread been signalled? */
+ if (tp->value == 0)
+ {
+ /* Not been signalled therefore we force ourselves to block. */
+ tprintf ("%s: forcing thread tid = %d to wait\n",
+ __FUNCTION__, tp->tid);
+ tp->waiting = true; /* We are waiting. */
+ __gthread_cond_wait (&tp->run_counter, &lock);
+ tp->waiting = false; /* Running again. */
+ }
+ else
+ {
+ /* Yes signalled, therefore just take the recorded signal and continue. */
+ tprintf ("%s: no need for thread tid = %d to wait\n",
+ __FUNCTION__, tp->tid);
+ tp->value--;
+ }
+ tprintf (" running exec thread [%d] function = 0x%p arg = 0x%p\n", tp->tid,
tp->proc, t);
+ __gthread_mutex_unlock (&lock);
tp->proc (); /* Now execute user procedure. */
#if 0
M2RTS_CoroutineException ( __FILE__, __LINE__, __COLUMN__, __FUNCTION__, "coroutine finishing");
threadArray[tid].proc = proc;
threadArray[tid].tid = tid;
- threadArray[tid].execution = initSemaphore (0);
+ /* Initialize the thread run_counter used to force a context switch. */
+ __GTHREAD_COND_INIT_FUNCTION (&threadArray[tid].run_counter);
threadArray[tid].interruptLevel = interrupt;
+ threadArray[tid].waiting = false; /* The thread is running. */
+ threadArray[tid].value = 0; /* No signal has been seen yet. */
- /* set thread creation attributes. */
+ /* Set thread creation attributes. */
result = pthread_attr_init (&attr);
if (result != 0)
M2RTS_HaltC (__FILE__, __LINE__, __FUNCTION__,
- "failed to create thread attribute");
+ "failed to create thread attribute");
if (stackSize > 0)
{
result = pthread_attr_setstacksize (&attr, stackSize);
if (result != 0)
M2RTS_HaltC (__FILE__, __LINE__, __FUNCTION__,
- "failed to set stack size attribute");
+ "failed to set stack size attribute");
}
tprintf ("initThread [%d] function = 0x%p (arg = 0x%p)\n", tid, proc,
}
extern "C" int
-RTco_initThread (void (*proc) (void), unsigned int stackSize,
- unsigned int interrupt)
+EXPORT(initThread) (void (*proc) (void), unsigned int stackSize,
+ unsigned int interrupt)
{
int tid;
- RTco_init ();
- waitSem (&lock);
+ EXPORT(init) ();
+ __gthread_mutex_lock (&lock);
tid = initThread (proc, stackSize, interrupt);
- signalSem (&lock);
+ __gthread_mutex_unlock (&lock);
return tid;
}
/* transfer unlocks thread p2 and locks the current thread. p1 is
- updated with the current thread id. */
+ updated with the current thread id.
+ The implementation of transfer uses a combined wait/signal. */
extern "C" void
-RTco_transfer (int *p1, int p2)
+EXPORT(transfer) (int *p1, int p2)
{
- int tid = currentThread ();
-
- if (!initialized)
- M2RTS_HaltC (
- __FILE__, __LINE__, __FUNCTION__,
- "cannot transfer to a process before the process has been created");
- if (tid == p2)
- {
- /* error. */
+ __gthread_mutex_lock (&lock);
+ {
+ if (!initialized)
M2RTS_HaltC (__FILE__, __LINE__, __FUNCTION__,
- "attempting to transfer to ourself");
- }
- else
- {
- *p1 = tid;
- tprintf ("start, context switching from: %d to %d\n", tid, p2);
- RTco_signalThread (p2);
- RTco_waitThread (tid);
- tprintf ("end, context back to %d\n", tid);
+ "cannot transfer to a process before the process has been created");
+ if (currentThread == p2)
+ {
+ /* Error. */
+ M2RTS_HaltC (__FILE__, __LINE__, __FUNCTION__,
+ "attempting to transfer to ourself");
}
+ else
+ {
+ *p1 = currentThread;
+ int old = currentThread;
+ tprintf ("start, context switching from: %d to %d\n", currentThread, p2);
+ /* Perform signal (p2 sem). Without the mutex lock as we have
+ already obtained it above. */
+ if (threadArray[p2].waiting)
+ {
+ /* p2 is blocked on the condition variable, release it. */
+ tprintf ("p1 = %d cond_signal to p2 (%d)\n", currentThread, p2);
+ __gthread_cond_signal (&threadArray[p2].run_counter);
+ tprintf ("after p1 = %d cond_signal to p2 (%d)\n", currentThread, p2);
+ }
+ else
+ {
+ /* p2 hasn't reached the condition variable, so bump value
+ ready for p2 to test. */
+ tprintf ("no need for thread %d to cond_signal - bump %d value (pre) = %d\n",
+ currentThread, p2, threadArray[p2].value);
+ threadArray[p2].value++;
+ }
+ /* Perform wait (old sem). Again without obtaining mutex as
+ we've already claimed it. */
+ if (threadArray[old].value == 0)
+ {
+ currentThread = p2;
+ /* Record we are about to wait on the condition variable. */
+ threadArray[old].waiting = true;
+ __gthread_cond_wait (&threadArray[old].run_counter, &lock);
+ threadArray[old].waiting = false;
+ /* We are running again. */
+ currentThread = old;
+ }
+ else
+ {
+ tprintf ("(currentThread = %d) no need for thread %d to cond_wait - taking value (pre) = %d\n",
+ currentThread, old, threadArray[old].value);
+ /* No need to block as we have been told a signal has
+ effectively already been recorded. We remove the signal
+ notification without blocking. */
+ threadArray[old].value--;
+ }
+ tprintf ("end, context back to %d\n", currentThread);
+ if (currentThread != old)
+ M2RTS_HaltC (__FILE__, __LINE__, __FUNCTION__,
+ "wrong process id");
+ }
+ }
+ __gthread_mutex_unlock (&lock);
}
extern "C" int
-RTco_select (int p1, fd_set *p2, fd_set *p3, fd_set *p4, const timespec *p5)
+EXPORT(select) (int p1, fd_set *p2, fd_set *p3, fd_set *p4, const timespec *p5)
{
- RTco_init ();
+ EXPORT(init) ();
tprintf ("[%x] RTco.select (...)\n", pthread_self ());
return pselect (p1, p2, p3, p4, p5, NULL);
}
extern "C" int
-RTco_init (void)
+EXPORT(init) (void)
{
+ tprintf ("checking init\n");
if (! initialized)
{
- int tid;
+ initialized = TRUE;
tprintf ("RTco initialized\n");
- initSem (&lock, 0);
+ __GTHREAD_MUTEX_INIT_FUNCTION (&lock);
+ __gthread_mutex_lock (&lock);
/* Create initial thread container. */
#if defined(POOL)
threadArray = (threadCB *)malloc (sizeof (threadCB) * THREAD_POOL);
semArray = (threadSem **)malloc (sizeof (threadSem *) * SEM_POOL);
#endif
- tid = newThread (); /* For the current initial thread. */
- threadArray[tid].tid = tid;
- threadArray[tid].execution = initSemaphore (0);
- threadArray[tid].p = pthread_self ();
- threadArray[tid].interruptLevel = 0;
- threadArray[tid].proc
- = never; /* This shouldn't happen as we are already running. */
- initialized = TRUE;
+ /* Create a thread control block for the main program (or process). */
+ currentThread = newThread (); /* For the current initial thread. */
+ threadArray[currentThread].p = pthread_self ();
+ threadArray[currentThread].tid = currentThread;
+ __GTHREAD_COND_INIT_FUNCTION (&threadArray[currentThread].run_counter);
+ threadArray[currentThread].interruptLevel = 0;
+ /* The line below shouldn't be necessary as we are already running. */
+ threadArray[currentThread].proc = never;
+ threadArray[currentThread].waiting = false; /* We are running. */
+ threadArray[currentThread].value = 0; /* No signal from anyone yet. */
tprintf ("RTco initialized completed\n");
- signalSem (&lock);
+ __gthread_mutex_unlock (&lock);
}
return 0;
}
extern "C" void __attribute__((__constructor__))
-_M2_RTco_ctor (void)
+M2EXPORT(ctor) (void)
{
- M2RTS_RegisterModule ("RTco", _M2_RTco_init, _M2_RTco_fini,
- _M2_RTco_dep);
+ M2RTS_RegisterModule ("RTco",
+ M2EXPORT(init), M2EXPORT(fini),
+ M2EXPORT(dep));
}