libwinpr-pool: improve thread pool API on Linux
authorMarc-André Moreau <marcandre.moreau@gmail.com>
Tue, 22 Jan 2013 21:19:32 +0000 (16:19 -0500)
committerMarc-André Moreau <marcandre.moreau@gmail.com>
Tue, 22 Jan 2013 21:19:32 +0000 (16:19 -0500)
winpr/libwinpr/pool/callback_cleanup.c
winpr/libwinpr/pool/cleanup_group.c
winpr/libwinpr/pool/pool.c
winpr/libwinpr/pool/pool.h
winpr/libwinpr/pool/test/TestPoolWork.c
winpr/libwinpr/synch/wait.c
winpr/libwinpr/thread/thread.c

index 2f0b162..f0653ae 100644 (file)
@@ -24,6 +24,8 @@
 #include <winpr/crt.h>
 #include <winpr/pool.h>
 
+#include "pool.h"
+
 #ifdef _WIN32
 
 static BOOL module_initialized = FALSE;
index 807df86..bd8676d 100644 (file)
@@ -24,6 +24,8 @@
 #include <winpr/crt.h>
 #include <winpr/pool.h>
 
+#include "pool.h"
+
 #ifdef _WIN32
 
 static BOOL module_initialized = FALSE;
@@ -56,14 +58,16 @@ static void module_init()
 
 PTP_CLEANUP_GROUP CreateThreadpoolCleanupGroup()
 {
+       PTP_CLEANUP_GROUP cleanupGroup = NULL;
 #ifdef _WIN32
        module_init();
 
        if (pCreateThreadpoolCleanupGroup)
                return pCreateThreadpoolCleanupGroup();
 #else
+       cleanupGroup = (PTP_CLEANUP_GROUP) malloc(sizeof(TP_CLEANUP_GROUP));
 #endif
-       return NULL;
+       return cleanupGroup;
 }
 
 VOID CloseThreadpoolCleanupGroupMembers(PTP_CLEANUP_GROUP ptpcg, BOOL fCancelPendingCallbacks, PVOID pvCleanupContext)
@@ -74,6 +78,7 @@ VOID CloseThreadpoolCleanupGroupMembers(PTP_CLEANUP_GROUP ptpcg, BOOL fCancelPen
        if (pCloseThreadpoolCleanupGroupMembers)
                pCloseThreadpoolCleanupGroupMembers(ptpcg, fCancelPendingCallbacks, pvCleanupContext);
 #else
+
 #endif
 }
 
@@ -85,6 +90,7 @@ VOID CloseThreadpoolCleanupGroup(PTP_CLEANUP_GROUP ptpcg)
        if (pCloseThreadpoolCleanupGroup)
                pCloseThreadpoolCleanupGroup(ptpcg);
 #else
+       free(ptpcg);
 #endif
 }
 
index 9465cba..402ff67 100644 (file)
@@ -68,14 +68,27 @@ static TP_POOL DEFAULT_POOL =
 
 static void* thread_pool_work_func(void* arg)
 {
+       DWORD status;
        PTP_POOL pool;
        PTP_WORK work;
+       HANDLE events[2];
        PTP_CALLBACK_INSTANCE callbackInstance;
 
        pool = (PTP_POOL) arg;
 
-       while (WaitForSingleObject(Queue_Event(pool->PendingQueue), INFINITE) == WAIT_OBJECT_0)
+       events[0] = pool->TerminateEvent;
+       events[1] = Queue_Event(pool->PendingQueue);
+
+       while (1)
        {
+               status = WaitForMultipleObjects(2, events, FALSE, INFINITE);
+
+               if (status == WAIT_OBJECT_0)
+                       break;
+
+               if (status != (WAIT_OBJECT_0 + 1))
+                       break;
+
                callbackInstance = (PTP_CALLBACK_INSTANCE) Queue_Dequeue(pool->PendingQueue);
 
                if (callbackInstance)
@@ -90,28 +103,41 @@ static void* thread_pool_work_func(void* arg)
        return NULL;
 }
 
-PTP_POOL GetDefaultThreadpool()
+void InitializeThreadpool(PTP_POOL pool)
 {
        int index;
-       PTP_POOL pool = NULL;
-
-       pool = &DEFAULT_POOL;
+       HANDLE thread;
 
        if (!pool->Threads)
        {
-               pool->ThreadCount = 4;
-               pool->Threads = (HANDLE*) malloc(pool->ThreadCount * sizeof(HANDLE));
+               pool->Minimum = 0;
+               pool->Maximum = 500;
+
+               pool->Threads = ArrayList_New(TRUE);
 
                pool->PendingQueue = Queue_New(TRUE, -1, -1);
                pool->WorkComplete = CountdownEvent_New(0);
 
-               for (index = 0; index < pool->ThreadCount; index++)
+               pool->TerminateEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+
+               for (index = 0; index < 4; index++)
                {
-                       pool->Threads[index] = CreateThread(NULL, 0,
+                       thread = CreateThread(NULL, 0,
                                        (LPTHREAD_START_ROUTINE) thread_pool_work_func,
                                        (void*) pool, 0, NULL);
+
+                       ArrayList_Add(pool->Threads, thread);
                }
        }
+}
+
+PTP_POOL GetDefaultThreadpool()
+{
+       PTP_POOL pool = NULL;
+
+       pool = &DEFAULT_POOL;
+
+       InitializeThreadpool(pool);
 
        return pool;
 }
@@ -131,10 +157,7 @@ PTP_POOL CreateThreadpool(PVOID reserved)
        pool = (PTP_POOL) malloc(sizeof(TP_POOL));
 
        if (pool)
-       {
-               pool->Minimum = 0;
-               pool->Maximum = 500;
-       }
+               InitializeThreadpool(pool);
 #endif
 
        return pool;
@@ -148,6 +171,25 @@ VOID CloseThreadpool(PTP_POOL ptpp)
        if (pCloseThreadpool)
                pCloseThreadpool(ptpp);
 #else
+       int index;
+       HANDLE thread;
+
+       SetEvent(ptpp->TerminateEvent);
+
+       index = ArrayList_Count(ptpp->Threads) - 1;
+
+       while (index >= 0)
+       {
+               thread = (HANDLE) ArrayList_GetItem(ptpp->Threads, index);
+               WaitForSingleObject(thread, INFINITE);
+               index--;
+       }
+
+       ArrayList_Free(ptpp->Threads);
+       Queue_Free(ptpp->PendingQueue);
+       CountdownEvent_Free(ptpp->WorkComplete);
+       CloseHandle(ptpp->TerminateEvent);
+
        free(ptpp);
 #endif
 }
index b1d8267..81485f4 100644 (file)
@@ -34,9 +34,9 @@ struct _TP_POOL
 {
        DWORD Minimum;
        DWORD Maximum;
-       HANDLE* Threads;
-       DWORD ThreadCount;
+       wArrayList* Threads;
        wQueue* PendingQueue;
+       HANDLE TerminateEvent;
        wCountdownEvent* WorkComplete;
 };
 
@@ -62,6 +62,11 @@ struct _TP_IO
        void* dummy;
 };
 
+struct _TP_CLEANUP_GROUP
+{
+       void* dummy;
+};
+
 #ifndef _WIN32
 
 PTP_POOL GetDefaultThreadpool();
index 37db4d3..dbd3d61 100644 (file)
@@ -6,7 +6,23 @@ static int count = 0;
 
 void test_WorkCallback(PTP_CALLBACK_INSTANCE instance, void* context, PTP_WORK work)
 {
-       printf("Hello %s: %d\n", context, count++);
+       int index;
+       BYTE a[1024];
+       BYTE b[1024];
+       BYTE c[1024];
+
+       printf("Hello %s: %d (thread: %d)\n", context, count++, GetCurrentThreadId());
+
+       for (index = 0; index < 100; index++)
+       {
+               ZeroMemory(a, 1024);
+               ZeroMemory(b, 1024);
+               ZeroMemory(c, 1024);
+               FillMemory(a, 1024, 0xAA);
+               FillMemory(b, 1024, 0xBB);
+               CopyMemory(c, a, 1024);
+               CopyMemory(c, b, 1024);
+       }
 }
 
 int TestPoolWork(int argc, char* argv[])
index 2d34848..ae182ba 100644 (file)
@@ -50,14 +50,19 @@ DWORD WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds)
 
        if (Type == HANDLE_TYPE_THREAD)
        {
+               int status;
                WINPR_THREAD* thread;
+               void* thread_status = NULL;
 
                if (dwMilliseconds != INFINITE)
                        printf("WaitForSingleObject: timeout not implemented for thread wait\n");
 
                thread = (WINPR_THREAD*) Object;
 
-               pthread_join(thread->thread, NULL);
+               status = pthread_join(thread->thread, &thread_status);
+
+               if (status != 0)
+                       printf("WaitForSingleObject: pthread_join failure: %d\n", status);
        }
        if (Type == HANDLE_TYPE_MUTEX)
        {
index b00a82e..7229040 100644 (file)
@@ -81,7 +81,7 @@ void winpr_StartThread(WINPR_THREAD* thread)
        pthread_attr_t attr;
 
        pthread_attr_init(&attr);
-       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
 
        if (thread->dwStackSize > 0)
                pthread_attr_setstacksize(&attr, (size_t) thread->dwStackSize);