libwinpr-utils: test MessageQueue
authorMarc-André Moreau <marcandre.moreau@gmail.com>
Thu, 24 Jan 2013 20:08:49 +0000 (15:08 -0500)
committerMarc-André Moreau <marcandre.moreau@gmail.com>
Thu, 24 Jan 2013 20:08:49 +0000 (15:08 -0500)
winpr/include/winpr/collections.h
winpr/libwinpr/utils/collections/MessageQueue.c
winpr/libwinpr/utils/test/CMakeLists.txt
winpr/libwinpr/utils/test/TestMessageQueue.c [new file with mode: 0644]

index 0aa4949..4446f41 100644 (file)
@@ -263,19 +263,23 @@ struct _wMessageQueue
        int tail;
        int size;
        int capacity;
-       wMessage** array;
+       wMessage* array;
        HANDLE mutex;
        HANDLE event;
 };
 typedef struct _wMessageQueue wMessageQueue;
 
+#define WMQ_QUIT       0xFFFF
+
 WINPR_API HANDLE MessageQueue_Event(wMessageQueue* queue);
+WINPR_API BOOL MessageQueue_Wait(wMessageQueue* queue);
 
 WINPR_API void MessageQueue_Dispatch(wMessageQueue* queue, wMessage* message);
 WINPR_API void MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam, void* lParam);
+WINPR_API void MessageQueue_PostQuit(wMessageQueue* queue, int nExitCode);
 
-WINPR_API wMessage* MessageQueue_Get(wMessageQueue* queue);
-WINPR_API wMessage* MessageQueue_Peek(wMessageQueue* queue);
+WINPR_API int MessageQueue_Get(wMessageQueue* queue, wMessage* message);
+WINPR_API int MessageQueue_Peek(wMessageQueue* queue, wMessage* message, BOOL remove);
 
 WINPR_API wMessageQueue* MessageQueue_New();
 WINPR_API void MessageQueue_Free(wMessageQueue* queue);
index ebce3ae..4ab6e25 100644 (file)
@@ -47,21 +47,14 @@ HANDLE MessageQueue_Event(wMessageQueue* queue)
  * Methods
  */
 
-void MessageQueue_Clear(wMessageQueue* queue)
+BOOL MessageQueue_Wait(wMessageQueue* queue)
 {
-       int index;
+       BOOL status = FALSE;
 
-       WaitForSingleObject(queue->mutex, INFINITE);
-
-       for (index = queue->head; index != queue->tail; index = (index + 1) % queue->capacity)
-       {
-               queue->array[index] = NULL;
-       }
-
-       queue->size = 0;
-       queue->head = queue->tail = 0;
+       if (WaitForSingleObject(queue->event, INFINITE) == WAIT_OBJECT_0)
+               status = TRUE;
 
-       ReleaseMutex(queue->mutex);
+       return status;
 }
 
 void MessageQueue_Dispatch(wMessageQueue* queue, wMessage* message)
@@ -77,17 +70,17 @@ void MessageQueue_Dispatch(wMessageQueue* queue, wMessage* message)
                new_capacity = queue->capacity * 2;
 
                queue->capacity = new_capacity;
-               queue->array = (wMessage**) realloc(queue->array, sizeof(wMessage*) * queue->capacity);
-               ZeroMemory(&(queue->array[old_capacity]), old_capacity * sizeof(wMessage*));
+               queue->array = (wMessage*) realloc(queue->array, sizeof(wMessage) * queue->capacity);
+               ZeroMemory(&(queue->array[old_capacity]), old_capacity * sizeof(wMessage));
 
                if (queue->tail < (old_capacity - 1))
                {
-                       CopyMemory(&(queue->array[old_capacity]), queue->array, queue->tail * sizeof(wMessage*));
+                       CopyMemory(&(queue->array[old_capacity]), queue->array, queue->tail * sizeof(wMessage));
                        queue->tail += old_capacity;
                }
        }
 
-       queue->array[queue->tail] = message;
+       CopyMemory(&(queue->array[queue->tail]), message, sizeof(wMessage));
        queue->tail = (queue->tail + 1) % queue->capacity;
        queue->size++;
 
@@ -98,33 +91,38 @@ void MessageQueue_Dispatch(wMessageQueue* queue, wMessage* message)
 
 void MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam, void* lParam)
 {
-       wMessage* message;
+       wMessage message;
 
-       message = (wMessage*) malloc(sizeof(wMessage));
+       message.context = context;
+       message.type = type;
+       message.wParam = wParam;
+       message.lParam = lParam;
 
-       if (message)
-       {
-               message->context = context;
-               message->type = type;
-               message->wParam = wParam;
-               message->lParam = lParam;
+       MessageQueue_Dispatch(queue, &message);
+}
 
-               MessageQueue_Dispatch(queue, message);
-       }
+void MessageQueue_PostQuit(wMessageQueue* queue, int nExitCode)
+{
+       MessageQueue_Post(queue, NULL, WMQ_QUIT, (void*) (size_t) nExitCode, NULL);
 }
 
-wMessage* MessageQueue_Get(wMessageQueue* queue)
+int MessageQueue_Get(wMessageQueue* queue, wMessage* message)
 {
-       wMessage* message = NULL;
+       int status = -1;
+
+       if (!MessageQueue_Wait(queue))
+               return status;
 
        WaitForSingleObject(queue->mutex, INFINITE);
 
        if (queue->size > 0)
        {
-               message = queue->array[queue->head];
-               queue->array[queue->head] = NULL;
+               CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage));
+               ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage));
                queue->head = (queue->head + 1) % queue->capacity;
                queue->size--;
+
+               status = (message->type != WMQ_QUIT) ? 1 : 0;
        }
 
        if (queue->size < 1)
@@ -132,21 +130,31 @@ wMessage* MessageQueue_Get(wMessageQueue* queue)
 
        ReleaseMutex(queue->mutex);
 
-       return message;
+       return status;
 }
 
-wMessage* MessageQueue_Peek(wMessageQueue* queue)
+int MessageQueue_Peek(wMessageQueue* queue, wMessage* message, BOOL remove)
 {
-       wMessage* message = NULL;
+       int status = 0;
 
        WaitForSingleObject(queue->mutex, INFINITE);
 
        if (queue->size > 0)
-               message = queue->array[queue->head];
+       {
+               CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage));
+               status = 1;
+
+               if (remove)
+               {
+                       ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage));
+                       queue->head = (queue->head + 1) % queue->capacity;
+                       queue->size--;
+               }
+       }
 
        ReleaseMutex(queue->mutex);
 
-       return message;
+       return status;
 }
 
 /**
@@ -166,7 +174,7 @@ wMessageQueue* MessageQueue_New()
                queue->size = 0;
 
                queue->capacity = 32;
-               queue->array = (wMessage**) malloc(sizeof(wMessage*) * queue->capacity);
+               queue->array = (wMessage*) malloc(sizeof(wMessage) * queue->capacity);
 
                queue->mutex = CreateMutex(NULL, FALSE, NULL);
                queue->event = CreateEvent(NULL, TRUE, FALSE, NULL);
@@ -177,8 +185,6 @@ wMessageQueue* MessageQueue_New()
 
 void MessageQueue_Free(wMessageQueue* queue)
 {
-       MessageQueue_Clear(queue);
-
        CloseHandle(queue->event);
        CloseHandle(queue->mutex);
 
index e5d6dc0..9711773 100644 (file)
@@ -7,7 +7,8 @@ set(${MODULE_PREFIX}_DRIVER ${MODULE_NAME}.c)
 set(${MODULE_PREFIX}_TESTS
        TestQueue.c
        TestArrayList.c
-       TestCmdLine.c)
+       TestCmdLine.c
+       TestMessageQueue.c)
 
 create_test_sourcelist(${MODULE_PREFIX}_SRCS
        ${${MODULE_PREFIX}_DRIVER}
@@ -18,7 +19,7 @@ add_executable(${MODULE_NAME} ${${MODULE_PREFIX}_SRCS})
 set_complex_link_libraries(VARIABLE ${MODULE_PREFIX}_LIBS
        MONOLITHIC ${MONOLITHIC_BUILD}
        MODULE winpr
-       MODULES winpr-crt winpr-utils)
+       MODULES winpr-crt winpr-thread winpr-utils)
 
 target_link_libraries(${MODULE_NAME} ${${MODULE_PREFIX}_LIBS})
 
diff --git a/winpr/libwinpr/utils/test/TestMessageQueue.c b/winpr/libwinpr/utils/test/TestMessageQueue.c
new file mode 100644 (file)
index 0000000..fccf80f
--- /dev/null
@@ -0,0 +1,48 @@
+
+#include <winpr/crt.h>
+#include <winpr/thread.h>
+#include <winpr/collections.h>
+
+static void* message_queue_consumer_thread(void* arg)
+{
+       wMessage message;
+       wMessageQueue* queue;
+
+       queue = (wMessageQueue*) arg;
+
+       while (MessageQueue_Wait(queue))
+       {
+               if (MessageQueue_Peek(queue, &message, TRUE))
+               {
+                       if (message.type == WMQ_QUIT)
+                               break;
+
+                       printf("Message.Type: %d\n", message.type);
+               }
+       }
+
+       return NULL;
+}
+
+int TestMessageQueue(int argc, char* argv[])
+{
+       HANDLE thread;
+       wMessageQueue* queue;
+
+       printf("Message Queue\n");
+
+       queue = MessageQueue_New();
+
+       thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) message_queue_consumer_thread, (void*) queue, 0, NULL);
+
+       MessageQueue_Post(queue, NULL, 123, NULL, NULL);
+       MessageQueue_Post(queue, NULL, 456, NULL, NULL);
+       MessageQueue_Post(queue, NULL, 789, NULL, NULL);
+       MessageQueue_PostQuit(queue, 0);
+
+       WaitForSingleObject(thread, INFINITE);
+
+       MessageQueue_Free(queue);
+
+       return 0;
+}