From: Maksim Doronin Date: Sun, 15 Nov 2020 23:51:46 +0000 (+0300) Subject: [IE][VPU][XLink]: XLink semaphore wrappers impl (#3079) X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=2a7f2f5eb68b0aefdba0106c3e3185cbbcfc14e0;p=platform%2Fupstream%2Fdldt.git [IE][VPU][XLink]: XLink semaphore wrappers impl (#3079) XLink wrappers for POSIX semaphore functions (refer sem_overview for details). In the description of standard sem_destroy the following is noted: "Destroying a semaphore that other processes or threads are currently blocked on (in sem_wait(3)) produces undefined behavior." XLink wrappers use thread-safe reference count and destroy the semaphore only in case if there are no waiters. * XLink semaphore wrapper impl * Extend XLink win_synchapi --- diff --git a/inference-engine/thirdparty/movidius/XLink/pc/Win/include/win_synchapi.h b/inference-engine/thirdparty/movidius/XLink/pc/Win/include/win_synchapi.h index 32584a3..e773dba 100644 --- a/inference-engine/thirdparty/movidius/XLink/pc/Win/include/win_synchapi.h +++ b/inference-engine/thirdparty/movidius/XLink/pc/Win/include/win_synchapi.h @@ -12,6 +12,8 @@ extern "C" { #endif +#define PTHREAD_COND_INITIALIZER {0} + typedef struct _pthread_condattr_t pthread_condattr_t; typedef struct @@ -23,9 +25,13 @@ pthread_cond_t; int pthread_cond_init(pthread_cond_t* __cond, const pthread_condattr_t* __cond_attr); int pthread_cond_destroy(pthread_cond_t* __cond); +int pthread_cond_wait(pthread_cond_t *__cond, + pthread_mutex_t *__mutex); int pthread_cond_timedwait(pthread_cond_t* __cond, pthread_mutex_t* __mutex, const struct timespec* __abstime); + +int pthread_cond_signal(pthread_cond_t* __cond); int pthread_cond_broadcast(pthread_cond_t* __cond); #ifdef __cplusplus diff --git a/inference-engine/thirdparty/movidius/XLink/pc/Win/src/win_synchapi.c b/inference-engine/thirdparty/movidius/XLink/pc/Win/src/win_synchapi.c index a9e09c4..974b36f 100644 --- a/inference-engine/thirdparty/movidius/XLink/pc/Win/src/win_synchapi.c +++ b/inference-engine/thirdparty/movidius/XLink/pc/Win/src/win_synchapi.c @@ -21,9 +21,17 @@ int pthread_cond_destroy(pthread_cond_t* __cond) return 0; } +int pthread_cond_wait(pthread_cond_t *__cond, + pthread_mutex_t *__mutex) +{ + if (__cond == NULL || __mutex == NULL) + return ERROR_INVALID_HANDLE; + return pthread_cond_timedwait(__cond, __mutex, NULL); +} + int pthread_cond_timedwait(pthread_cond_t* __cond, pthread_mutex_t* __mutex, - const struct timespec* __abstime) + const struct timespec* __abstime) { if (__cond == NULL) { return ERROR_INVALID_HANDLE; @@ -42,7 +50,7 @@ int pthread_cond_timedwait(pthread_cond_t* __cond, return rc == ERROR_TIMEOUT ? ETIMEDOUT : rc; } -int pthread_cond_broadcast(pthread_cond_t *__cond) +int pthread_cond_signal(pthread_cond_t *__cond) { if (__cond == NULL) { return ERROR_INVALID_HANDLE; @@ -51,3 +59,13 @@ int pthread_cond_broadcast(pthread_cond_t *__cond) WakeConditionVariable(&__cond->_cv); return 0; } + +int pthread_cond_broadcast(pthread_cond_t *__cond) +{ + if (__cond == NULL) { + return ERROR_INVALID_HANDLE; + } + + WakeAllConditionVariable(&__cond->_cv); + return 0; +} diff --git a/inference-engine/thirdparty/movidius/XLink/shared/include/XLinkPrivateDefines.h b/inference-engine/thirdparty/movidius/XLink/shared/include/XLinkPrivateDefines.h index 840653e..ffeea35 100644 --- a/inference-engine/thirdparty/movidius/XLink/shared/include/XLinkPrivateDefines.h +++ b/inference-engine/thirdparty/movidius/XLink/shared/include/XLinkPrivateDefines.h @@ -57,7 +57,7 @@ typedef struct xLinkDesc_t { xLinkState_t peerState; xLinkDeviceHandle_t deviceHandle; linkId_t id; - sem_t dispatcherClosedSem; + XLink_sem_t dispatcherClosedSem; //Deprecated fields. Begin. int hostClosedFD; diff --git a/inference-engine/thirdparty/movidius/XLink/shared/include/XLinkSemaphore.h b/inference-engine/thirdparty/movidius/XLink/shared/include/XLinkSemaphore.h new file mode 100644 index 0000000..8bf8592 --- /dev/null +++ b/inference-engine/thirdparty/movidius/XLink/shared/include/XLinkSemaphore.h @@ -0,0 +1,74 @@ +// Copyright (C) 2020 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 +// + +/// +/// @file +/// +/// @brief Application configuration Leon header +/// + +#ifndef _XLINKSEMAPHORE_H +#define _XLINKSEMAPHORE_H + +# if (defined(_WIN32) || defined(_WIN64)) +# include "win_pthread.h" +# include "win_semaphore.h" +# include "win_synchapi.h" +# else +# include +# ifdef __APPLE__ +# include "pthread_semaphore.h" +# else +# include +# endif +# endif + +#ifdef __cplusplus +extern "C" +{ +#endif + +// +// This structure describes the semaphore used in XLink and +// extends the standard semaphore with a reference count. +// The counter is thread-safe and changes only in cases if +// all tools of thread synchronization are really unlocked. +// refs == -1 in case if semaphore was destroyed; +// refs == 0 in case if semaphore was initialized but has no waiters; +// refs == N in case if there are N waiters which called sem_wait(). +// + +typedef struct { + sem_t psem; + int refs; +} XLink_sem_t; + +// +// XLink wrappers for POSIX semaphore functions (refer sem_overview for details) +// In description of standard sem_destroy the following can be noted: +// "Destroying a semaphore that other processes or threads are currently +// blocked on (in sem_wait(3)) produces undefined behavior." +// XLink wrappers use thread-safe reference count and destroy the semaphore only in case +// if there are no waiters +// + +int XLink_sem_init(XLink_sem_t* sem, int pshared, unsigned int value); +int XLink_sem_destroy(XLink_sem_t* sem); +int XLink_sem_post(XLink_sem_t* sem); +int XLink_sem_wait(XLink_sem_t* sem); +int XLink_sem_timedwait(XLink_sem_t* sem, const struct timespec* abstime); + +// +// Helper functions for XLink semaphore wrappers. +// Use them only in case if you know what you are doing. +// + +int XLink_sem_set_refs(XLink_sem_t* sem, int refs); +int XLink_sem_get_refs(XLink_sem_t* sem, int *sval); + +#ifdef __cplusplus +} +#endif + +#endif // _XLINKSEMAPHORE_H diff --git a/inference-engine/thirdparty/movidius/XLink/shared/include/XLinkStream.h b/inference-engine/thirdparty/movidius/XLink/shared/include/XLinkStream.h index 97b65af..88fac44 100644 --- a/inference-engine/thirdparty/movidius/XLink/shared/include/XLinkStream.h +++ b/inference-engine/thirdparty/movidius/XLink/shared/include/XLinkStream.h @@ -6,16 +6,7 @@ #define _XLINKSTREAM_H #include "XLinkPublicDefines.h" - -# if (defined(_WIN32) || defined(_WIN64)) -# include "win_semaphore.h" -# else -# ifdef __APPLE__ -# include "pthread_semaphore.h" -# else -# include -# endif -# endif +#include "XLinkSemaphore.h" /** * @brief Streams opened to device @@ -40,7 +31,7 @@ typedef struct{ uint32_t closeStreamInitiated; - sem_t sem; + XLink_sem_t sem; }streamDesc_t; XLinkError_t XLinkStreamInitialize( diff --git a/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkDevice.c b/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkDevice.c index 019a2d6..533aaf7 100644 --- a/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkDevice.c +++ b/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkDevice.c @@ -253,7 +253,7 @@ XLinkError_t XLinkResetRemote(linkId_t id) XLINK_RET_ERR_IF(DispatcherWaitEventComplete(&link->deviceHandle), X_LINK_TIMEOUT); - if(sem_wait(&link->dispatcherClosedSem)) { + if(XLink_sem_wait(&link->dispatcherClosedSem)) { mvLog(MVLOG_ERROR,"can't wait dispatcherClosedSem\n"); return X_LINK_ERROR; } @@ -390,7 +390,7 @@ static xLinkDesc_t* getNextAvailableLink() { xLinkDesc_t* link = &availableXLinks[i]; - if (sem_init(&link->dispatcherClosedSem, 0 ,0)) { + if (XLink_sem_init(&link->dispatcherClosedSem, 0 ,0)) { mvLog(MVLOG_ERROR, "Cannot initialize semaphore\n"); return NULL; } diff --git a/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkDispatcher.c b/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkDispatcher.c index fc0a660..8ef396f 100644 --- a/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkDispatcher.c +++ b/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkDispatcher.c @@ -18,16 +18,6 @@ #include #include -#if (defined(_WIN32) || defined(_WIN64)) -# include "win_pthread.h" -# include "win_semaphore.h" -#else -# include -# ifndef __APPLE__ -# include -# endif -#endif - #include "XLinkDispatcher.h" #include "XLinkMacros.h" #include "XLinkPrivateDefines.h" @@ -55,14 +45,13 @@ typedef struct xLinkEventPriv_t { xLinkEvent_t *retEv; xLinkEventState_t isServed; xLinkEventOrigin_t origin; - sem_t* sem; + XLink_sem_t* sem; void* data; } xLinkEventPriv_t; typedef struct { - sem_t sem; + XLink_sem_t sem; pthread_t threadId; - int refs; } localSem_t; typedef struct{ @@ -83,8 +72,8 @@ typedef struct { int queueProcPriority; - sem_t addEventSem; - sem_t notifyDispatcherSem; + XLink_sem_t addEventSem; + XLink_sem_t notifyDispatcherSem; volatile uint32_t resetXLink; uint32_t semaphores; pthread_t xLinkThreadId; @@ -126,9 +115,8 @@ static pthread_mutex_t clean_mutex = PTHREAD_MUTEX_INITIALIZER; //below workaround for "C2088 '==': illegal for struct" error static int pthread_t_compare(pthread_t a, pthread_t b); -static sem_t* createSem(xLinkSchedulerState_t* curr); -static sem_t* getAndRefSem(pthread_t threadId, xLinkSchedulerState_t *curr, int inc_ref); -static int unrefSem(sem_t* sem, xLinkSchedulerState_t* curr); +static XLink_sem_t* createSem(xLinkSchedulerState_t* curr); +static XLink_sem_t* getSem(pthread_t threadId, xLinkSchedulerState_t *curr); #if (defined(_WIN32) || defined(_WIN64)) static void* __cdecl eventReader(void* ctx); @@ -155,7 +143,7 @@ static xLinkEventPriv_t* searchForReadyEvent(xLinkSchedulerState_t* curr); static xLinkEventPriv_t* getNextQueueElemToProc(eventQueueHandler_t *q ); static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr, eventQueueHandler_t *q, xLinkEvent_t* event, - sem_t* sem, xLinkEventOrigin_t o); + XLink_sem_t* sem, xLinkEventOrigin_t o); static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr); @@ -245,16 +233,16 @@ XLinkError_t DispatcherStart(xLinkDeviceHandle_t *deviceHandle) schedulerState[idx].lQueue.q[eventIdx].isServed = EVENT_SERVED; } - if (sem_init(&schedulerState[idx].addEventSem, 0, 1)) { + if (XLink_sem_init(&schedulerState[idx].addEventSem, 0, 1)) { perror("Can't create semaphore\n"); return -1; } - if (sem_init(&schedulerState[idx].notifyDispatcherSem, 0, 0)) { + if (XLink_sem_init(&schedulerState[idx].notifyDispatcherSem, 0, 0)) { perror("Can't create semaphore\n"); } localSem_t* temp = schedulerState[idx].eventSemaphores; while (temp < schedulerState[idx].eventSemaphores + MAXIMUM_SEMAPHORES) { - temp->refs = -1; + XLink_sem_set_refs(&temp->sem, -1); temp++; } if (pthread_attr_init(&attr) != 0) { @@ -326,22 +314,22 @@ xLinkEvent_t* DispatcherAddEvent(xLinkEventOrigin_t origin, xLinkEvent_t *event) return NULL; } mvLog(MVLOG_DEBUG, "Receiving event %s %d\n", TypeToStr(event->header.type), origin); - if (sem_wait(&curr->addEventSem)) { + if (XLink_sem_wait(&curr->addEventSem)) { mvLog(MVLOG_ERROR,"can't wait semaphore\n"); return NULL; } - sem_t *sem = NULL; + XLink_sem_t *sem = NULL; xLinkEvent_t* ev; if (origin == EVENT_LOCAL) { event->header.id = createUniqueID(); - sem = getAndRefSem(pthread_self(), curr, 1); + sem = getSem(pthread_self(), curr); if (!sem) { sem = createSem(curr); } if (!sem) { mvLog(MVLOG_WARN,"No more semaphores. Increase XLink or OS resources\n"); - if (sem_post(&curr->addEventSem)) { + if (XLink_sem_post(&curr->addEventSem)) { mvLog(MVLOG_ERROR,"can't post semaphore\n"); } @@ -352,10 +340,10 @@ xLinkEvent_t* DispatcherAddEvent(xLinkEventOrigin_t origin, xLinkEvent_t *event) } else { ev = addNextQueueElemToProc(curr, &curr->rQueue, event, NULL, origin); } - if (sem_post(&curr->addEventSem)) { + if (XLink_sem_post(&curr->addEventSem)) { mvLog(MVLOG_ERROR,"can't post semaphore\n"); } - if (sem_post(&curr->notifyDispatcherSem)) { + if (XLink_sem_post(&curr->notifyDispatcherSem)) { mvLog(MVLOG_ERROR, "can't post semaphore\n"); } return ev; @@ -366,12 +354,12 @@ int DispatcherWaitEventComplete(xLinkDeviceHandle_t *deviceHandle) xLinkSchedulerState_t* curr = findCorrespondingScheduler(deviceHandle->xLinkFD); ASSERT_XLINK(curr != NULL); - sem_t* id = getAndRefSem(pthread_self(), curr, 0); + XLink_sem_t* id = getSem(pthread_self(), curr); if (id == NULL) { return -1; } - int rc = sem_wait(id); + int rc = XLink_sem_wait(id); #ifdef __PC__ if (rc) { xLinkEvent_t event = {0}; @@ -379,17 +367,13 @@ int DispatcherWaitEventComplete(xLinkDeviceHandle_t *deviceHandle) event.deviceHandle = *deviceHandle; mvLog(MVLOG_ERROR,"waiting is timeout, sending reset remote event"); DispatcherAddEvent(EVENT_LOCAL, &event); - id = getAndRefSem(pthread_self(), curr, 0); - if (id == NULL || sem_wait(id)) { + id = getSem(pthread_self(), curr); + if (id == NULL || XLink_sem_wait(id)) { dispatcherReset(curr); } } #endif - if ((XLinkError_t)unrefSem(id, curr) == X_LINK_ERROR) { - mvLog(MVLOG_WARN, "Failed to unref sem"); - } - return rc; } @@ -439,7 +423,7 @@ int DispatcherUnblockEvent(eventId_t id, xLinkEventType_t type, streamId_t strea (int)blockedEvent->packet.header.id, TypeToStr((int)blockedEvent->packet.header.type)); blockedEvent->isServed = EVENT_READY; - if (sem_post(&curr->notifyDispatcherSem)){ + if (XLink_sem_post(&curr->notifyDispatcherSem)){ mvLog(MVLOG_ERROR, "can't post semaphore\n"); } return 1; @@ -471,11 +455,11 @@ int pthread_t_compare(pthread_t a, pthread_t b) #endif } -static sem_t* createSem(xLinkSchedulerState_t* curr) +static XLink_sem_t* createSem(xLinkSchedulerState_t* curr) { XLINK_RET_ERR_IF(curr == NULL, NULL); - sem_t* sem = getAndRefSem(pthread_self(), curr, 0); + XLink_sem_t* sem = getSem(pthread_self(), curr); if (sem) {// it already exists, error return NULL; } @@ -484,11 +468,13 @@ static sem_t* createSem(xLinkSchedulerState_t* curr) localSem_t* temp = curr->eventSemaphores; while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) { - if (temp->refs < 0 || curr->semaphores == MAXIMUM_SEMAPHORES) { - if (curr->semaphores == MAXIMUM_SEMAPHORES && !temp->refs) { - XLINK_RET_ERR_IF(sem_destroy(&temp->sem) == -1, NULL); + int refs = 0; + XLINK_RET_ERR_IF(XLink_sem_get_refs(&temp->sem, &refs), NULL); + if (refs < 0 || curr->semaphores == MAXIMUM_SEMAPHORES) { + if (curr->semaphores == MAXIMUM_SEMAPHORES && refs == 0) { + XLINK_RET_ERR_IF(XLink_sem_destroy(&temp->sem), NULL); + XLINK_RET_ERR_IF(XLink_sem_get_refs(&temp->sem, &refs), NULL); curr->semaphores --; - temp->refs = -1; #if (defined(_WIN32) || defined(_WIN64)) memset(&temp->threadId, 0, sizeof(temp->threadId)); #else @@ -496,14 +482,13 @@ static sem_t* createSem(xLinkSchedulerState_t* curr) #endif } - if (temp->refs == -1) { + if (refs == -1) { sem = &temp->sem; - if (sem_init(sem, 0, 0)){ + if (XLink_sem_init(sem, 0, 0)){ mvLog(MVLOG_ERROR, "Error: Can't create semaphore\n"); return NULL; } curr->semaphores++; - temp->refs = 1; temp->threadId = pthread_self(); break; } @@ -522,33 +507,20 @@ static sem_t* createSem(xLinkSchedulerState_t* curr) return sem; } -static sem_t* getAndRefSem(pthread_t threadId, xLinkSchedulerState_t *curr, int inc_ref) +static XLink_sem_t* getSem(pthread_t threadId, xLinkSchedulerState_t *curr) { XLINK_RET_ERR_IF(curr == NULL, NULL); - localSem_t* sem = curr->eventSemaphores; - while (sem < curr->eventSemaphores + MAXIMUM_SEMAPHORES) { - if (pthread_t_compare(sem->threadId, threadId) && sem->refs >= 0) { - sem->refs += inc_ref; - return &sem->sem; - } - sem++; - } - return NULL; -} - -static int unrefSem(sem_t* sem, xLinkSchedulerState_t* curr) { - ASSERT_XLINK(curr != NULL); localSem_t* temp = curr->eventSemaphores; while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) { - if (&temp->sem == sem) { - temp->refs--; - return 1; + int refs = 0; + XLINK_RET_ERR_IF(XLink_sem_get_refs(&temp->sem, &refs), NULL); + if (pthread_t_compare(temp->threadId, threadId) && refs >= 0) { + return &temp->sem; } temp++; } - mvLog(MVLOG_WARN,"unrefSem : sem wasn't found\n"); - return 0; + return NULL; } #if (defined(_WIN32) || defined(_WIN64)) @@ -681,7 +653,7 @@ static void postAndMarkEventServed(xLinkEventPriv_t *event) *(event->retEv) = event->packet; } if(event->sem){ - if (sem_post(event->sem)) { + if (XLink_sem_post(event->sem)) { mvLog(MVLOG_ERROR,"can't post semaphore\n"); } } @@ -826,7 +798,7 @@ static xLinkEventPriv_t* getNextQueueElemToProc(eventQueueHandler_t *q ){ */ static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr, eventQueueHandler_t *q, xLinkEvent_t* event, - sem_t* sem, xLinkEventOrigin_t o){ + XLink_sem_t* sem, xLinkEventOrigin_t o){ xLinkEvent_t* ev; xLinkEventPriv_t* eventP = getNextElementWithState(q->base, q->end, q->cur, EVENT_SERVED); if (eventP == NULL) { @@ -856,7 +828,7 @@ static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr) { XLINK_RET_ERR_IF(curr == NULL, NULL); - if (sem_wait(&curr->notifyDispatcherSem)) { + if (XLink_sem_wait(&curr->notifyDispatcherSem)) { mvLog(MVLOG_ERROR,"can't post semaphore\n"); } @@ -893,7 +865,7 @@ static int dispatcherClean(xLinkSchedulerState_t* curr) mvLog(MVLOG_INFO, "Start Clean Dispatcher..."); - if (sem_post(&curr->notifyDispatcherSem)) { + if (XLink_sem_post(&curr->notifyDispatcherSem)) { mvLog(MVLOG_ERROR,"can't post semaphore\n"); //to allow us to get a NULL event } xLinkEventPriv_t* event = dispatcherGetNextEvent(curr); @@ -910,14 +882,13 @@ static int dispatcherClean(xLinkSchedulerState_t* curr) curr->schedulerId = -1; curr->resetXLink = 1; - sem_destroy(&curr->addEventSem); - sem_destroy(&curr->notifyDispatcherSem); + XLink_sem_destroy(&curr->addEventSem); + XLink_sem_destroy(&curr->notifyDispatcherSem); localSem_t* temp = curr->eventSemaphores; while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) { // unblock potentially blocked event semaphores - sem_post(&temp->sem); - sem_destroy(&temp->sem); - temp->refs = -1; + XLink_sem_post(&temp->sem); + XLink_sem_destroy(&temp->sem); temp++; } numSchedulers--; @@ -939,7 +910,7 @@ static int dispatcherReset(xLinkSchedulerState_t* curr) } xLinkDesc_t* link = getLink(curr->deviceHandle.xLinkFD); - if(link == NULL || sem_post(&link->dispatcherClosedSem)) { + if(link == NULL || XLink_sem_post(&link->dispatcherClosedSem)) { mvLog(MVLOG_DEBUG,"can't post dispatcherClosedSem\n"); } diff --git a/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkDispatcherImpl.c b/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkDispatcherImpl.c index 3c2d791..e18a5e7 100644 --- a/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkDispatcherImpl.c +++ b/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkDispatcherImpl.c @@ -340,7 +340,7 @@ int dispatcherRemoteEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response stream->name[0] = '\0'; } #ifndef __PC__ - if(sem_destroy(&stream->sem)) + if(XLink_sem_destroy(&stream->sem)) perror("Can't destroy semaphore"); #endif } @@ -452,7 +452,7 @@ void dispatcherCloseLink(void* fd, int fullClose) XLinkStreamReset(stream); } - if(sem_destroy(&link->dispatcherClosedSem)) { + if(XLink_sem_destroy(&link->dispatcherClosedSem)) { mvLog(MVLOG_DEBUG, "Cannot destroy dispatcherClosedSem\n"); } } diff --git a/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkPrivateFields.c b/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkPrivateFields.c index 5aeb4ed..46a97a2 100644 --- a/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkPrivateFields.c +++ b/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkPrivateFields.c @@ -55,7 +55,7 @@ streamDesc_t* getStreamById(void* fd, streamId_t id) int stream; for (stream = 0; stream < XLINK_MAX_STREAMS; stream++) { if (link->availableStreams[stream].id == id) { - sem_wait(&link->availableStreams[stream].sem); + XLink_sem_wait(&link->availableStreams[stream].sem); return &link->availableStreams[stream]; } @@ -70,7 +70,7 @@ streamDesc_t* getStreamByName(xLinkDesc_t* link, const char* name) for (stream = 0; stream < XLINK_MAX_STREAMS; stream++) { if (link->availableStreams[stream].id != INVALID_STREAM_ID && strcmp(link->availableStreams[stream].name, name) == 0) { - sem_wait(&link->availableStreams[stream].sem); + XLink_sem_wait(&link->availableStreams[stream].sem); return &link->availableStreams[stream]; } @@ -81,7 +81,7 @@ streamDesc_t* getStreamByName(xLinkDesc_t* link, const char* name) void releaseStream(streamDesc_t* stream) { if (stream && stream->id != INVALID_STREAM_ID) { - sem_post(&stream->sem); + XLink_sem_post(&stream->sem); } else { mvLog(MVLOG_DEBUG,"trying to release a semaphore for a released stream\n"); diff --git a/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkSemaphore.c b/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkSemaphore.c new file mode 100644 index 0000000..e2fa8a4 --- /dev/null +++ b/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkSemaphore.c @@ -0,0 +1,132 @@ +// Copyright (C) 2020 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 +// + +#include "XLinkSemaphore.h" +#include "XLinkErrorUtils.h" +#include "XLinkLog.h" + +static pthread_mutex_t ref_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t ref_cond = PTHREAD_COND_INITIALIZER; + +int XLink_sem_inc(XLink_sem_t* sem) +{ + XLINK_RET_IF_FAIL(pthread_mutex_lock(&ref_mutex)); + if (sem->refs < 0) { + // Semaphore has been already destroyed + XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex)); + return -1; + } + + sem->refs++; + XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex)); + + return 0; +} + +int XLink_sem_dec(XLink_sem_t* sem) +{ + XLINK_RET_IF_FAIL(pthread_mutex_lock(&ref_mutex)); + if (sem->refs < 1) { + // Can't decrement reference count if there are no waiters + // or semaphore has been already destroyed + XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex)); + return -1; + } + + sem->refs--; + int ret = pthread_cond_broadcast(&ref_cond); + XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex)); + + return ret; +} + + +int XLink_sem_init(XLink_sem_t* sem, int pshared, unsigned int value) +{ + XLINK_RET_ERR_IF(sem == NULL, -1); + + XLINK_RET_IF_FAIL(sem_init(&sem->psem, pshared, value)); + XLINK_RET_IF_FAIL(pthread_mutex_lock(&ref_mutex)); + sem->refs = 0; + XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex)); + + return 0; +} + +int XLink_sem_destroy(XLink_sem_t* sem) +{ + XLINK_RET_ERR_IF(sem == NULL, -1); + + XLINK_RET_IF_FAIL(pthread_mutex_lock(&ref_mutex)); + if (sem->refs < 0) { + // Semaphore has been already destroyed + XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex)); + return -1; + } + + while(sem->refs > 0) { + if (pthread_cond_wait(&ref_cond, &ref_mutex)) { + break; + }; + } + sem->refs = -1; + int ret = sem_destroy(&sem->psem); + XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex)); + + return ret; +} + +int XLink_sem_post(XLink_sem_t* sem) +{ + XLINK_RET_ERR_IF(sem == NULL, -1); + if (sem->refs < 0) { + return -1; + } + + return sem_post(&sem->psem); +} + +int XLink_sem_wait(XLink_sem_t* sem) +{ + XLINK_RET_ERR_IF(sem == NULL, -1); + + XLINK_RET_IF_FAIL(XLink_sem_inc(sem)); + int ret = sem_wait(&sem->psem); + XLINK_RET_IF_FAIL(XLink_sem_dec(sem)); + + return ret; +} + +int XLink_sem_timedwait(XLink_sem_t* sem, const struct timespec* abstime) +{ + XLINK_RET_ERR_IF(sem == NULL, -1); + XLINK_RET_ERR_IF(abstime == NULL, -1); + + XLINK_RET_IF_FAIL(XLink_sem_inc(sem)); + int ret = sem_timedwait(&sem->psem, abstime); + XLINK_RET_IF_FAIL(XLink_sem_dec(sem)); + + return ret; +} + +int XLink_sem_set_refs(XLink_sem_t* sem, int refs) +{ + XLINK_RET_ERR_IF(sem == NULL, -1); + XLINK_RET_ERR_IF(refs < -1, -1); + + XLINK_RET_IF_FAIL(pthread_mutex_lock(&ref_mutex)); + sem->refs = refs; + int ret = pthread_cond_broadcast(&ref_cond); + XLINK_RET_IF_FAIL(pthread_mutex_unlock(&ref_mutex)); + + return ret; +} + +int XLink_sem_get_refs(XLink_sem_t* sem, int *sval) +{ + XLINK_RET_ERR_IF(sem == NULL, -1); + + *sval = sem->refs; + return 0; +} diff --git a/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkStream.c b/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkStream.c index 4371605..6c5945c 100644 --- a/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkStream.c +++ b/inference-engine/thirdparty/movidius/XLink/shared/src/XLinkStream.c @@ -22,7 +22,7 @@ XLinkError_t XLinkStreamInitialize( memset(stream, 0, sizeof(*stream)); - if (sem_init(&stream->sem, 0, 0)) { + if (XLink_sem_init(&stream->sem, 0, 0)) { mvLog(MVLOG_ERROR, "Cannot initialize semaphore\n"); return X_LINK_ERROR; } @@ -39,7 +39,7 @@ void XLinkStreamReset(streamDesc_t* stream) { return; } - if(sem_destroy(&stream->sem)) { + if(XLink_sem_destroy(&stream->sem)) { mvLog(MVLOG_DEBUG, "Cannot destroy semaphore\n"); }