#include <assert.h>
#include <stdlib.h>
-#if (defined(_WIN32) || defined(_WIN64))
-# include "win_pthread.h"
-# include "win_semaphore.h"
-#else
-# include <pthread.h>
-# ifndef __APPLE__
-# include <semaphore.h>
-# endif
-#endif
-
#include "XLinkDispatcher.h"
#include "XLinkMacros.h"
#include "XLinkPrivateDefines.h"
xLinkEvent_t *retEv;
xLinkEventState_t isServed;
xLinkEventOrigin_t origin;
- sem_t* sem;
+ XLink_sem_t* sem;
void* data;
} xLinkEventPriv_t;
typedef struct {
- sem_t sem;
+ XLink_sem_t sem;
pthread_t threadId;
- int refs;
} localSem_t;
typedef struct{
int queueProcPriority;
- sem_t addEventSem;
- sem_t notifyDispatcherSem;
+ XLink_sem_t addEventSem;
+ XLink_sem_t notifyDispatcherSem;
volatile uint32_t resetXLink;
uint32_t semaphores;
pthread_t xLinkThreadId;
//below workaround for "C2088 '==': illegal for struct" error
static int pthread_t_compare(pthread_t a, pthread_t b);
-static sem_t* createSem(xLinkSchedulerState_t* curr);
-static sem_t* getAndRefSem(pthread_t threadId, xLinkSchedulerState_t *curr, int inc_ref);
-static int unrefSem(sem_t* sem, xLinkSchedulerState_t* curr);
+static XLink_sem_t* createSem(xLinkSchedulerState_t* curr);
+static XLink_sem_t* getSem(pthread_t threadId, xLinkSchedulerState_t *curr);
#if (defined(_WIN32) || defined(_WIN64))
static void* __cdecl eventReader(void* ctx);
static xLinkEventPriv_t* getNextQueueElemToProc(eventQueueHandler_t *q );
static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
eventQueueHandler_t *q, xLinkEvent_t* event,
- sem_t* sem, xLinkEventOrigin_t o);
+ XLink_sem_t* sem, xLinkEventOrigin_t o);
static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr);
schedulerState[idx].lQueue.q[eventIdx].isServed = EVENT_SERVED;
}
- if (sem_init(&schedulerState[idx].addEventSem, 0, 1)) {
+ if (XLink_sem_init(&schedulerState[idx].addEventSem, 0, 1)) {
perror("Can't create semaphore\n");
return -1;
}
- if (sem_init(&schedulerState[idx].notifyDispatcherSem, 0, 0)) {
+ if (XLink_sem_init(&schedulerState[idx].notifyDispatcherSem, 0, 0)) {
perror("Can't create semaphore\n");
}
localSem_t* temp = schedulerState[idx].eventSemaphores;
while (temp < schedulerState[idx].eventSemaphores + MAXIMUM_SEMAPHORES) {
- temp->refs = -1;
+ XLink_sem_set_refs(&temp->sem, -1);
temp++;
}
if (pthread_attr_init(&attr) != 0) {
return NULL;
}
mvLog(MVLOG_DEBUG, "Receiving event %s %d\n", TypeToStr(event->header.type), origin);
- if (sem_wait(&curr->addEventSem)) {
+ if (XLink_sem_wait(&curr->addEventSem)) {
mvLog(MVLOG_ERROR,"can't wait semaphore\n");
return NULL;
}
- sem_t *sem = NULL;
+ XLink_sem_t *sem = NULL;
xLinkEvent_t* ev;
if (origin == EVENT_LOCAL) {
event->header.id = createUniqueID();
- sem = getAndRefSem(pthread_self(), curr, 1);
+ sem = getSem(pthread_self(), curr);
if (!sem) {
sem = createSem(curr);
}
if (!sem) {
mvLog(MVLOG_WARN,"No more semaphores. Increase XLink or OS resources\n");
- if (sem_post(&curr->addEventSem)) {
+ if (XLink_sem_post(&curr->addEventSem)) {
mvLog(MVLOG_ERROR,"can't post semaphore\n");
}
} else {
ev = addNextQueueElemToProc(curr, &curr->rQueue, event, NULL, origin);
}
- if (sem_post(&curr->addEventSem)) {
+ if (XLink_sem_post(&curr->addEventSem)) {
mvLog(MVLOG_ERROR,"can't post semaphore\n");
}
- if (sem_post(&curr->notifyDispatcherSem)) {
+ if (XLink_sem_post(&curr->notifyDispatcherSem)) {
mvLog(MVLOG_ERROR, "can't post semaphore\n");
}
return ev;
xLinkSchedulerState_t* curr = findCorrespondingScheduler(deviceHandle->xLinkFD);
ASSERT_XLINK(curr != NULL);
- sem_t* id = getAndRefSem(pthread_self(), curr, 0);
+ XLink_sem_t* id = getSem(pthread_self(), curr);
if (id == NULL) {
return -1;
}
- int rc = sem_wait(id);
+ int rc = XLink_sem_wait(id);
#ifdef __PC__
if (rc) {
xLinkEvent_t event = {0};
event.deviceHandle = *deviceHandle;
mvLog(MVLOG_ERROR,"waiting is timeout, sending reset remote event");
DispatcherAddEvent(EVENT_LOCAL, &event);
- id = getAndRefSem(pthread_self(), curr, 0);
- if (id == NULL || sem_wait(id)) {
+ id = getSem(pthread_self(), curr);
+ if (id == NULL || XLink_sem_wait(id)) {
dispatcherReset(curr);
}
}
#endif
- if ((XLinkError_t)unrefSem(id, curr) == X_LINK_ERROR) {
- mvLog(MVLOG_WARN, "Failed to unref sem");
- }
-
return rc;
}
(int)blockedEvent->packet.header.id,
TypeToStr((int)blockedEvent->packet.header.type));
blockedEvent->isServed = EVENT_READY;
- if (sem_post(&curr->notifyDispatcherSem)){
+ if (XLink_sem_post(&curr->notifyDispatcherSem)){
mvLog(MVLOG_ERROR, "can't post semaphore\n");
}
return 1;
#endif
}
-static sem_t* createSem(xLinkSchedulerState_t* curr)
+static XLink_sem_t* createSem(xLinkSchedulerState_t* curr)
{
XLINK_RET_ERR_IF(curr == NULL, NULL);
- sem_t* sem = getAndRefSem(pthread_self(), curr, 0);
+ XLink_sem_t* sem = getSem(pthread_self(), curr);
if (sem) {// it already exists, error
return NULL;
}
localSem_t* temp = curr->eventSemaphores;
while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
- if (temp->refs < 0 || curr->semaphores == MAXIMUM_SEMAPHORES) {
- if (curr->semaphores == MAXIMUM_SEMAPHORES && !temp->refs) {
- XLINK_RET_ERR_IF(sem_destroy(&temp->sem) == -1, NULL);
+ int refs = 0;
+ XLINK_RET_ERR_IF(XLink_sem_get_refs(&temp->sem, &refs), NULL);
+ if (refs < 0 || curr->semaphores == MAXIMUM_SEMAPHORES) {
+ if (curr->semaphores == MAXIMUM_SEMAPHORES && refs == 0) {
+ XLINK_RET_ERR_IF(XLink_sem_destroy(&temp->sem), NULL);
+ XLINK_RET_ERR_IF(XLink_sem_get_refs(&temp->sem, &refs), NULL);
curr->semaphores --;
- temp->refs = -1;
#if (defined(_WIN32) || defined(_WIN64))
memset(&temp->threadId, 0, sizeof(temp->threadId));
#else
#endif
}
- if (temp->refs == -1) {
+ if (refs == -1) {
sem = &temp->sem;
- if (sem_init(sem, 0, 0)){
+ if (XLink_sem_init(sem, 0, 0)){
mvLog(MVLOG_ERROR, "Error: Can't create semaphore\n");
return NULL;
}
curr->semaphores++;
- temp->refs = 1;
temp->threadId = pthread_self();
break;
}
return sem;
}
-static sem_t* getAndRefSem(pthread_t threadId, xLinkSchedulerState_t *curr, int inc_ref)
+static XLink_sem_t* getSem(pthread_t threadId, xLinkSchedulerState_t *curr)
{
XLINK_RET_ERR_IF(curr == NULL, NULL);
- localSem_t* sem = curr->eventSemaphores;
- while (sem < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
- if (pthread_t_compare(sem->threadId, threadId) && sem->refs >= 0) {
- sem->refs += inc_ref;
- return &sem->sem;
- }
- sem++;
- }
- return NULL;
-}
-
-static int unrefSem(sem_t* sem, xLinkSchedulerState_t* curr) {
- ASSERT_XLINK(curr != NULL);
localSem_t* temp = curr->eventSemaphores;
while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
- if (&temp->sem == sem) {
- temp->refs--;
- return 1;
+ int refs = 0;
+ XLINK_RET_ERR_IF(XLink_sem_get_refs(&temp->sem, &refs), NULL);
+ if (pthread_t_compare(temp->threadId, threadId) && refs >= 0) {
+ return &temp->sem;
}
temp++;
}
- mvLog(MVLOG_WARN,"unrefSem : sem wasn't found\n");
- return 0;
+ return NULL;
}
#if (defined(_WIN32) || defined(_WIN64))
*(event->retEv) = event->packet;
}
if(event->sem){
- if (sem_post(event->sem)) {
+ if (XLink_sem_post(event->sem)) {
mvLog(MVLOG_ERROR,"can't post semaphore\n");
}
}
*/
static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
eventQueueHandler_t *q, xLinkEvent_t* event,
- sem_t* sem, xLinkEventOrigin_t o){
+ XLink_sem_t* sem, xLinkEventOrigin_t o){
xLinkEvent_t* ev;
xLinkEventPriv_t* eventP = getNextElementWithState(q->base, q->end, q->cur, EVENT_SERVED);
if (eventP == NULL) {
{
XLINK_RET_ERR_IF(curr == NULL, NULL);
- if (sem_wait(&curr->notifyDispatcherSem)) {
+ if (XLink_sem_wait(&curr->notifyDispatcherSem)) {
mvLog(MVLOG_ERROR,"can't post semaphore\n");
}
mvLog(MVLOG_INFO, "Start Clean Dispatcher...");
- if (sem_post(&curr->notifyDispatcherSem)) {
+ if (XLink_sem_post(&curr->notifyDispatcherSem)) {
mvLog(MVLOG_ERROR,"can't post semaphore\n"); //to allow us to get a NULL event
}
xLinkEventPriv_t* event = dispatcherGetNextEvent(curr);
curr->schedulerId = -1;
curr->resetXLink = 1;
- sem_destroy(&curr->addEventSem);
- sem_destroy(&curr->notifyDispatcherSem);
+ XLink_sem_destroy(&curr->addEventSem);
+ XLink_sem_destroy(&curr->notifyDispatcherSem);
localSem_t* temp = curr->eventSemaphores;
while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
// unblock potentially blocked event semaphores
- sem_post(&temp->sem);
- sem_destroy(&temp->sem);
- temp->refs = -1;
+ XLink_sem_post(&temp->sem);
+ XLink_sem_destroy(&temp->sem);
temp++;
}
numSchedulers--;
}
xLinkDesc_t* link = getLink(curr->deviceHandle.xLinkFD);
- if(link == NULL || sem_post(&link->dispatcherClosedSem)) {
+ if(link == NULL || XLink_sem_post(&link->dispatcherClosedSem)) {
mvLog(MVLOG_DEBUG,"can't post dispatcherClosedSem\n");
}
--- /dev/null
+// Copyright (C) 2020 Intel Corporation
+// SPDX-License-Identifier: Apache-2.0
+//
+
+#include "XLinkSemaphore.h"
+#include "XLinkErrorUtils.h"
+#include "XLinkLog.h"
+
+static pthread_mutex_t ref_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t ref_cond = PTHREAD_COND_INITIALIZER;
+
+int XLink_sem_inc(XLink_sem_t* sem)
+{
+ XLINK_RET_IF_FAIL(pthread_mutex_lock(&ref_mutex));
+ if (sem->refs < 0) {
+ // Semaphore has been already destroyed
+ XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex));
+ return -1;
+ }
+
+ sem->refs++;
+ XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex));
+
+ return 0;
+}
+
+int XLink_sem_dec(XLink_sem_t* sem)
+{
+ XLINK_RET_IF_FAIL(pthread_mutex_lock(&ref_mutex));
+ if (sem->refs < 1) {
+ // Can't decrement reference count if there are no waiters
+ // or semaphore has been already destroyed
+ XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex));
+ return -1;
+ }
+
+ sem->refs--;
+ int ret = pthread_cond_broadcast(&ref_cond);
+ XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex));
+
+ return ret;
+}
+
+
+int XLink_sem_init(XLink_sem_t* sem, int pshared, unsigned int value)
+{
+ XLINK_RET_ERR_IF(sem == NULL, -1);
+
+ XLINK_RET_IF_FAIL(sem_init(&sem->psem, pshared, value));
+ XLINK_RET_IF_FAIL(pthread_mutex_lock(&ref_mutex));
+ sem->refs = 0;
+ XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex));
+
+ return 0;
+}
+
+int XLink_sem_destroy(XLink_sem_t* sem)
+{
+ XLINK_RET_ERR_IF(sem == NULL, -1);
+
+ XLINK_RET_IF_FAIL(pthread_mutex_lock(&ref_mutex));
+ if (sem->refs < 0) {
+ // Semaphore has been already destroyed
+ XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex));
+ return -1;
+ }
+
+ while(sem->refs > 0) {
+ if (pthread_cond_wait(&ref_cond, &ref_mutex)) {
+ break;
+ };
+ }
+ sem->refs = -1;
+ int ret = sem_destroy(&sem->psem);
+ XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex));
+
+ return ret;
+}
+
+int XLink_sem_post(XLink_sem_t* sem)
+{
+ XLINK_RET_ERR_IF(sem == NULL, -1);
+ if (sem->refs < 0) {
+ return -1;
+ }
+
+ return sem_post(&sem->psem);
+}
+
+int XLink_sem_wait(XLink_sem_t* sem)
+{
+ XLINK_RET_ERR_IF(sem == NULL, -1);
+
+ XLINK_RET_IF_FAIL(XLink_sem_inc(sem));
+ int ret = sem_wait(&sem->psem);
+ XLINK_RET_IF_FAIL(XLink_sem_dec(sem));
+
+ return ret;
+}
+
+int XLink_sem_timedwait(XLink_sem_t* sem, const struct timespec* abstime)
+{
+ XLINK_RET_ERR_IF(sem == NULL, -1);
+ XLINK_RET_ERR_IF(abstime == NULL, -1);
+
+ XLINK_RET_IF_FAIL(XLink_sem_inc(sem));
+ int ret = sem_timedwait(&sem->psem, abstime);
+ XLINK_RET_IF_FAIL(XLink_sem_dec(sem));
+
+ return ret;
+}
+
+int XLink_sem_set_refs(XLink_sem_t* sem, int refs)
+{
+ XLINK_RET_ERR_IF(sem == NULL, -1);
+ XLINK_RET_ERR_IF(refs < -1, -1);
+
+ XLINK_RET_IF_FAIL(pthread_mutex_lock(&ref_mutex));
+ sem->refs = refs;
+ int ret = pthread_cond_broadcast(&ref_cond);
+ XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex));
+
+ return ret;
+}
+
+int XLink_sem_get_refs(XLink_sem_t* sem, int *sval)
+{
+ XLINK_RET_ERR_IF(sem == NULL, -1);
+
+ *sval = sem->refs;
+ return 0;
+}