int tail;
int size;
int capacity;
- wMessage** array;
+ wMessage* array;
HANDLE mutex;
HANDLE event;
};
typedef struct _wMessageQueue wMessageQueue;
+#define WMQ_QUIT 0xFFFF
+
WINPR_API HANDLE MessageQueue_Event(wMessageQueue* queue);
+WINPR_API BOOL MessageQueue_Wait(wMessageQueue* queue);
WINPR_API void MessageQueue_Dispatch(wMessageQueue* queue, wMessage* message);
WINPR_API void MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam, void* lParam);
+WINPR_API void MessageQueue_PostQuit(wMessageQueue* queue, int nExitCode);
-WINPR_API wMessage* MessageQueue_Get(wMessageQueue* queue);
-WINPR_API wMessage* MessageQueue_Peek(wMessageQueue* queue);
+WINPR_API int MessageQueue_Get(wMessageQueue* queue, wMessage* message);
+WINPR_API int MessageQueue_Peek(wMessageQueue* queue, wMessage* message, BOOL remove);
WINPR_API wMessageQueue* MessageQueue_New();
WINPR_API void MessageQueue_Free(wMessageQueue* queue);
* Methods
*/
-void MessageQueue_Clear(wMessageQueue* queue)
+BOOL MessageQueue_Wait(wMessageQueue* queue)
{
- int index;
+ BOOL status = FALSE;
- WaitForSingleObject(queue->mutex, INFINITE);
-
- for (index = queue->head; index != queue->tail; index = (index + 1) % queue->capacity)
- {
- queue->array[index] = NULL;
- }
-
- queue->size = 0;
- queue->head = queue->tail = 0;
+ if (WaitForSingleObject(queue->event, INFINITE) == WAIT_OBJECT_0)
+ status = TRUE;
- ReleaseMutex(queue->mutex);
+ return status;
}
void MessageQueue_Dispatch(wMessageQueue* queue, wMessage* message)
new_capacity = queue->capacity * 2;
queue->capacity = new_capacity;
- queue->array = (wMessage**) realloc(queue->array, sizeof(wMessage*) * queue->capacity);
- ZeroMemory(&(queue->array[old_capacity]), old_capacity * sizeof(wMessage*));
+ queue->array = (wMessage*) realloc(queue->array, sizeof(wMessage) * queue->capacity);
+ ZeroMemory(&(queue->array[old_capacity]), old_capacity * sizeof(wMessage));
if (queue->tail < (old_capacity - 1))
{
- CopyMemory(&(queue->array[old_capacity]), queue->array, queue->tail * sizeof(wMessage*));
+ CopyMemory(&(queue->array[old_capacity]), queue->array, queue->tail * sizeof(wMessage));
queue->tail += old_capacity;
}
}
- queue->array[queue->tail] = message;
+ CopyMemory(&(queue->array[queue->tail]), message, sizeof(wMessage));
queue->tail = (queue->tail + 1) % queue->capacity;
queue->size++;
void MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam, void* lParam)
{
- wMessage* message;
+ wMessage message;
- message = (wMessage*) malloc(sizeof(wMessage));
+ message.context = context;
+ message.type = type;
+ message.wParam = wParam;
+ message.lParam = lParam;
- if (message)
- {
- message->context = context;
- message->type = type;
- message->wParam = wParam;
- message->lParam = lParam;
+ MessageQueue_Dispatch(queue, &message);
+}
- MessageQueue_Dispatch(queue, message);
- }
+void MessageQueue_PostQuit(wMessageQueue* queue, int nExitCode)
+{
+ MessageQueue_Post(queue, NULL, WMQ_QUIT, (void*) (size_t) nExitCode, NULL);
}
-wMessage* MessageQueue_Get(wMessageQueue* queue)
+int MessageQueue_Get(wMessageQueue* queue, wMessage* message)
{
- wMessage* message = NULL;
+ int status = -1;
+
+ if (!MessageQueue_Wait(queue))
+ return status;
WaitForSingleObject(queue->mutex, INFINITE);
if (queue->size > 0)
{
- message = queue->array[queue->head];
- queue->array[queue->head] = NULL;
+ CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage));
+ ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage));
queue->head = (queue->head + 1) % queue->capacity;
queue->size--;
+
+ status = (message->type != WMQ_QUIT) ? 1 : 0;
}
if (queue->size < 1)
ReleaseMutex(queue->mutex);
- return message;
+ return status;
}
-wMessage* MessageQueue_Peek(wMessageQueue* queue)
+int MessageQueue_Peek(wMessageQueue* queue, wMessage* message, BOOL remove)
{
- wMessage* message = NULL;
+ int status = 0;
WaitForSingleObject(queue->mutex, INFINITE);
if (queue->size > 0)
- message = queue->array[queue->head];
+ {
+ CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage));
+ status = 1;
+
+ if (remove)
+ {
+ ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage));
+ queue->head = (queue->head + 1) % queue->capacity;
+ queue->size--;
+ }
+ }
ReleaseMutex(queue->mutex);
- return message;
+ return status;
}
/**
queue->size = 0;
queue->capacity = 32;
- queue->array = (wMessage**) malloc(sizeof(wMessage*) * queue->capacity);
+ queue->array = (wMessage*) malloc(sizeof(wMessage) * queue->capacity);
queue->mutex = CreateMutex(NULL, FALSE, NULL);
queue->event = CreateEvent(NULL, TRUE, FALSE, NULL);
void MessageQueue_Free(wMessageQueue* queue)
{
- MessageQueue_Clear(queue);
-
CloseHandle(queue->event);
CloseHandle(queue->mutex);
--- /dev/null
+
+#include <winpr/crt.h>
+#include <winpr/thread.h>
+#include <winpr/collections.h>
+
+static void* message_queue_consumer_thread(void* arg)
+{
+ wMessage message;
+ wMessageQueue* queue;
+
+ queue = (wMessageQueue*) arg;
+
+ while (MessageQueue_Wait(queue))
+ {
+ if (MessageQueue_Peek(queue, &message, TRUE))
+ {
+ if (message.type == WMQ_QUIT)
+ break;
+
+ printf("Message.Type: %d\n", message.type);
+ }
+ }
+
+ return NULL;
+}
+
+int TestMessageQueue(int argc, char* argv[])
+{
+ HANDLE thread;
+ wMessageQueue* queue;
+
+ printf("Message Queue\n");
+
+ queue = MessageQueue_New();
+
+ thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) message_queue_consumer_thread, (void*) queue, 0, NULL);
+
+ MessageQueue_Post(queue, NULL, 123, NULL, NULL);
+ MessageQueue_Post(queue, NULL, 456, NULL, NULL);
+ MessageQueue_Post(queue, NULL, 789, NULL, NULL);
+ MessageQueue_PostQuit(queue, 0);
+
+ WaitForSingleObject(thread, INFINITE);
+
+ MessageQueue_Free(queue);
+
+ return 0;
+}