winpr/synch: rewrite barrier implementation & test
authorNorbert Federa <norbert.federa@thincast.com>
Fri, 3 Jun 2016 16:56:36 +0000 (18:56 +0200)
committerNorbert Federa <norbert.federa@thincast.com>
Fri, 3 Jun 2016 16:56:36 +0000 (18:56 +0200)
The synchronization barrier test as well as the actual WinPR
implementation were completely broken.

winpr/libwinpr/synch/barrier.c
winpr/libwinpr/synch/synch.h
winpr/libwinpr/synch/test/TestSynchBarrier.c

index 5aa91ba..bcb03ad 100644 (file)
@@ -3,6 +3,7 @@
  * Synchronization Functions
  *
  * Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
+ * Copyright 2016 Norbert Federa <norbert.federa@thincast.com>
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
 
 #ifdef WINPR_SYNCHRONIZATION_BARRIER
 
+#include <assert.h>
+#include <winpr/sysinfo.h>
 #include <winpr/library.h>
 #include <winpr/interlocked.h>
+#include <winpr/thread.h>
+
+/**
+ * WinPR uses the internal RTL_BARRIER struct members exactly like Windows:
+ *
+ * DWORD Reserved1:          number of threads that have not yet entered the barrier
+ * DWORD Reserved2:          number of threads required to enter the barrier
+ * ULONG_PTR Reserved3[2];   two synchronization events (manual reset events)
+ * DWORD Reserved4;          number of processors
+ * DWORD Reserved5;          spincount
+ */
 
 #ifdef _WIN32
 
@@ -75,7 +89,9 @@ static BOOL CALLBACK InitOnce_Barrier(PINIT_ONCE once, PVOID param, PVOID *conte
 
 BOOL WINAPI InitializeSynchronizationBarrier(LPSYNCHRONIZATION_BARRIER lpBarrier, LONG lTotalThreads, LONG lSpinCount)
 {
-       WINPR_BARRIER* pBarrier;
+       SYSTEM_INFO sysinfo;
+       HANDLE hEvent0;
+       HANDLE hEvent1;
 
 #ifdef _WIN32
        InitOnceExecuteOnce(&g_InitOnce, InitOnce_Barrier, NULL, NULL);
@@ -84,41 +100,43 @@ BOOL WINAPI InitializeSynchronizationBarrier(LPSYNCHRONIZATION_BARRIER lpBarrier
                return pfnInitializeSynchronizationBarrier(lpBarrier, lTotalThreads, lSpinCount);
 #endif
 
-       if (!lpBarrier)
+       if (!lpBarrier || lTotalThreads < 1 || lSpinCount < -1)
+       {
+               SetLastError(ERROR_INVALID_PARAMETER);
                return FALSE;
+       }
 
        ZeroMemory(lpBarrier, sizeof(SYNCHRONIZATION_BARRIER));
 
-       pBarrier = (WINPR_BARRIER*) calloc(1, sizeof(WINPR_BARRIER));
-
-       if (!pBarrier)
-               return FALSE;
-
-       if (lSpinCount < 0)
+       if (lSpinCount == -1)
                lSpinCount = 2000;
 
-       pBarrier->lTotalThreads = lTotalThreads;
-       pBarrier->lSpinCount = lSpinCount;
-       pBarrier->count = 0;
-
-       pBarrier->event = CreateEvent(NULL, TRUE, FALSE, NULL);
+       if (!(hEvent0 = CreateEvent(NULL, TRUE, FALSE, NULL)))
+               return FALSE;
 
-       if (!pBarrier->event)
+       if (!(hEvent1 = CreateEvent(NULL, TRUE, FALSE, NULL)))
        {
-               free(pBarrier);
+               CloseHandle(hEvent0);
                return FALSE;
        }
 
-       lpBarrier->Reserved3[0] = (ULONG_PTR) pBarrier;
+       GetNativeSystemInfo(&sysinfo);
+
+       lpBarrier->Reserved1 = lTotalThreads;
+       lpBarrier->Reserved2 = lTotalThreads;
+       lpBarrier->Reserved3[0] = (ULONG_PTR)hEvent0;
+       lpBarrier->Reserved3[1] = (ULONG_PTR)hEvent1;
+       lpBarrier->Reserved4 = sysinfo.dwNumberOfProcessors;
+       lpBarrier->Reserved5 = lSpinCount;
 
        return TRUE;
 }
 
 BOOL WINAPI EnterSynchronizationBarrier(LPSYNCHRONIZATION_BARRIER lpBarrier, DWORD dwFlags)
 {
-       LONG count;
-       BOOL status = FALSE;
-       WINPR_BARRIER* pBarrier;
+       LONG remainingThreads;
+       HANDLE hCurrentEvent;
+       HANDLE hDormantEvent;
 
 #ifdef _WIN32
        if (g_NativeBarrier)
@@ -128,51 +146,74 @@ BOOL WINAPI EnterSynchronizationBarrier(LPSYNCHRONIZATION_BARRIER lpBarrier, DWO
        if (!lpBarrier)
                return FALSE;
 
-       pBarrier = (WINPR_BARRIER*) lpBarrier->Reserved3[0];
-
-       if (!pBarrier)
+       /**
+        * dwFlags according to https://msdn.microsoft.com/en-us/library/windows/desktop/hh706889(v=vs.85).aspx
+        *
+        * SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY (0x01)
+        * Specifies that the thread entering the barrier should block
+        * immediately until the last thread enters the barrier.
+        *
+        * SYNCHRONIZATION_BARRIER_FLAGS_SPIN_ONLY (0x02)
+        * Specifies that the thread entering the barrier should spin until the
+        * last thread enters the barrier, even if the spinning thread exceeds
+        * the barrier's maximum spin count.
+        *
+        * SYNCHRONIZATION_BARRIER_FLAGS_NO_DELETE (0x04)
+        * Specifies that the function can skip the work required to ensure
+        * that it is safe to delete the barrier, which can improve
+        * performance. All threads that enter this barrier must specify the
+        * flag; otherwise, the flag is ignored. This flag should be used only
+        * if the barrier will never be deleted.
+        */
+
+       hCurrentEvent = (HANDLE)lpBarrier->Reserved3[0];
+       hDormantEvent = (HANDLE)lpBarrier->Reserved3[1];
+
+       remainingThreads = InterlockedDecrement((LONG*)&lpBarrier->Reserved1);
+
+       assert(remainingThreads >= 0);
+
+       if (remainingThreads > 0)
+       {
+               /* TODO: add spincount support */
+               WaitForSingleObject(hCurrentEvent, INFINITE);
                return FALSE;
+       }
 
-       count = InterlockedIncrement(&(pBarrier->count));
+       /* switch events */
+       lpBarrier->Reserved3[0] = (ULONG_PTR)hDormantEvent;
+       lpBarrier->Reserved3[1] = (ULONG_PTR)hCurrentEvent;
 
-       if (count < pBarrier->lTotalThreads)
-       {
-               WaitForSingleObject(pBarrier->event, INFINITE);
-       }
-       else
-       {
-               SetEvent(pBarrier->event);
-               status = TRUE;
-       }
+       /* reset the dormant event first */
+       ResetEvent(hDormantEvent);
+
+       /* reset the remaining counter */
+       lpBarrier->Reserved1 = lpBarrier->Reserved2;
 
-       InterlockedDecrement(&(pBarrier->count));
+       /* signal the blocked threads */
+       SetEvent(hCurrentEvent);
 
-       return status;
+       return TRUE;
 }
 
 BOOL WINAPI DeleteSynchronizationBarrier(LPSYNCHRONIZATION_BARRIER lpBarrier)
 {
-       WINPR_BARRIER* pBarrier;
-
 #ifdef _WIN32
        if (g_NativeBarrier)
                return pfnDeleteSynchronizationBarrier(lpBarrier);
 #endif
 
        if (!lpBarrier)
-               return TRUE;
-
-       pBarrier = (WINPR_BARRIER*) lpBarrier->Reserved3[0];
-
-       if (!pBarrier)
-               return TRUE;
+               return FALSE;
 
-       while (InterlockedCompareExchange(&pBarrier->count, 0, 0) != 0)
-               Sleep(100);
+       while (lpBarrier->Reserved1 != lpBarrier->Reserved2)
+               SwitchToThread();
 
-       CloseHandle(pBarrier->event);
+       if (lpBarrier->Reserved3[0])
+               CloseHandle((HANDLE)lpBarrier->Reserved3[0]);
 
-       free(pBarrier);
+       if (lpBarrier->Reserved3[1])
+               CloseHandle((HANDLE)lpBarrier->Reserved3[1]);
 
        ZeroMemory(lpBarrier, sizeof(SYNCHRONIZATION_BARRIER));
 
index 5181023..f8ff093 100644 (file)
@@ -144,13 +144,4 @@ struct winpr_timer_queue_timer
 
 #endif
 
-struct winpr_barrier
-{
-       DECLSPEC_ALIGN(4) LONG count;
-       LONG lTotalThreads;
-       LONG lSpinCount;
-       HANDLE event;
-};
-typedef struct winpr_barrier WINPR_BARRIER;
-
 #endif /* WINPR_SYNCH_PRIVATE_H */
index 9a7529e..4283450 100644 (file)
 #include <winpr/crt.h>
 #include <winpr/synch.h>
 #include <winpr/thread.h>
+#include <winpr/interlocked.h>
 
 #include "../synch.h"
 
-static int g_Count;
-static HANDLE g_Event;
-static CRITICAL_SECTION g_Lock;
-static SYNCHRONIZATION_BARRIER g_Barrier;
+static SYNCHRONIZATION_BARRIER gBarrier;
+static HANDLE gStartEvent = NULL;
 
-static void* test_synch_barrier_thread_func(void* arg)
-{
-       BOOL status;
-  int count;
+static LONG gThreadCount = 0;
+static LONG gTrueCount   = 0;
+static LONG gFalseCount  = 0;
+static LONG gErrorCount  = 0;
+
+#define LOOP_COUNT      200
+#define THREAD_COUNT    32
+#define MAX_SLEEP_MS    16
 
-       EnterCriticalSection(&g_Lock);
-  count = g_Count++;
-       LeaveCriticalSection(&g_Lock);
-       status = EnterSynchronizationBarrier(&g_Barrier, 0);
+#define EXPECTED_TRUE_COUNT  LOOP_COUNT
+#define EXPECTED_FALSE_COUNT (LOOP_COUNT * (THREAD_COUNT - 1))
 
-       printf("Thread #%d status: %s\n", count,
-                       status ? "TRUE" : "FALSE");
 
-       if (status)
+DWORD WINAPI test_synch_barrier_thread(LPVOID lpParam)
+{
+       BOOL status = FALSE;
+       DWORD i, tnum = InterlockedIncrement(&gThreadCount) - 1;
+
+       printf("Thread #%03u entered.\n", tnum);
+
+       /* wait for start event from main */
+       if (WaitForSingleObject(gStartEvent, INFINITE) != WAIT_OBJECT_0)
        {
-               SetEvent(g_Event);
+               InterlockedIncrement(&gErrorCount);
+               goto out;
        }
 
-       return NULL;
-}
-
-static void* barrier_deleter_thread_func(void* arg)
-{
-       /* Blocks until all threads are released from the barrier. */
-       DeleteSynchronizationBarrier(&g_Barrier);
+       printf("Thread #%03u unblocked.\n", tnum);
 
-       return NULL;
+       for (i = 0; i < LOOP_COUNT && gErrorCount == 0; i++)
+       {
+               /* simulate different execution times before the barrier */
+               Sleep(rand() % MAX_SLEEP_MS);
+               status = EnterSynchronizationBarrier(&gBarrier, 0);
+               //printf("Thread #%03u status: %s\n", tnum, status ? "TRUE" : "FALSE");
+               if (status)
+                       InterlockedIncrement(&gTrueCount);
+               else
+                       InterlockedIncrement(&gFalseCount);
+       }
+out:
+       printf("Thread #%03u leaving.\n", tnum);
+       return 0;
 }
 
+
 int TestSynchBarrier(int argc, char* argv[])
 {
-       int index;
-       HANDLE threads[5];
-       HANDLE deleter_thread = NULL;
+       HANDLE threads[THREAD_COUNT];
+       DWORD dwStatus;
+       int i;
 
-       g_Count = 0;
+       /* Test invalid parameters */
+       if (InitializeSynchronizationBarrier(&gBarrier, 0, -1))
+       {
+               printf("%s: InitializeSynchronizationBarrier unecpectedly succeeded with lTotalThreads = 0\n", __FUNCTION__);
+               return -1;
+       }
 
-       if (!(g_Event = CreateEvent(NULL, TRUE, FALSE, NULL)))
+       if (InitializeSynchronizationBarrier(&gBarrier, -1, -1))
        {
-               printf("%s: CreateEvent failed. GetLastError() = 0x%08x", __FUNCTION__, GetLastError());
+               printf("%s: InitializeSynchronizationBarrier unecpectedly succeeded with lTotalThreads = -1\n", __FUNCTION__);
                return -1;
        }
 
-       if (!InitializeCriticalSectionAndSpinCount(&g_Lock, 4000))
+       if (InitializeSynchronizationBarrier(&gBarrier, 1, -2))
        {
-               printf("%s: InitializeCriticalSectionAndSpinCount failed. GetLastError() = 0x%08x", __FUNCTION__, GetLastError());
-               CloseHandle(g_Event);
+               printf("%s: InitializeSynchronizationBarrier unecpectedly succeeded with lSpinCount = -2\n", __FUNCTION__);
                return -1;
        }
 
-       if (!InitializeSynchronizationBarrier(&g_Barrier, 5, -1))
+
+       /* Functional test */
+       if (!InitializeSynchronizationBarrier(&gBarrier, THREAD_COUNT, -1))
        {
                printf("%s: InitializeSynchronizationBarrier failed. GetLastError() = 0x%08x", __FUNCTION__, GetLastError());
-               DeleteCriticalSection(&g_Lock);
-               CloseHandle(g_Event);
+               DeleteSynchronizationBarrier(&gBarrier);
                return -1;
        }
 
-       for (index = 0; index < 5; index++)
+       if (!(gStartEvent = CreateEvent(NULL, TRUE, FALSE, NULL)))
        {
-               if (!(threads[index] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)
-                               test_synch_barrier_thread_func, NULL, 0, NULL)))
+               printf("%s: CreateEvent failed with error 0x%08x", __FUNCTION__, GetLastError());
+               DeleteSynchronizationBarrier(&gBarrier);
+               return -1;
+       }
+
+       for (i = 0; i < THREAD_COUNT; i++)
+       {
+               if (!(threads[i] = CreateThread(NULL, 0, test_synch_barrier_thread, NULL, 0, NULL)))
                {
-                       printf("%s: CreateThread failed for thread #%d. GetLastError() = 0x%08x\n", __FUNCTION__, index, GetLastError());
-                       while (index)
-                               CloseHandle(threads[--index]);
-                       CloseHandle(deleter_thread);
-                       DeleteCriticalSection(&g_Lock);
-                       CloseHandle(g_Event);
-                       return -1;
+                       printf("%s: CreateThread failed for thread #%u with error 0x%08x\n", __FUNCTION__, i, GetLastError());
+                       InterlockedIncrement(&gErrorCount);
+                       break;
                }
+       }
 
-               if (index == 0)
+       if (i > 0)
+       {
+               SetEvent(gStartEvent);
+
+               if (WAIT_OBJECT_0 != (dwStatus = WaitForMultipleObjects(i, threads, TRUE, INFINITE)))
                {
-                       /* Make sure first thread has already entered the barrier... */
-                       while (((WINPR_BARRIER*) g_Barrier.Reserved3[0])->count == 0)
-                               Sleep(100);
-
-                       /* Now spawn the deleter thread. */
-                       if (!(deleter_thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)
-                                       barrier_deleter_thread_func, NULL, 0, NULL)))
-                       {
-                               printf("%s: CreateThread failed for deleter thread. GetLastError() = 0x%08x\n", __FUNCTION__, GetLastError());
-                               while (index)
-                                       CloseHandle(threads[--index]);
-                               DeleteCriticalSection(&g_Lock);
-                               CloseHandle(g_Event);
-                               return -1;
-                       }
+                       printf("%s: WaitForMultipleObjects unexpectedly returned %u (error = 0x%08x)\n",
+                               __FUNCTION__, dwStatus, GetLastError());
+                       gErrorCount++;
                }
        }
 
-       WaitForSingleObject(g_Event, INFINITE);
+       CloseHandle(gStartEvent);
+       DeleteSynchronizationBarrier(&gBarrier);
 
-       if (g_Count != 5)
-               return -1;
+       if (gTrueCount != EXPECTED_TRUE_COUNT)
+               InterlockedIncrement(&gErrorCount);
+
+       if (gFalseCount != EXPECTED_FALSE_COUNT)
+               InterlockedIncrement(&gErrorCount);
 
-       printf("All threads have reached the barrier\n");
+       printf("%s: gErrorCount: %d (expected 0)\n", __FUNCTION__, gErrorCount);
+       printf("%s: gTrueCount:  %d (expected %d)\n", __FUNCTION__, gTrueCount, LOOP_COUNT);
+       printf("%s: gFalseCount: %d (expected %d)\n", __FUNCTION__, gFalseCount, LOOP_COUNT * (THREAD_COUNT - 1));
 
-       for (index = 0; index < 5; index++)
+       if (gErrorCount > 0)
        {
-               CloseHandle(threads[index]);
+               printf("%s: Error test failed with %d reported errors\n", __FUNCTION__, gErrorCount);
+               return -1;
        }
 
-       CloseHandle(deleter_thread);
-
-       DeleteCriticalSection(&g_Lock);
-
-       CloseHandle(g_Event);
+       printf("%s: Test successfully completed\n", __FUNCTION__);
 
        return 0;
 }