#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
-#include <mqueue.h>
#include <fcntl.h>
#include <sys/types.h>
+#include <semaphore.h>
#include <unistd.h>
/*
/*
*=============================================================================
+ * Implementation of thread-safe circular queue for local use
+ *=============================================================================
+ */
+typedef struct
+{
+ pthread_mutex_t queueMutex;
+ sem_t readBlockSemaphore;
+
+ t_ilm_uint size;
+ t_ilm_uint maxSize;
+ t_ilm_uint readPos;
+ t_ilm_uint writePos;
+
+ t_ilm_message* messages;
+} t_ilm_msg_queue;
+
+
+static void init_msg_queue(t_ilm_msg_queue* pQueue, t_ilm_uint maxSize)
+{
+ pQueue->maxSize = maxSize;
+ pQueue->messages = malloc(sizeof(t_ilm_message) * maxSize);
+
+ pQueue->size = 0;
+ pQueue->readPos = 0;
+ pQueue->writePos = 0;
+
+ pthread_mutex_init(&pQueue->queueMutex, NULL);
+ sem_init(&pQueue->readBlockSemaphore, 0, 0);
+}
+
+static t_ilm_bool msg_enqueue(t_ilm_msg_queue* pQueue, t_ilm_message message)
+{
+ pthread_mutex_lock(&pQueue->queueMutex);
+
+ if (pQueue->size < pQueue->maxSize)
+ {
+ ++pQueue->size;
+ pQueue->messages[pQueue->writePos] = message;
+ pQueue->writePos = (pQueue->writePos + 1) % pQueue->maxSize;
+
+ /* wakeup a blocked dequeue reqquest */
+ sem_post(&pQueue->readBlockSemaphore);
+
+ pthread_mutex_unlock(&pQueue->queueMutex);
+ return ILM_TRUE;
+ }
+
+ pthread_mutex_unlock(&pQueue->queueMutex);
+ return ILM_FALSE;
+}
+
+static t_ilm_message msg_dequeue(t_ilm_msg_queue* pQueue)
+{
+ t_ilm_message result = NULL;
+ /* wait until a message is available */
+ sem_wait(&pQueue->readBlockSemaphore);
+
+ pthread_mutex_lock(&pQueue->queueMutex);
+
+ if (pQueue->size > 0)
+ {
+ --pQueue->size;
+ result = pQueue->messages[pQueue->readPos];
+ pQueue->readPos = (pQueue->readPos + 1) % pQueue->maxSize;
+ }
+
+ pthread_mutex_unlock(&pQueue->queueMutex);
+ return result;
+}
+
+
+static void destroy_msg_queue(t_ilm_msg_queue* pQueue)
+{
+ if (pQueue->maxSize > 0)
+ {
+ pQueue->maxSize = 0;
+ pQueue->size = 0;
+ free(pQueue->messages);
+ }
+}
+
+
+/*
+ *=============================================================================
* global vars
*=============================================================================
*/
static pthread_mutex_t gSendReceiveLock;
static pthread_t gNotificationThread;
-static mqd_t incomingMqRead;
-static mqd_t incomingMqWrite;
-
-static mqd_t notificationMqRead;
-static mqd_t notificationMqWrite;
+static const int maxQueueSize = 1024;
+static t_ilm_msg_queue incomingQueue;
+static t_ilm_msg_queue notificationQueue;
static t_ilm_bool gInitialized = ILM_FALSE;
(void)param;
- while (-1 != mq_receive(notificationMqRead, (char*)¬ification, sizeof(notification), NULL))
+ while (NULL != (notification = msg_dequeue(¬ificationQueue)))
{
t_ilm_const_string name = gIpcModule.getMessageName(notification);
switch (messageType)
{
case IpcMessageTypeNotification:
- if (-1 == mq_send(notificationMqWrite, (char*)&message, sizeof(message), 0))
+ if (ILM_FALSE == msg_enqueue(¬ificationQueue, message))
{
if (EAGAIN == errno)
{
case IpcMessageTypeCommand:
case IpcMessageTypeError:
- if (-1 == mq_send(incomingMqWrite, (char*)&message, sizeof(message), 0))
+ if (ILM_FALSE == msg_enqueue(&incomingQueue, message))
{
if (EAGAIN == errno)
{
if (gIpcModule.sendToService(command))
{
- if (-1 == mq_timedreceive(incomingMqRead, (char*)response, sizeof(t_ilm_message), NULL, &ts))
+ if (NULL == (*response = msg_dequeue(&incomingQueue)))
{
*error = ILM_ERROR_ON_CONNECTION;
}
if (gIpcModule.initClientMode())
{
- char mqName[30];
pthread_attr_t notificationThreadAttributes;
int ret;
- struct mq_attr mqAttr;
- mqAttr.mq_maxmsg = 4;
- mqAttr.mq_msgsize = sizeof(t_ilm_message);
- mqAttr.mq_flags = 0; /*O_NONBLOCK, */
- mqAttr.mq_curmsgs = 0;
-
result = ILM_SUCCESS;
- snprintf(mqName, sizeof(mqName), NOTIFICATION_QUEUE_NAME, getpid());
+ init_msg_queue(¬ificationQueue, maxQueueSize);
- notificationMqWrite = mq_open(mqName, O_WRONLY | O_CREAT, 0600, &mqAttr);
- notificationMqRead = mq_open(mqName, O_RDONLY);
- mq_unlink(mqName); /* is destroyed on closed filedescriptor */
+ init_msg_queue(&incomingQueue, maxQueueSize);
- snprintf(mqName, sizeof(mqName), INCOMING_QUEUE_NAME, getpid());
- incomingMqWrite = mq_open(mqName, O_WRONLY | O_CREAT, 0600, &mqAttr);
- incomingMqRead = mq_open(mqName, O_RDONLY);
- mq_unlink(mqName); /* is destroyed on closed filedescriptor */
-
- if ((mqd_t)-1 == notificationMqRead || (mqd_t)-1 == notificationMqWrite)
+ if (notificationQueue.maxSize == 0)
{
- printf("mq_open failed, errno = %d\n", errno);
+ printf("failed to allocate queue\n");
return result;
}
gIpcModule.destroy();
- mq_close(notificationMqRead);
- mq_close(notificationMqWrite);
-
- mq_close(incomingMqRead);
- mq_close(incomingMqWrite);
+ destroy_msg_queue(¬ificationQueue);
+ destroy_msg_queue(&incomingQueue);
gInitialized = ILM_FALSE;