ilmClient: replaced POSIX message queue with custom queue implementation
authorTimo Lotterbach <timo.lotterbach@bmw-carit.de>
Thu, 28 Feb 2013 13:03:20 +0000 (05:03 -0800)
committerTimo Lotterbach <timo.lotterbach@bmw-carit.de>
Thu, 14 Mar 2013 12:03:39 +0000 (05:03 -0700)
the use of POSIX message queues in the ilmClient library had some side
effects on platforms other than Linux. Because of this, a small thread-
safe queue was implemented for thread synchronization in the ilmClient.

Signed-off-by: Timo Lotterbach <timo.lotterbach@bmw-carit.de>
LayerManagerClient/ilmClient/src/generic_ilm_client.c

index 4ec0a3c..1f844ca 100644 (file)
@@ -27,9 +27,9 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <signal.h>
-#include <mqueue.h>
 #include <fcntl.h>
 #include <sys/types.h>
+#include <semaphore.h>
 #include <unistd.h>
 
 /*
@@ -52,6 +52,90 @@ const int gResponseTimeout = 500; /* in ms */
 
 /*
  *=============================================================================
+ * Implementation of thread-safe circular queue for local use
+ *=============================================================================
+ */
+typedef struct
+{
+    pthread_mutex_t queueMutex;
+    sem_t readBlockSemaphore;
+
+    t_ilm_uint size;
+    t_ilm_uint maxSize;
+    t_ilm_uint readPos;
+    t_ilm_uint writePos;
+
+    t_ilm_message* messages;
+} t_ilm_msg_queue;
+
+
+static void init_msg_queue(t_ilm_msg_queue* pQueue, t_ilm_uint maxSize)
+{
+    pQueue->maxSize = maxSize;
+    pQueue->messages = malloc(sizeof(t_ilm_message) * maxSize);
+
+    pQueue->size = 0;
+    pQueue->readPos = 0;
+    pQueue->writePos = 0;
+
+    pthread_mutex_init(&pQueue->queueMutex, NULL);
+    sem_init(&pQueue->readBlockSemaphore, 0, 0);
+}
+
+static t_ilm_bool msg_enqueue(t_ilm_msg_queue* pQueue, t_ilm_message message)
+{
+    pthread_mutex_lock(&pQueue->queueMutex);
+
+    if (pQueue->size < pQueue->maxSize)
+    {
+        ++pQueue->size;
+        pQueue->messages[pQueue->writePos] = message;
+        pQueue->writePos = (pQueue->writePos + 1) % pQueue->maxSize;
+
+        /* wakeup a blocked dequeue reqquest */
+        sem_post(&pQueue->readBlockSemaphore);
+
+        pthread_mutex_unlock(&pQueue->queueMutex);
+        return ILM_TRUE;
+    }
+
+    pthread_mutex_unlock(&pQueue->queueMutex);
+    return ILM_FALSE;
+}
+
+static t_ilm_message msg_dequeue(t_ilm_msg_queue* pQueue)
+{
+    t_ilm_message result = NULL;
+    /* wait until a message is available */
+    sem_wait(&pQueue->readBlockSemaphore);
+
+    pthread_mutex_lock(&pQueue->queueMutex);
+
+    if (pQueue->size > 0)
+    {
+        --pQueue->size;
+        result = pQueue->messages[pQueue->readPos];
+        pQueue->readPos = (pQueue->readPos + 1) % pQueue->maxSize;
+    }
+
+    pthread_mutex_unlock(&pQueue->queueMutex);
+    return result;
+}
+
+
+static void destroy_msg_queue(t_ilm_msg_queue* pQueue)
+{
+    if (pQueue->maxSize > 0)
+    {
+        pQueue->maxSize = 0;
+        pQueue->size = 0;
+        free(pQueue->messages);
+    }
+}
+
+
+/*
+ *=============================================================================
  * global vars
  *=============================================================================
  */
@@ -63,11 +147,9 @@ static pthread_t gReceiveThread;
 static pthread_mutex_t gSendReceiveLock;
 static pthread_t gNotificationThread;
 
-static mqd_t incomingMqRead;
-static mqd_t incomingMqWrite;
-
-static mqd_t notificationMqRead;
-static mqd_t notificationMqWrite;
+static const int maxQueueSize = 1024;
+static t_ilm_msg_queue incomingQueue;
+static t_ilm_msg_queue notificationQueue;
 
 static t_ilm_bool gInitialized = ILM_FALSE;
 
@@ -245,7 +327,7 @@ void* notificationThreadLoop(void* param)
 
     (void)param;
 
-    while (-1 != mq_receive(notificationMqRead, (char*)&notification, sizeof(notification), NULL))
+    while (NULL != (notification = msg_dequeue(&notificationQueue)))
     {
         t_ilm_const_string name = gIpcModule.getMessageName(notification);
 
@@ -358,7 +440,7 @@ void* receiveThreadLoop(void* param)
         switch (messageType)
         {
             case IpcMessageTypeNotification:
-                if (-1 == mq_send(notificationMqWrite, (char*)&message, sizeof(message), 0))
+                if (ILM_FALSE == msg_enqueue(&notificationQueue, message))
                 {
                     if (EAGAIN == errno)
                     {
@@ -369,7 +451,7 @@ void* receiveThreadLoop(void* param)
 
             case IpcMessageTypeCommand:
             case IpcMessageTypeError:
-                if (-1 == mq_send(incomingMqWrite, (char*)&message, sizeof(message), 0))
+                if (ILM_FALSE == msg_enqueue(&incomingQueue, message))
                 {
                     if (EAGAIN == errno)
                     {
@@ -422,7 +504,7 @@ t_ilm_bool sendAndWaitForResponse(t_ilm_message command, t_ilm_message* response
 
     if (gIpcModule.sendToService(command))
     {
-        if (-1 == mq_timedreceive(incomingMqRead, (char*)response, sizeof(t_ilm_message), NULL, &ts))
+        if (NULL == (*response = msg_dequeue(&incomingQueue)))
         {
             *error = ILM_ERROR_ON_CONNECTION;
         }
@@ -480,32 +562,18 @@ ilmErrorTypes ilm_init()
 
         if (gIpcModule.initClientMode())
         {
-            char mqName[30];
             pthread_attr_t notificationThreadAttributes;
             int ret;
 
-            struct mq_attr mqAttr;
-            mqAttr.mq_maxmsg = 4;
-            mqAttr.mq_msgsize = sizeof(t_ilm_message);
-            mqAttr.mq_flags = 0; /*O_NONBLOCK, */
-            mqAttr.mq_curmsgs = 0;
-
             result = ILM_SUCCESS;
 
-            snprintf(mqName, sizeof(mqName), NOTIFICATION_QUEUE_NAME, getpid());
+            init_msg_queue(&notificationQueue, maxQueueSize);
 
-            notificationMqWrite = mq_open(mqName, O_WRONLY | O_CREAT, 0600, &mqAttr);
-            notificationMqRead = mq_open(mqName, O_RDONLY);
-            mq_unlink(mqName); /* is destroyed on closed filedescriptor */
+            init_msg_queue(&incomingQueue, maxQueueSize);
 
-            snprintf(mqName, sizeof(mqName), INCOMING_QUEUE_NAME, getpid());
-            incomingMqWrite = mq_open(mqName, O_WRONLY | O_CREAT, 0600, &mqAttr);
-            incomingMqRead = mq_open(mqName, O_RDONLY);
-            mq_unlink(mqName); /* is destroyed on closed filedescriptor */
-
-            if ((mqd_t)-1 == notificationMqRead || (mqd_t)-1 == notificationMqWrite)
+            if (notificationQueue.maxSize == 0)
             {
-                printf("mq_open failed, errno = %d\n", errno);
+                printf("failed to allocate queue\n");
                 return result;
             }
 
@@ -594,11 +662,8 @@ ilmErrorTypes ilm_destroy()
 
     gIpcModule.destroy();
 
-    mq_close(notificationMqRead);
-    mq_close(notificationMqWrite);
-
-    mq_close(incomingMqRead);
-    mq_close(incomingMqWrite);
+    destroy_msg_queue(&notificationQueue);
+    destroy_msg_queue(&incomingQueue);
 
     gInitialized = ILM_FALSE;