[IE][VPU][XLink]: XLink semaphore wrappers impl (#3079)
authorMaksim Doronin <maksim.doronin@intel.com>
Sun, 15 Nov 2020 23:51:46 +0000 (02:51 +0300)
committerGitHub <noreply@github.com>
Sun, 15 Nov 2020 23:51:46 +0000 (02:51 +0300)
XLink wrappers for POSIX semaphore functions (refer sem_overview for details). In the description of standard sem_destroy the following is noted:
"Destroying a semaphore that other processes or threads are currently blocked on (in sem_wait(3)) produces undefined behavior."
XLink wrappers use thread-safe reference count and destroy the semaphore only in case if there are no waiters.

* XLink semaphore wrapper impl
* Extend XLink win_synchapi

inference-engine/thirdparty/movidius/XLink/pc/Win/include/win_synchapi.h
inference-engine/thirdparty/movidius/XLink/pc/Win/src/win_synchapi.c
inference-engine/thirdparty/movidius/XLink/shared/include/XLinkPrivateDefines.h
inference-engine/thirdparty/movidius/XLink/shared/include/XLinkSemaphore.h [new file with mode: 0644]
inference-engine/thirdparty/movidius/XLink/shared/include/XLinkStream.h
inference-engine/thirdparty/movidius/XLink/shared/src/XLinkDevice.c
inference-engine/thirdparty/movidius/XLink/shared/src/XLinkDispatcher.c
inference-engine/thirdparty/movidius/XLink/shared/src/XLinkDispatcherImpl.c
inference-engine/thirdparty/movidius/XLink/shared/src/XLinkPrivateFields.c
inference-engine/thirdparty/movidius/XLink/shared/src/XLinkSemaphore.c [new file with mode: 0644]
inference-engine/thirdparty/movidius/XLink/shared/src/XLinkStream.c

index 32584a3..e773dba 100644 (file)
@@ -12,6 +12,8 @@
 extern "C" {
 #endif
 
+#define PTHREAD_COND_INITIALIZER {0}
+
 typedef struct _pthread_condattr_t pthread_condattr_t;
 
 typedef struct
@@ -23,9 +25,13 @@ pthread_cond_t;
 int pthread_cond_init(pthread_cond_t* __cond, const pthread_condattr_t* __cond_attr);
 int pthread_cond_destroy(pthread_cond_t* __cond);
 
+int pthread_cond_wait(pthread_cond_t *__cond,
+    pthread_mutex_t *__mutex);
 int pthread_cond_timedwait(pthread_cond_t* __cond,
     pthread_mutex_t* __mutex,
     const struct timespec* __abstime);
+
+int pthread_cond_signal(pthread_cond_t* __cond);
 int pthread_cond_broadcast(pthread_cond_t* __cond);
 
 #ifdef __cplusplus
index a9e09c4..974b36f 100644 (file)
@@ -21,9 +21,17 @@ int pthread_cond_destroy(pthread_cond_t* __cond)
     return 0;
 }
 
+int pthread_cond_wait(pthread_cond_t *__cond,
+    pthread_mutex_t *__mutex)
+{
+    if (__cond == NULL || __mutex == NULL)
+        return ERROR_INVALID_HANDLE;
+    return pthread_cond_timedwait(__cond, __mutex, NULL);
+}
+
 int pthread_cond_timedwait(pthread_cond_t* __cond,
     pthread_mutex_t* __mutex,
-    const struct timespec* __abstime) 
+    const struct timespec* __abstime)
 {
     if (__cond == NULL) {
         return ERROR_INVALID_HANDLE;
@@ -42,7 +50,7 @@ int pthread_cond_timedwait(pthread_cond_t* __cond,
     return rc == ERROR_TIMEOUT ? ETIMEDOUT : rc;
 }
 
-int pthread_cond_broadcast(pthread_cond_t *__cond)
+int pthread_cond_signal(pthread_cond_t *__cond)
 {
     if (__cond == NULL) {
         return ERROR_INVALID_HANDLE;
@@ -51,3 +59,13 @@ int pthread_cond_broadcast(pthread_cond_t *__cond)
     WakeConditionVariable(&__cond->_cv);
     return 0;
 }
+
+int pthread_cond_broadcast(pthread_cond_t *__cond)
+{
+    if (__cond == NULL) {
+        return ERROR_INVALID_HANDLE;
+    }
+
+    WakeAllConditionVariable(&__cond->_cv);
+    return 0;
+}
index 840653e..ffeea35 100644 (file)
@@ -57,7 +57,7 @@ typedef struct xLinkDesc_t {
     xLinkState_t peerState;
     xLinkDeviceHandle_t deviceHandle;
     linkId_t id;
-    sem_t dispatcherClosedSem;
+    XLink_sem_t dispatcherClosedSem;
 
     //Deprecated fields. Begin.
     int hostClosedFD;
diff --git a/inference-engine/thirdparty/movidius/XLink/shared/include/XLinkSemaphore.h b/inference-engine/thirdparty/movidius/XLink/shared/include/XLinkSemaphore.h
new file mode 100644 (file)
index 0000000..8bf8592
--- /dev/null
@@ -0,0 +1,74 @@
+// Copyright (C) 2020 Intel Corporation
+// SPDX-License-Identifier: Apache-2.0
+//
+
+///
+/// @file
+///
+/// @brief     Application configuration Leon header
+///
+
+#ifndef _XLINKSEMAPHORE_H
+#define _XLINKSEMAPHORE_H
+
+# if (defined(_WIN32) || defined(_WIN64))
+#  include "win_pthread.h"
+#  include "win_semaphore.h"
+#  include "win_synchapi.h"
+# else
+#  include <pthread.h>
+#  ifdef __APPLE__
+#   include "pthread_semaphore.h"
+#  else
+#   include <semaphore.h>
+# endif
+# endif
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+//
+// This structure describes the semaphore used in XLink and
+// extends the standard semaphore with a reference count.
+// The counter is thread-safe and changes only in cases if
+// all tools of thread synchronization are really unlocked.
+// refs == -1 in case if semaphore was destroyed;
+// refs == 0 in case if semaphore was initialized but has no waiters;
+// refs == N in case if there are N waiters which called sem_wait().
+//
+
+typedef struct {
+    sem_t psem;
+    int refs;
+} XLink_sem_t;
+
+//
+// XLink wrappers for POSIX semaphore functions (refer sem_overview for details)
+// In description of standard sem_destroy the following can be noted:
+// "Destroying a semaphore that other processes or threads are currently
+// blocked on (in sem_wait(3)) produces undefined behavior."
+// XLink wrappers use thread-safe reference count and destroy the semaphore only in case
+// if there are no waiters
+//
+
+int XLink_sem_init(XLink_sem_t* sem, int pshared, unsigned int value);
+int XLink_sem_destroy(XLink_sem_t* sem);
+int XLink_sem_post(XLink_sem_t* sem);
+int XLink_sem_wait(XLink_sem_t* sem);
+int XLink_sem_timedwait(XLink_sem_t* sem, const struct timespec* abstime);
+
+//
+// Helper functions for XLink semaphore wrappers.
+// Use them only in case if you know what you are doing.
+//
+
+int XLink_sem_set_refs(XLink_sem_t* sem, int refs);
+int XLink_sem_get_refs(XLink_sem_t* sem, int *sval);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif  // _XLINKSEMAPHORE_H
index 97b65af..88fac44 100644 (file)
@@ -6,16 +6,7 @@
 #define _XLINKSTREAM_H
 
 #include "XLinkPublicDefines.h"
-
-# if (defined(_WIN32) || defined(_WIN64))
-#  include "win_semaphore.h"
-# else
-#  ifdef __APPLE__
-#   include "pthread_semaphore.h"
-#  else
-#   include <semaphore.h>
-# endif
-# endif
+#include "XLinkSemaphore.h"
 
 /**
  * @brief Streams opened to device
@@ -40,7 +31,7 @@ typedef struct{
 
     uint32_t closeStreamInitiated;
 
-    sem_t sem;
+    XLink_sem_t sem;
 }streamDesc_t;
 
 XLinkError_t XLinkStreamInitialize(
index 019a2d6..533aaf7 100644 (file)
@@ -253,7 +253,7 @@ XLinkError_t XLinkResetRemote(linkId_t id)
     XLINK_RET_ERR_IF(DispatcherWaitEventComplete(&link->deviceHandle),
         X_LINK_TIMEOUT);
 
-    if(sem_wait(&link->dispatcherClosedSem)) {
+    if(XLink_sem_wait(&link->dispatcherClosedSem)) {
         mvLog(MVLOG_ERROR,"can't wait dispatcherClosedSem\n");
         return X_LINK_ERROR;
     }
@@ -390,7 +390,7 @@ static xLinkDesc_t* getNextAvailableLink() {
 
     xLinkDesc_t* link = &availableXLinks[i];
 
-    if (sem_init(&link->dispatcherClosedSem, 0 ,0)) {
+    if (XLink_sem_init(&link->dispatcherClosedSem, 0 ,0)) {
         mvLog(MVLOG_ERROR, "Cannot initialize semaphore\n");
         return NULL;
     }
index fc0a660..8ef396f 100644 (file)
 #include <assert.h>
 #include <stdlib.h>
 
-#if (defined(_WIN32) || defined(_WIN64))
-# include "win_pthread.h"
-# include "win_semaphore.h"
-#else
-# include <pthread.h>
-# ifndef __APPLE__
-#  include <semaphore.h>
-# endif
-#endif
-
 #include "XLinkDispatcher.h"
 #include "XLinkMacros.h"
 #include "XLinkPrivateDefines.h"
@@ -55,14 +45,13 @@ typedef struct xLinkEventPriv_t {
     xLinkEvent_t *retEv;
     xLinkEventState_t isServed;
     xLinkEventOrigin_t origin;
-    sem_t* sem;
+    XLink_sem_t* sem;
     void* data;
 } xLinkEventPriv_t;
 
 typedef struct {
-    sem_t sem;
+    XLink_sem_t sem;
     pthread_t threadId;
-    int refs;
 } localSem_t;
 
 typedef struct{
@@ -83,8 +72,8 @@ typedef struct {
 
     int queueProcPriority;
 
-    sem_t addEventSem;
-    sem_t notifyDispatcherSem;
+    XLink_sem_t addEventSem;
+    XLink_sem_t notifyDispatcherSem;
     volatile uint32_t resetXLink;
     uint32_t semaphores;
     pthread_t xLinkThreadId;
@@ -126,9 +115,8 @@ static pthread_mutex_t clean_mutex = PTHREAD_MUTEX_INITIALIZER;
 //below workaround for "C2088 '==': illegal for struct" error
 static int pthread_t_compare(pthread_t a, pthread_t b);
 
-static sem_t* createSem(xLinkSchedulerState_t* curr);
-static sem_t* getAndRefSem(pthread_t threadId, xLinkSchedulerState_t *curr, int inc_ref);
-static int unrefSem(sem_t* sem,  xLinkSchedulerState_t* curr);
+static XLink_sem_t* createSem(xLinkSchedulerState_t* curr);
+static XLink_sem_t* getSem(pthread_t threadId, xLinkSchedulerState_t *curr);
 
 #if (defined(_WIN32) || defined(_WIN64))
 static void* __cdecl eventReader(void* ctx);
@@ -155,7 +143,7 @@ static xLinkEventPriv_t* searchForReadyEvent(xLinkSchedulerState_t* curr);
 static xLinkEventPriv_t* getNextQueueElemToProc(eventQueueHandler_t *q );
 static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
                                             eventQueueHandler_t *q, xLinkEvent_t* event,
-                                            sem_t* sem, xLinkEventOrigin_t o);
+                                            XLink_sem_t* sem, xLinkEventOrigin_t o);
 
 static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr);
 
@@ -245,16 +233,16 @@ XLinkError_t DispatcherStart(xLinkDeviceHandle_t *deviceHandle)
         schedulerState[idx].lQueue.q[eventIdx].isServed = EVENT_SERVED;
     }
 
-    if (sem_init(&schedulerState[idx].addEventSem, 0, 1)) {
+    if (XLink_sem_init(&schedulerState[idx].addEventSem, 0, 1)) {
         perror("Can't create semaphore\n");
         return -1;
     }
-    if (sem_init(&schedulerState[idx].notifyDispatcherSem, 0, 0)) {
+    if (XLink_sem_init(&schedulerState[idx].notifyDispatcherSem, 0, 0)) {
         perror("Can't create semaphore\n");
     }
     localSem_t* temp = schedulerState[idx].eventSemaphores;
     while (temp < schedulerState[idx].eventSemaphores + MAXIMUM_SEMAPHORES) {
-        temp->refs = -1;
+        XLink_sem_set_refs(&temp->sem, -1);
         temp++;
     }
     if (pthread_attr_init(&attr) != 0) {
@@ -326,22 +314,22 @@ xLinkEvent_t* DispatcherAddEvent(xLinkEventOrigin_t origin, xLinkEvent_t *event)
         return NULL;
     }
     mvLog(MVLOG_DEBUG, "Receiving event %s %d\n", TypeToStr(event->header.type), origin);
-    if (sem_wait(&curr->addEventSem)) {
+    if (XLink_sem_wait(&curr->addEventSem)) {
         mvLog(MVLOG_ERROR,"can't wait semaphore\n");
         return NULL;
     }
 
-    sem_t *sem = NULL;
+    XLink_sem_t *sem = NULL;
     xLinkEvent_t* ev;
     if (origin == EVENT_LOCAL) {
         event->header.id = createUniqueID();
-        sem = getAndRefSem(pthread_self(), curr, 1);
+        sem = getSem(pthread_self(), curr);
         if (!sem) {
             sem = createSem(curr);
         }
         if (!sem) {
             mvLog(MVLOG_WARN,"No more semaphores. Increase XLink or OS resources\n");
-            if (sem_post(&curr->addEventSem)) {
+            if (XLink_sem_post(&curr->addEventSem)) {
                 mvLog(MVLOG_ERROR,"can't post semaphore\n");
             }
 
@@ -352,10 +340,10 @@ xLinkEvent_t* DispatcherAddEvent(xLinkEventOrigin_t origin, xLinkEvent_t *event)
     } else {
         ev = addNextQueueElemToProc(curr, &curr->rQueue, event, NULL, origin);
     }
-    if (sem_post(&curr->addEventSem)) {
+    if (XLink_sem_post(&curr->addEventSem)) {
         mvLog(MVLOG_ERROR,"can't post semaphore\n");
     }
-    if (sem_post(&curr->notifyDispatcherSem)) {
+    if (XLink_sem_post(&curr->notifyDispatcherSem)) {
         mvLog(MVLOG_ERROR, "can't post semaphore\n");
     }
     return ev;
@@ -366,12 +354,12 @@ int DispatcherWaitEventComplete(xLinkDeviceHandle_t *deviceHandle)
     xLinkSchedulerState_t* curr = findCorrespondingScheduler(deviceHandle->xLinkFD);
     ASSERT_XLINK(curr != NULL);
 
-    sem_t* id = getAndRefSem(pthread_self(), curr, 0);
+    XLink_sem_t* id = getSem(pthread_self(), curr);
     if (id == NULL) {
         return -1;
     }
 
-    int rc = sem_wait(id);
+    int rc = XLink_sem_wait(id);
 #ifdef __PC__
     if (rc) {
         xLinkEvent_t event = {0};
@@ -379,17 +367,13 @@ int DispatcherWaitEventComplete(xLinkDeviceHandle_t *deviceHandle)
         event.deviceHandle = *deviceHandle;
         mvLog(MVLOG_ERROR,"waiting is timeout, sending reset remote event");
         DispatcherAddEvent(EVENT_LOCAL, &event);
-        id = getAndRefSem(pthread_self(), curr, 0);
-        if (id == NULL || sem_wait(id)) {
+        id = getSem(pthread_self(), curr);
+        if (id == NULL || XLink_sem_wait(id)) {
             dispatcherReset(curr);
         }
     }
 #endif
 
-    if ((XLinkError_t)unrefSem(id, curr) == X_LINK_ERROR) {
-        mvLog(MVLOG_WARN, "Failed to unref sem");
-    }
-
     return rc;
 }
 
@@ -439,7 +423,7 @@ int DispatcherUnblockEvent(eventId_t id, xLinkEventType_t type, streamId_t strea
                   (int)blockedEvent->packet.header.id,
                   TypeToStr((int)blockedEvent->packet.header.type));
             blockedEvent->isServed = EVENT_READY;
-            if (sem_post(&curr->notifyDispatcherSem)){
+            if (XLink_sem_post(&curr->notifyDispatcherSem)){
                 mvLog(MVLOG_ERROR, "can't post semaphore\n");
             }
             return 1;
@@ -471,11 +455,11 @@ int pthread_t_compare(pthread_t a, pthread_t b)
 #endif
 }
 
-static sem_t* createSem(xLinkSchedulerState_t* curr)
+static XLink_sem_t* createSem(xLinkSchedulerState_t* curr)
 {
     XLINK_RET_ERR_IF(curr == NULL, NULL);
 
-    sem_t* sem = getAndRefSem(pthread_self(), curr, 0);
+    XLink_sem_t* sem = getSem(pthread_self(), curr);
     if (sem) {// it already exists, error
         return NULL;
     }
@@ -484,11 +468,13 @@ static sem_t* createSem(xLinkSchedulerState_t* curr)
         localSem_t* temp = curr->eventSemaphores;
 
         while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
-            if (temp->refs < 0 || curr->semaphores == MAXIMUM_SEMAPHORES) {
-                if (curr->semaphores == MAXIMUM_SEMAPHORES && !temp->refs) {
-                    XLINK_RET_ERR_IF(sem_destroy(&temp->sem) == -1, NULL);
+            int refs = 0;
+            XLINK_RET_ERR_IF(XLink_sem_get_refs(&temp->sem, &refs), NULL);
+            if (refs < 0 || curr->semaphores == MAXIMUM_SEMAPHORES) {
+                if (curr->semaphores == MAXIMUM_SEMAPHORES && refs == 0) {
+                    XLINK_RET_ERR_IF(XLink_sem_destroy(&temp->sem), NULL);
+                    XLINK_RET_ERR_IF(XLink_sem_get_refs(&temp->sem, &refs), NULL);
                     curr->semaphores --;
-                    temp->refs = -1;
 #if (defined(_WIN32) || defined(_WIN64))
                     memset(&temp->threadId, 0, sizeof(temp->threadId));
 #else
@@ -496,14 +482,13 @@ static sem_t* createSem(xLinkSchedulerState_t* curr)
 #endif
                 }
 
-                if (temp->refs == -1) {
+                if (refs == -1) {
                     sem = &temp->sem;
-                    if (sem_init(sem, 0, 0)){
+                    if (XLink_sem_init(sem, 0, 0)){
                         mvLog(MVLOG_ERROR, "Error: Can't create semaphore\n");
                         return NULL;
                     }
                     curr->semaphores++;
-                    temp->refs = 1;
                     temp->threadId = pthread_self();
                     break;
                 }
@@ -522,33 +507,20 @@ static sem_t* createSem(xLinkSchedulerState_t* curr)
     return sem;
 }
 
-static sem_t* getAndRefSem(pthread_t threadId, xLinkSchedulerState_t *curr, int inc_ref)
+static XLink_sem_t* getSem(pthread_t threadId, xLinkSchedulerState_t *curr)
 {
     XLINK_RET_ERR_IF(curr == NULL, NULL);
 
-    localSem_t* sem = curr->eventSemaphores;
-    while (sem < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
-        if (pthread_t_compare(sem->threadId, threadId) && sem->refs >= 0) {
-            sem->refs += inc_ref;
-            return &sem->sem;
-        }
-        sem++;
-    }
-    return NULL;
-}
-
-static int unrefSem(sem_t* sem,  xLinkSchedulerState_t* curr) {
-    ASSERT_XLINK(curr != NULL);
     localSem_t* temp = curr->eventSemaphores;
     while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
-        if (&temp->sem == sem) {
-            temp->refs--;
-            return 1;
+        int refs = 0;
+        XLINK_RET_ERR_IF(XLink_sem_get_refs(&temp->sem, &refs), NULL);
+        if (pthread_t_compare(temp->threadId, threadId) && refs >= 0) {
+            return &temp->sem;
         }
         temp++;
     }
-    mvLog(MVLOG_WARN,"unrefSem : sem wasn't found\n");
-    return 0;
+    return NULL;
 }
 
 #if (defined(_WIN32) || defined(_WIN64))
@@ -681,7 +653,7 @@ static void postAndMarkEventServed(xLinkEventPriv_t *event)
         *(event->retEv) = event->packet;
     }
     if(event->sem){
-        if (sem_post(event->sem)) {
+        if (XLink_sem_post(event->sem)) {
             mvLog(MVLOG_ERROR,"can't post semaphore\n");
         }
     }
@@ -826,7 +798,7 @@ static xLinkEventPriv_t* getNextQueueElemToProc(eventQueueHandler_t *q ){
  */
 static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
                                             eventQueueHandler_t *q, xLinkEvent_t* event,
-                                            sem_t* sem, xLinkEventOrigin_t o){
+                                            XLink_sem_t* sem, xLinkEventOrigin_t o){
     xLinkEvent_t* ev;
     xLinkEventPriv_t* eventP = getNextElementWithState(q->base, q->end, q->cur, EVENT_SERVED);
     if (eventP == NULL) {
@@ -856,7 +828,7 @@ static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr)
 {
     XLINK_RET_ERR_IF(curr == NULL, NULL);
 
-    if (sem_wait(&curr->notifyDispatcherSem)) {
+    if (XLink_sem_wait(&curr->notifyDispatcherSem)) {
         mvLog(MVLOG_ERROR,"can't post semaphore\n");
     }
 
@@ -893,7 +865,7 @@ static int dispatcherClean(xLinkSchedulerState_t* curr)
 
     mvLog(MVLOG_INFO, "Start Clean Dispatcher...");
 
-    if (sem_post(&curr->notifyDispatcherSem)) {
+    if (XLink_sem_post(&curr->notifyDispatcherSem)) {
         mvLog(MVLOG_ERROR,"can't post semaphore\n"); //to allow us to get a NULL event
     }
     xLinkEventPriv_t* event = dispatcherGetNextEvent(curr);
@@ -910,14 +882,13 @@ static int dispatcherClean(xLinkSchedulerState_t* curr)
 
     curr->schedulerId = -1;
     curr->resetXLink = 1;
-    sem_destroy(&curr->addEventSem);
-    sem_destroy(&curr->notifyDispatcherSem);
+    XLink_sem_destroy(&curr->addEventSem);
+    XLink_sem_destroy(&curr->notifyDispatcherSem);
     localSem_t* temp = curr->eventSemaphores;
     while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
         // unblock potentially blocked event semaphores
-        sem_post(&temp->sem);
-        sem_destroy(&temp->sem);
-        temp->refs = -1;
+        XLink_sem_post(&temp->sem);
+        XLink_sem_destroy(&temp->sem);
         temp++;
     }
     numSchedulers--;
@@ -939,7 +910,7 @@ static int dispatcherReset(xLinkSchedulerState_t* curr)
     }
 
     xLinkDesc_t* link = getLink(curr->deviceHandle.xLinkFD);
-    if(link == NULL || sem_post(&link->dispatcherClosedSem)) {
+    if(link == NULL || XLink_sem_post(&link->dispatcherClosedSem)) {
         mvLog(MVLOG_DEBUG,"can't post dispatcherClosedSem\n");
     }
 
index 3c2d791..e18a5e7 100644 (file)
@@ -340,7 +340,7 @@ int dispatcherRemoteEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response
                         stream->name[0] = '\0';
                     }
 #ifndef __PC__
-                    if(sem_destroy(&stream->sem))
+                    if(XLink_sem_destroy(&stream->sem))
                         perror("Can't destroy semaphore");
 #endif
                 }
@@ -452,7 +452,7 @@ void dispatcherCloseLink(void* fd, int fullClose)
         XLinkStreamReset(stream);
     }
 
-    if(sem_destroy(&link->dispatcherClosedSem)) {
+    if(XLink_sem_destroy(&link->dispatcherClosedSem)) {
         mvLog(MVLOG_DEBUG, "Cannot destroy dispatcherClosedSem\n");
     }
 }
index 5aeb4ed..46a97a2 100644 (file)
@@ -55,7 +55,7 @@ streamDesc_t* getStreamById(void* fd, streamId_t id)
     int stream;
     for (stream = 0; stream < XLINK_MAX_STREAMS; stream++) {
         if (link->availableStreams[stream].id == id) {
-            sem_wait(&link->availableStreams[stream].sem);
+            XLink_sem_wait(&link->availableStreams[stream].sem);
 
             return &link->availableStreams[stream];
         }
@@ -70,7 +70,7 @@ streamDesc_t* getStreamByName(xLinkDesc_t* link, const char* name)
     for (stream = 0; stream < XLINK_MAX_STREAMS; stream++) {
         if (link->availableStreams[stream].id != INVALID_STREAM_ID &&
             strcmp(link->availableStreams[stream].name, name) == 0) {
-            sem_wait(&link->availableStreams[stream].sem);
+            XLink_sem_wait(&link->availableStreams[stream].sem);
 
             return &link->availableStreams[stream];
         }
@@ -81,7 +81,7 @@ streamDesc_t* getStreamByName(xLinkDesc_t* link, const char* name)
 void releaseStream(streamDesc_t* stream)
 {
     if (stream && stream->id != INVALID_STREAM_ID) {
-        sem_post(&stream->sem);
+        XLink_sem_post(&stream->sem);
     }
     else {
         mvLog(MVLOG_DEBUG,"trying to release a semaphore for a released stream\n");
diff --git a/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkSemaphore.c b/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkSemaphore.c
new file mode 100644 (file)
index 0000000..e2fa8a4
--- /dev/null
@@ -0,0 +1,132 @@
+// Copyright (C) 2020 Intel Corporation
+// SPDX-License-Identifier: Apache-2.0
+//
+
+#include "XLinkSemaphore.h"
+#include "XLinkErrorUtils.h"
+#include "XLinkLog.h"
+
+static pthread_mutex_t ref_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t ref_cond = PTHREAD_COND_INITIALIZER;
+
+int XLink_sem_inc(XLink_sem_t* sem)
+{
+    XLINK_RET_IF_FAIL(pthread_mutex_lock(&ref_mutex));
+    if (sem->refs < 0) {
+        // Semaphore has been already destroyed
+        XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex));
+        return -1;
+    }
+
+    sem->refs++;
+    XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex));
+
+    return 0;
+}
+
+int XLink_sem_dec(XLink_sem_t* sem)
+{
+    XLINK_RET_IF_FAIL(pthread_mutex_lock(&ref_mutex));
+    if (sem->refs < 1) {
+        // Can't decrement reference count if there are no waiters
+        // or semaphore has been already destroyed
+        XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex));
+        return -1;
+    }
+
+    sem->refs--;
+    int ret = pthread_cond_broadcast(&ref_cond);
+    XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex));
+
+    return ret;
+}
+
+
+int XLink_sem_init(XLink_sem_t* sem, int pshared, unsigned int value)
+{
+    XLINK_RET_ERR_IF(sem == NULL, -1);
+
+    XLINK_RET_IF_FAIL(sem_init(&sem->psem, pshared, value));
+    XLINK_RET_IF_FAIL(pthread_mutex_lock(&ref_mutex));
+    sem->refs = 0;
+    XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex));
+
+    return 0;
+}
+
+int XLink_sem_destroy(XLink_sem_t* sem)
+{
+    XLINK_RET_ERR_IF(sem == NULL, -1);
+
+    XLINK_RET_IF_FAIL(pthread_mutex_lock(&ref_mutex));
+    if (sem->refs < 0) {
+        // Semaphore has been already destroyed
+        XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex));
+        return -1;
+    }
+
+    while(sem->refs > 0) {
+        if (pthread_cond_wait(&ref_cond, &ref_mutex)) {
+            break;
+        };
+    }
+    sem->refs = -1;
+    int ret = sem_destroy(&sem->psem);
+    XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex));
+
+    return ret;
+}
+
+int XLink_sem_post(XLink_sem_t* sem)
+{
+    XLINK_RET_ERR_IF(sem == NULL, -1);
+    if (sem->refs < 0) {
+        return -1;
+    }
+
+    return sem_post(&sem->psem);
+}
+
+int XLink_sem_wait(XLink_sem_t* sem)
+{
+    XLINK_RET_ERR_IF(sem == NULL, -1);
+
+    XLINK_RET_IF_FAIL(XLink_sem_inc(sem));
+    int ret = sem_wait(&sem->psem);
+    XLINK_RET_IF_FAIL(XLink_sem_dec(sem));
+
+    return ret;
+}
+
+int XLink_sem_timedwait(XLink_sem_t* sem, const struct timespec* abstime)
+{
+    XLINK_RET_ERR_IF(sem == NULL, -1);
+    XLINK_RET_ERR_IF(abstime == NULL, -1);
+
+    XLINK_RET_IF_FAIL(XLink_sem_inc(sem));
+    int ret = sem_timedwait(&sem->psem, abstime);
+    XLINK_RET_IF_FAIL(XLink_sem_dec(sem));
+
+    return ret;
+}
+
+int XLink_sem_set_refs(XLink_sem_t* sem, int refs)
+{
+    XLINK_RET_ERR_IF(sem == NULL, -1);
+    XLINK_RET_ERR_IF(refs < -1, -1);
+
+    XLINK_RET_IF_FAIL(pthread_mutex_lock(&ref_mutex));
+    sem->refs = refs;
+    int ret = pthread_cond_broadcast(&ref_cond);
+    XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex));
+
+    return ret;
+}
+
+int XLink_sem_get_refs(XLink_sem_t* sem, int *sval)
+{
+    XLINK_RET_ERR_IF(sem == NULL, -1);
+
+    *sval = sem->refs;
+    return 0;
+}
index 4371605..6c5945c 100644 (file)
@@ -22,7 +22,7 @@ XLinkError_t XLinkStreamInitialize(
 
     memset(stream, 0, sizeof(*stream));
 
-    if (sem_init(&stream->sem, 0, 0)) {
+    if (XLink_sem_init(&stream->sem, 0, 0)) {
         mvLog(MVLOG_ERROR, "Cannot initialize semaphore\n");
         return X_LINK_ERROR;
     }
@@ -39,7 +39,7 @@ void XLinkStreamReset(streamDesc_t* stream) {
         return;
     }
 
-    if(sem_destroy(&stream->sem)) {
+    if(XLink_sem_destroy(&stream->sem)) {
         mvLog(MVLOG_DEBUG, "Cannot destroy semaphore\n");
     }