From 309f5a76b020ef6ef91d5e65750a584f3a82ccc2 Mon Sep 17 00:00:00 2001 From: Timo Lotterbach Date: Thu, 28 Feb 2013 05:03:20 -0800 Subject: [PATCH] ilmClient: replaced POSIX message queue with custom queue implementation the use of POSIX message queues in the ilmClient library had some side effects on platforms other than Linux. Because of this, a small thread- safe queue was implemented for thread synchronization in the ilmClient. Signed-off-by: Timo Lotterbach --- .../ilmClient/src/generic_ilm_client.c | 131 +++++++++++++++------ 1 file changed, 98 insertions(+), 33 deletions(-) diff --git a/LayerManagerClient/ilmClient/src/generic_ilm_client.c b/LayerManagerClient/ilmClient/src/generic_ilm_client.c index 4ec0a3c..1f844ca 100644 --- a/LayerManagerClient/ilmClient/src/generic_ilm_client.c +++ b/LayerManagerClient/ilmClient/src/generic_ilm_client.c @@ -27,9 +27,9 @@ #include #include #include -#include #include #include +#include #include /* @@ -52,6 +52,90 @@ const int gResponseTimeout = 500; /* in ms */ /* *============================================================================= + * Implementation of thread-safe circular queue for local use + *============================================================================= + */ +typedef struct +{ + pthread_mutex_t queueMutex; + sem_t readBlockSemaphore; + + t_ilm_uint size; + t_ilm_uint maxSize; + t_ilm_uint readPos; + t_ilm_uint writePos; + + t_ilm_message* messages; +} t_ilm_msg_queue; + + +static void init_msg_queue(t_ilm_msg_queue* pQueue, t_ilm_uint maxSize) +{ + pQueue->maxSize = maxSize; + pQueue->messages = malloc(sizeof(t_ilm_message) * maxSize); + + pQueue->size = 0; + pQueue->readPos = 0; + pQueue->writePos = 0; + + pthread_mutex_init(&pQueue->queueMutex, NULL); + sem_init(&pQueue->readBlockSemaphore, 0, 0); +} + +static t_ilm_bool msg_enqueue(t_ilm_msg_queue* pQueue, t_ilm_message message) +{ + pthread_mutex_lock(&pQueue->queueMutex); + + if (pQueue->size < pQueue->maxSize) + { + ++pQueue->size; + pQueue->messages[pQueue->writePos] = message; + pQueue->writePos = (pQueue->writePos + 1) % pQueue->maxSize; + + /* wakeup a blocked dequeue reqquest */ + sem_post(&pQueue->readBlockSemaphore); + + pthread_mutex_unlock(&pQueue->queueMutex); + return ILM_TRUE; + } + + pthread_mutex_unlock(&pQueue->queueMutex); + return ILM_FALSE; +} + +static t_ilm_message msg_dequeue(t_ilm_msg_queue* pQueue) +{ + t_ilm_message result = NULL; + /* wait until a message is available */ + sem_wait(&pQueue->readBlockSemaphore); + + pthread_mutex_lock(&pQueue->queueMutex); + + if (pQueue->size > 0) + { + --pQueue->size; + result = pQueue->messages[pQueue->readPos]; + pQueue->readPos = (pQueue->readPos + 1) % pQueue->maxSize; + } + + pthread_mutex_unlock(&pQueue->queueMutex); + return result; +} + + +static void destroy_msg_queue(t_ilm_msg_queue* pQueue) +{ + if (pQueue->maxSize > 0) + { + pQueue->maxSize = 0; + pQueue->size = 0; + free(pQueue->messages); + } +} + + +/* + *============================================================================= * global vars *============================================================================= */ @@ -63,11 +147,9 @@ static pthread_t gReceiveThread; static pthread_mutex_t gSendReceiveLock; static pthread_t gNotificationThread; -static mqd_t incomingMqRead; -static mqd_t incomingMqWrite; - -static mqd_t notificationMqRead; -static mqd_t notificationMqWrite; +static const int maxQueueSize = 1024; +static t_ilm_msg_queue incomingQueue; +static t_ilm_msg_queue notificationQueue; static t_ilm_bool gInitialized = ILM_FALSE; @@ -245,7 +327,7 @@ void* notificationThreadLoop(void* param) (void)param; - while (-1 != mq_receive(notificationMqRead, (char*)¬ification, sizeof(notification), NULL)) + while (NULL != (notification = msg_dequeue(¬ificationQueue))) { t_ilm_const_string name = gIpcModule.getMessageName(notification); @@ -358,7 +440,7 @@ void* receiveThreadLoop(void* param) switch (messageType) { case IpcMessageTypeNotification: - if (-1 == mq_send(notificationMqWrite, (char*)&message, sizeof(message), 0)) + if (ILM_FALSE == msg_enqueue(¬ificationQueue, message)) { if (EAGAIN == errno) { @@ -369,7 +451,7 @@ void* receiveThreadLoop(void* param) case IpcMessageTypeCommand: case IpcMessageTypeError: - if (-1 == mq_send(incomingMqWrite, (char*)&message, sizeof(message), 0)) + if (ILM_FALSE == msg_enqueue(&incomingQueue, message)) { if (EAGAIN == errno) { @@ -422,7 +504,7 @@ t_ilm_bool sendAndWaitForResponse(t_ilm_message command, t_ilm_message* response if (gIpcModule.sendToService(command)) { - if (-1 == mq_timedreceive(incomingMqRead, (char*)response, sizeof(t_ilm_message), NULL, &ts)) + if (NULL == (*response = msg_dequeue(&incomingQueue))) { *error = ILM_ERROR_ON_CONNECTION; } @@ -480,32 +562,18 @@ ilmErrorTypes ilm_init() if (gIpcModule.initClientMode()) { - char mqName[30]; pthread_attr_t notificationThreadAttributes; int ret; - struct mq_attr mqAttr; - mqAttr.mq_maxmsg = 4; - mqAttr.mq_msgsize = sizeof(t_ilm_message); - mqAttr.mq_flags = 0; /*O_NONBLOCK, */ - mqAttr.mq_curmsgs = 0; - result = ILM_SUCCESS; - snprintf(mqName, sizeof(mqName), NOTIFICATION_QUEUE_NAME, getpid()); + init_msg_queue(¬ificationQueue, maxQueueSize); - notificationMqWrite = mq_open(mqName, O_WRONLY | O_CREAT, 0600, &mqAttr); - notificationMqRead = mq_open(mqName, O_RDONLY); - mq_unlink(mqName); /* is destroyed on closed filedescriptor */ + init_msg_queue(&incomingQueue, maxQueueSize); - snprintf(mqName, sizeof(mqName), INCOMING_QUEUE_NAME, getpid()); - incomingMqWrite = mq_open(mqName, O_WRONLY | O_CREAT, 0600, &mqAttr); - incomingMqRead = mq_open(mqName, O_RDONLY); - mq_unlink(mqName); /* is destroyed on closed filedescriptor */ - - if ((mqd_t)-1 == notificationMqRead || (mqd_t)-1 == notificationMqWrite) + if (notificationQueue.maxSize == 0) { - printf("mq_open failed, errno = %d\n", errno); + printf("failed to allocate queue\n"); return result; } @@ -594,11 +662,8 @@ ilmErrorTypes ilm_destroy() gIpcModule.destroy(); - mq_close(notificationMqRead); - mq_close(notificationMqWrite); - - mq_close(incomingMqRead); - mq_close(incomingMqWrite); + destroy_msg_queue(¬ificationQueue); + destroy_msg_queue(&incomingQueue); gInitialized = ILM_FALSE; -- 2.7.4