1 // Copyright (C) 2018-2019 Intel Corporation
2 // SPDX-License-Identifier: Apache-2.0
8 /// @brief Application configuration Leon header
12 #define _GNU_SOURCE // fix for warning: implicit declaration of function ‘pthread_setname_np’
21 #if (defined(_WIN32) || defined(_WIN64))
22 #include "win_pthread.h"
23 #include "win_semaphore.h"
26 #include <semaphore.h>
28 #include "XLinkDispatcher.h"
29 #include "XLinkPrivateDefines.h"
32 #define MVLOG_UNIT_NAME xLink
43 typedef struct xLinkEventPriv_t {
45 xLinkEventState_t isServed;
46 xLinkEventOrigin_t origin;
60 xLinkEventPriv_t* end;
61 xLinkEventPriv_t* base;
63 xLinkEventPriv_t* curProc;
64 xLinkEventPriv_t* cur;
65 __attribute__((aligned(64))) xLinkEventPriv_t q[MAX_EVENTS];
69 * @brief Scheduler for each device
72 xLinkDeviceHandle_t deviceHandle; //will be device handler
76 sem_t notifyDispatcherSem;
77 volatile uint32_t resetXLink;
79 pthread_t xLinkThreadId;
81 eventQueueHandler_t lQueue; //local queue
82 eventQueueHandler_t rQueue; //remote queue
83 localSem_t eventSemaphores[MAXIMUM_SEMAPHORES];
84 } xLinkSchedulerState_t;
87 #define CIRCULAR_INCREMENT(x, maxVal, base) \
93 //avoid problems with unsigned. first compare and then give the nuw value
94 #define CIRCULAR_DECREMENT(x, maxVal, base) \
102 extern char* TypeToStr(int type);
104 #if (defined(_WIN32) || defined(_WIN64))
105 static void* __cdecl eventSchedulerRun(void* ctx);
107 static void* eventSchedulerRun(void*);
109 //These will be common for all, Initialized only once
110 struct dispatcherControlFunctions* glControlFunc;
112 xLinkSchedulerState_t schedulerState[MAX_SCHEDULERS];
113 sem_t addSchedulerSem;
115 int pthread_t_compare(pthread_t a, pthread_t b)
117 #if (defined(_WIN32) || defined(_WIN64) )
118 return ((a.tid == b.tid));
124 static int unrefSem(sem_t* sem, xLinkSchedulerState_t* curr) {
125 ASSERT_X_LINK(curr != NULL);
126 localSem_t* temp = curr->eventSemaphores;
127 while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
128 if (&temp->sem == sem) {
130 if (temp->refs == 0) {
132 ASSERT_X_LINK(sem_destroy(&temp->sem) != -1);
139 mvLog(MVLOG_WARN,"unrefSem : sem wasn't found\n");
142 static sem_t* getCurrentSem(pthread_t threadId, xLinkSchedulerState_t* curr, int inc_ref)
144 ASSERT_X_LINK_R(curr != NULL, NULL);
146 localSem_t* sem = curr->eventSemaphores;
147 while (sem < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
148 if (pthread_t_compare(sem->threadId, threadId) && sem->refs > 0) {
149 sem->refs += inc_ref;
157 static sem_t* createSem(xLinkSchedulerState_t* curr)
159 ASSERT_X_LINK_R(curr != NULL, NULL);
162 sem_t* sem = getCurrentSem(pthread_self(), curr, 0);
163 if (sem) // it already exists, error
167 if (curr->semaphores < MAXIMUM_SEMAPHORES) {
168 localSem_t* temp = curr->eventSemaphores;
169 while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
170 if (temp->refs < 0) {
172 if (temp->refs == -1) {
173 if (sem_init(sem, 0, 0))
174 perror("Can't create semaphore\n");
178 temp->threadId = pthread_self();
193 #if (defined(_WIN32) || defined(_WIN64))
194 static void* __cdecl eventReader(void* ctx)
196 static void* eventReader(void* ctx)
199 xLinkSchedulerState_t *curr = (xLinkSchedulerState_t*)ctx;
200 ASSERT_X_LINK_R(curr, NULL);
202 xLinkEvent_t event = { 0 };
203 event.header.id = -1;
204 event.deviceHandle = curr->deviceHandle;
206 mvLog(MVLOG_INFO,"eventReader thread started");
208 while (!curr->resetXLink) {
209 int sc = glControlFunc->eventReceive(&event);
210 mvLog(MVLOG_DEBUG,"Reading %s (scheduler %d, fd %p, event id %d, event stream_id %u, event size %u)\n",
211 TypeToStr(event.header.type), curr->schedulerId, event.deviceHandle.xLinkFD, event.header.id, event.header.streamId, event.header.size);
213 if (event.header.type == XLINK_RESET_RESP) {
214 curr->resetXLink = 1;
215 mvLog(MVLOG_INFO,"eventReader thread stopped: reset");
220 if (sem_post(&curr->notifyDispatcherSem)) {
221 mvLog(MVLOG_ERROR,"can't post semaphore\n"); // stop eventSchedulerRun thread
223 mvLog(MVLOG_ERROR,"eventReader thread stopped (err %d)", sc);
233 static int isEventTypeRequest(xLinkEventPriv_t* event)
235 if (event->packet.header.type < XLINK_REQUEST_LAST)
241 static void markEventBlocked(xLinkEventPriv_t* event)
243 event->isServed = EVENT_BLOCKED;
246 static void markEventReady(xLinkEventPriv_t* event)
248 event->isServed = EVENT_READY;
251 static void markEventServed(xLinkEventPriv_t* event)
254 // the xLinkEventPriv_t slot pointed by "event" will be
255 // re-cycled as soon as we mark it as EVENT_SERVED,
256 // so before that, we copy the result event into XLink API layer
257 *(event->retEv) = event->packet;
260 if (sem_post(event->sem)) {
261 mvLog(MVLOG_ERROR,"can't post semaphore\n");
264 event->isServed = EVENT_SERVED;
268 static int dispatcherRequestServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr){
269 ASSERT_X_LINK(curr != NULL);
270 ASSERT_X_LINK(isEventTypeRequest(event));
271 xLinkEventHeader_t *header = &event->packet.header;
272 if (header->flags.bitField.block){ //block is requested
273 markEventBlocked(event);
274 }else if(header->flags.bitField.localServe == 1 ||
275 (header->flags.bitField.ack == 0
276 && header->flags.bitField.nack == 1)){ //this event is served locally, or it is failed
277 markEventServed(event);
278 }else if (header->flags.bitField.ack == 1
279 && header->flags.bitField.nack == 0){
280 event->isServed = EVENT_PENDING;
281 mvLog(MVLOG_DEBUG,"------------------------UNserved %s\n",
282 TypeToStr(event->packet.header.type));
290 static int dispatcherResponseServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr)
293 ASSERT_X_LINK(curr != NULL);
294 ASSERT_X_LINK(!isEventTypeRequest(event));
295 for (i = 0; i < MAX_EVENTS; i++)
297 xLinkEventHeader_t *header = &curr->lQueue.q[i].packet.header;
298 xLinkEventHeader_t *evHeader = &event->packet.header;
300 if (curr->lQueue.q[i].isServed == EVENT_PENDING &&
301 header->id == evHeader->id &&
302 header->type == evHeader->type - XLINK_REQUEST_LAST -1)
304 mvLog(MVLOG_DEBUG,"----------------------ISserved %s\n",
305 TypeToStr(header->type));
306 //propagate back flags
307 header->flags = evHeader->flags;
308 markEventServed(&curr->lQueue.q[i]);
312 if (i == MAX_EVENTS) {
313 mvLog(MVLOG_FATAL,"no request for this response: %s %d %d\n", TypeToStr(event->packet.header.type), event->origin, event->packet.header.id);
314 printf("#### (i == MAX_EVENTS) %s %d %d\n", TypeToStr(event->packet.header.type), event->origin, (int)event->packet.header.id);
315 for (i = 0; i < MAX_EVENTS; i++)
317 xLinkEventHeader_t *header = &curr->lQueue.q[i].packet.header;
319 printf("%d) header->id %i, header->type %s(%i), curr->lQueue.q[i].isServed %i, EVENT_PENDING %i\n", i, (int)header->id
320 , TypeToStr(header->type), header->type, curr->lQueue.q[i].isServed, EVENT_PENDING);
328 static inline xLinkEventPriv_t* getNextElementWithState(xLinkEventPriv_t* base, xLinkEventPriv_t* end,
329 xLinkEventPriv_t* start, xLinkEventState_t state){
330 xLinkEventPriv_t* tmp = start;
331 while (start->isServed != state){
332 CIRCULAR_INCREMENT(start, end, base);
337 if(start->isServed == state){
344 static xLinkEventPriv_t* searchForReadyEvent(xLinkSchedulerState_t* curr)
346 ASSERT_X_LINK_R(curr != NULL, NULL);
347 xLinkEventPriv_t* ev = NULL;
349 ev = getNextElementWithState(curr->lQueue.base, curr->lQueue.end, curr->lQueue.base, EVENT_READY);
351 mvLog(MVLOG_DEBUG,"ready %s %d \n",
352 TypeToStr((int)ev->packet.header.type),
353 (int)ev->packet.header.id);
358 static xLinkEventPriv_t* getNextQueueElemToProc(eventQueueHandler_t *q ){
359 xLinkEventPriv_t* event = NULL;
360 event = getNextElementWithState(q->base, q->end, q->curProc, EVENT_ALLOCATED);
363 CIRCULAR_INCREMENT(q->curProc, q->end, q->base);
369 * @brief Add event to Queue
370 * @note It called from dispatcherAddEvent
372 static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
373 eventQueueHandler_t *q, xLinkEvent_t* event,
374 sem_t* sem, xLinkEventOrigin_t o){
376 xLinkEventPriv_t* eventP = getNextElementWithState(q->base, q->end, q->cur, EVENT_SERVED);
377 if (eventP == NULL) {
378 mvLog(MVLOG_ERROR, "Can not get next element");
381 mvLog(MVLOG_DEBUG, "Received event %s %d", TypeToStr(event->header.type), o);
382 ev = &eventP->packet;
384 if ((XLinkError_t)unrefSem(eventP->sem, curr) == X_LINK_ERROR) {
385 mvLog(MVLOG_WARN, "Failed to unref sem");
390 eventP->packet = *event;
392 if (o == EVENT_LOCAL) {
393 // XLink API caller provided buffer for return the final result to
394 eventP->retEv = event;
396 eventP->retEv = NULL;
398 // Mark eventP as ALLOCATED to prevent it from being allocated again
399 eventP->isServed = EVENT_ALLOCATED;
401 CIRCULAR_INCREMENT(q->cur, q->end, q->base);
405 static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr)
407 ASSERT_X_LINK_R(curr != NULL, NULL);
409 xLinkEventPriv_t* event = NULL;
410 event = searchForReadyEvent(curr);
414 if (XLinkWaitSem(&curr->notifyDispatcherSem)) {
415 mvLog(MVLOG_ERROR,"can't post semaphore\n");
418 event = getNextQueueElemToProc(&curr->lQueue);
422 event = getNextQueueElemToProc(&curr->rQueue);
426 static pthread_mutex_t reset_mutex = PTHREAD_MUTEX_INITIALIZER;
428 static int isAvailableScheduler(xLinkSchedulerState_t* curr)
430 if (curr->schedulerId == -1) {
431 mvLog(MVLOG_WARN,"Scheduler has already been reset or cleaned");
432 return 0; // resetted already
437 static void closeDeviceFdAndResetScheduler(xLinkSchedulerState_t* curr)
440 mvLog(MVLOG_INFO, "Dispatcher Cleaning...");
441 glControlFunc->closeDeviceFd(&curr->deviceHandle);
442 curr->schedulerId = -1;
443 curr->resetXLink = 1;
444 sem_destroy(&curr->addEventSem);
445 sem_destroy(&curr->notifyDispatcherSem);
446 localSem_t* temp = curr->eventSemaphores;
447 while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
448 // unblock potentially blocked event semaphores
449 sem_post(&temp->sem);
450 sem_destroy(&temp->sem);
455 mvLog(MVLOG_INFO,"Cleaning Successfully\n");
460 static int dispatcherReset(xLinkSchedulerState_t* curr)
462 ASSERT_X_LINK(curr != NULL);
463 CHECK_MUTEX_SUCCESS_RC(pthread_mutex_lock(&reset_mutex), 1);
465 if(!isAvailableScheduler(curr)) {
466 CHECK_MUTEX_SUCCESS(pthread_mutex_unlock(&reset_mutex));
470 mvLog(MVLOG_INFO, "Resetting...");
472 glControlFunc->closeLink(curr->deviceHandle.xLinkFD);
474 //notifyDispatcherSem +1 for NULL event, avoid dispatcher blocking.
475 if (sem_post(&curr->notifyDispatcherSem)) {
476 mvLog(MVLOG_ERROR,"can't post semaphore\n"); //to allow us to get a NULL event
479 xLinkEventPriv_t* event = dispatcherGetNextEvent(curr);
480 while (event != NULL) {
481 mvLog(MVLOG_INFO, "dropped event is %s, status %d\n",
482 TypeToStr(event->packet.header.type), event->isServed);
483 // although there is no no execution for this event, also mark it as being served without success
484 // caller will be informed and internal event memory slot will be de-allocated
485 markEventServed(event);
486 event = dispatcherGetNextEvent(curr);
489 event = getNextElementWithState(curr->lQueue.base, curr->lQueue.end, curr->lQueue.base, EVENT_PENDING);
490 while (event != NULL) {
491 mvLog(MVLOG_INFO,"Pending event is %s, size is %d, Mark it served\n", TypeToStr(event->packet.header.type), event->packet.header.size);
492 markEventServed(event);
493 event = getNextElementWithState(curr->lQueue.base, curr->lQueue.end, curr->lQueue.base, EVENT_PENDING);
495 closeDeviceFdAndResetScheduler(curr);
496 CHECK_MUTEX_SUCCESS(pthread_mutex_unlock(&reset_mutex));
499 #if (defined(_WIN32) || defined(_WIN64))
500 static void* __cdecl eventSchedulerRun(void* ctx)
502 static void* eventSchedulerRun(void* ctx)
505 int schedulerId = *((int*) ctx);
506 mvLog(MVLOG_DEBUG,"%s() schedulerId %d\n", __func__, schedulerId);
507 ASSERT_X_LINK_R(schedulerId < MAX_SCHEDULERS, NULL);
509 xLinkSchedulerState_t* curr = &schedulerState[schedulerId];
510 pthread_t readerThreadId; /* Create thread for reader.
511 This thread will notify the dispatcher of any incoming packets*/
515 if (pthread_attr_init(&attr) !=0) {
516 mvLog(MVLOG_ERROR,"pthread_attr_init error");
520 sc = pthread_create(&readerThreadId, &attr, eventReader, curr);
522 mvLog(MVLOG_ERROR, "Thread creation failed");
523 if (pthread_attr_destroy(&attr) != 0) {
524 perror("Thread attr destroy failed\n");
528 char eventReaderThreadName[20];
529 snprintf(eventReaderThreadName, sizeof(eventReaderThreadName), "EventRead%.2dThr", schedulerId);
530 sc = pthread_setname_np(readerThreadId, eventReaderThreadName);
532 perror("Setting name for event reader thread failed");
534 sc = pthread_attr_destroy(&attr);
536 mvLog(MVLOG_WARN, "Thread attr destroy failed");
538 xLinkEventPriv_t* event;
539 xLinkEventPriv_t response;
541 mvLog(MVLOG_INFO,"Scheduler thread started");
543 while (!curr->resetXLink) {
544 event = dispatcherGetNextEvent(curr);
549 ASSERT_X_LINK_R(event->packet.deviceHandle.xLinkFD == curr->deviceHandle.xLinkFD, NULL);
550 getRespFunction getResp;
551 xLinkEvent_t* toSend;
553 if (event->origin == EVENT_LOCAL){
554 getResp = glControlFunc->localGetResponse;
555 toSend = &event->packet;
557 getResp = glControlFunc->remoteGetResponse;
558 toSend = &response.packet;
561 res = getResp(&event->packet, &response.packet);
562 if (isEventTypeRequest(event)){
563 if (event->origin == EVENT_LOCAL){ //we need to do this for locals only
564 dispatcherRequestServe(event, curr);
566 // For PCIE and in with Connect to booted option don't send reset request
568 if (res == 0 && event->packet.header.flags.bitField.localServe == 0){
569 // FIXME We shouldn't send reset request for PCIE and with turned on "NO_BOOT" cmake option
570 // Also, we can't just close evenReader thread, as WinPthread don't have suitable function for this emergency exit,
571 // so, let's pretend that would be ping request, and then we can correctly close eventReader thread
573 if (toSend->header.type == XLINK_RESET_REQ) {
574 if(toSend->deviceHandle.protocol == X_LINK_PCIE) {
575 toSend->header.type = XLINK_PING_REQ;
576 curr->resetXLink = 1;
577 mvLog(MVLOG_INFO, "Request for reboot not sent");
580 toSend->header.type = XLINK_PING_REQ;
581 curr->resetXLink = 1;
582 mvLog(MVLOG_INFO, "Request for reboot not sent");
587 if (glControlFunc->eventSend(toSend) != 0) {
588 mvLog(MVLOG_ERROR, "Event sending failed");
592 if (event->origin == EVENT_REMOTE){ // match remote response with the local request
593 dispatcherResponseServe(event, curr);
597 //TODO: dispatcher shouldn't know about this packet. Seems to be easily move-able to protocol
598 if (event->packet.header.type == XLINK_RESET_REQ) {
599 curr->resetXLink = 1;
602 // remote event is served in one round
603 if (event->origin == EVENT_REMOTE){
604 event->isServed = EVENT_SERVED;
608 sc = pthread_join(readerThreadId, NULL);
610 mvLog(MVLOG_ERROR, "Waiting for thread failed");
613 if (dispatcherReset(curr) != 0) {
614 mvLog(MVLOG_WARN, "Failed to reset");
617 if (curr->resetXLink != 1) {
618 mvLog(MVLOG_ERROR,"Scheduler thread stopped");
620 mvLog(MVLOG_INFO,"Scheduler thread stopped");
626 static int createUniqueID()
632 static xLinkSchedulerState_t* findCorrespondingScheduler(void* xLinkFD)
635 if (xLinkFD == NULL) { //in case of myriad there should be one scheduler
636 if (numSchedulers == 1)
637 return &schedulerState[0];
641 for (i=0; i < MAX_SCHEDULERS; i++)
642 if (schedulerState[i].schedulerId != -1 &&
643 schedulerState[i].deviceHandle.xLinkFD == xLinkFD)
644 return &schedulerState[i];
648 ///////////////// External Interface //////////////////////////
649 /*Adds a new event with parameters and returns event id*/
650 xLinkEvent_t* dispatcherAddEvent(xLinkEventOrigin_t origin, xLinkEvent_t *event)
652 xLinkSchedulerState_t* curr = findCorrespondingScheduler(event->deviceHandle.xLinkFD);
653 ASSERT_X_LINK_R(curr != NULL, NULL);
655 if(curr->resetXLink) {
659 mvLog(MVLOG_DEBUG, "Receiving event %s %d\n", TypeToStr(event->header.type), origin);
660 if (XLinkWaitSem(&curr->addEventSem)) {
661 mvLog(MVLOG_ERROR,"can't wait semaphore\n");
667 if (origin == EVENT_LOCAL) {
668 event->header.id = createUniqueID();
669 sem = getCurrentSem(pthread_self(), curr, 1);
671 sem = createSem(curr);
674 mvLog(MVLOG_WARN,"No more semaphores. Increase XLink or OS resources\n");
675 if (sem_post(&curr->addEventSem)) {
676 mvLog(MVLOG_ERROR,"can't post semaphore\n");
680 event->header.flags.raw = 0;
681 event->header.flags.bitField.ack = 1;
682 ev = addNextQueueElemToProc(curr, &curr->lQueue, event, sem, origin);
684 ev = addNextQueueElemToProc(curr, &curr->rQueue, event, NULL, origin);
686 if (sem_post(&curr->addEventSem)) {
687 mvLog(MVLOG_ERROR,"can't post semaphore\n");
689 if (sem_post(&curr->notifyDispatcherSem)) {
690 mvLog(MVLOG_ERROR, "can't post semaphore\n");
695 int dispatcherWaitEventComplete(xLinkDeviceHandle_t* deviceHandle, unsigned int timeout)
697 xLinkSchedulerState_t* curr = findCorrespondingScheduler(deviceHandle->xLinkFD);
698 ASSERT_X_LINK(curr != NULL);
700 sem_t* id = getCurrentSem(pthread_self(), curr, 0);
705 int rc = XLinkWaitSemUserMode(id, timeout);
706 if (rc && deviceHandle->protocol != X_LINK_PCIE) {
707 xLinkEvent_t event = {0};
708 event.header.type = XLINK_RESET_REQ;
709 event.deviceHandle = *deviceHandle;
710 mvLog(MVLOG_ERROR,"waiting is timeout, sending reset remote event");
711 dispatcherAddEvent(EVENT_LOCAL, &event);
712 id = getCurrentSem(pthread_self(), curr, 0);
713 if (id == NULL || XLinkWaitSemUserMode(id, timeout)) {
714 dispatcherReset(curr);
721 int dispatcherUnblockEvent(eventId_t id, xLinkEventType_t type, streamId_t stream, void* xLinkFD)
723 xLinkSchedulerState_t* curr = findCorrespondingScheduler(xLinkFD);
724 ASSERT_X_LINK(curr != NULL);
726 mvLog(MVLOG_DEBUG,"unblock\n");
727 xLinkEventPriv_t* blockedEvent;
728 for (blockedEvent = curr->lQueue.q;
729 blockedEvent < curr->lQueue.q + MAX_EVENTS;
732 if (blockedEvent->isServed == EVENT_BLOCKED &&
733 ((blockedEvent->packet.header.id == id || id == -1)
734 && blockedEvent->packet.header.type == type
735 && blockedEvent->packet.header.streamId == stream))
737 mvLog(MVLOG_DEBUG,"unblocked**************** %d %s\n",
738 (int)blockedEvent->packet.header.id,
739 TypeToStr((int)blockedEvent->packet.header.type));
740 markEventReady(blockedEvent);
743 mvLog(MVLOG_DEBUG,"%d %s\n",
744 (int)blockedEvent->packet.header.id,
745 TypeToStr((int)blockedEvent->packet.header.type));
751 int findAvailableScheduler()
754 for (i = 0; i < MAX_SCHEDULERS; i++)
755 if (schedulerState[i].schedulerId == -1)
761 * Initialize scheduler for device
763 int dispatcherStart(xLinkDeviceHandle_t* deviceHandle)
765 if (deviceHandle->xLinkFD == NULL) {
766 mvLog(MVLOG_ERROR, "Invalid device filedescriptor");
772 if (numSchedulers >= MAX_SCHEDULERS)
774 mvLog(MVLOG_ERROR,"Max number Schedulers reached!\n");
778 int idx = findAvailableScheduler();
780 mvLog(MVLOG_ERROR,"Available sheduler not found");
784 memset(&schedulerState[idx], 0, sizeof(xLinkSchedulerState_t));
786 schedulerState[idx].semaphores = 0;
788 schedulerState[idx].resetXLink = 0;
789 schedulerState[idx].deviceHandle = *deviceHandle;
790 schedulerState[idx].schedulerId = idx;
792 schedulerState[idx].lQueue.cur = schedulerState[idx].lQueue.q;
793 schedulerState[idx].lQueue.curProc = schedulerState[idx].lQueue.q;
794 schedulerState[idx].lQueue.base = schedulerState[idx].lQueue.q;
795 schedulerState[idx].lQueue.end = &schedulerState[idx].lQueue.q[MAX_EVENTS];
797 schedulerState[idx].rQueue.cur = schedulerState[idx].rQueue.q;
798 schedulerState[idx].rQueue.curProc = schedulerState[idx].rQueue.q;
799 schedulerState[idx].rQueue.base = schedulerState[idx].rQueue.q;
800 schedulerState[idx].rQueue.end = &schedulerState[idx].rQueue.q[MAX_EVENTS];
802 for (eventIdx = 0 ; eventIdx < MAX_EVENTS; eventIdx++)
804 schedulerState[idx].rQueue.q[eventIdx].isServed = EVENT_SERVED;
805 schedulerState[idx].lQueue.q[eventIdx].isServed = EVENT_SERVED;
808 if (sem_init(&schedulerState[idx].addEventSem, 0, 1)) {
809 perror("Can't create semaphore\n");
812 if (sem_init(&schedulerState[idx].notifyDispatcherSem, 0, 0)) {
813 perror("Can't create semaphore\n");
816 localSem_t* temp = schedulerState[idx].eventSemaphores;
817 while (temp < schedulerState[idx].eventSemaphores + MAXIMUM_SEMAPHORES) {
821 if (pthread_attr_init(&attr) != 0) {
822 mvLog(MVLOG_ERROR,"pthread_attr_init error");
826 XLinkWaitSem(&addSchedulerSem);
827 mvLog(MVLOG_DEBUG,"%s() starting a new thread - schedulerId %d \n", __func__, idx);
828 int sc = pthread_create(&schedulerState[idx].xLinkThreadId,
831 (void*)&schedulerState[idx].schedulerId);
833 mvLog(MVLOG_ERROR,"Thread creation failed with error: %d", sc);
834 if (pthread_attr_destroy(&attr) != 0) {
835 perror("Thread attr destroy failed\n");
840 char schedulerThreadName[20];
841 snprintf(schedulerThreadName, sizeof(schedulerThreadName), "Scheduler%.2dThr", schedulerState[idx].schedulerId);
842 sc = pthread_setname_np(schedulerState[idx].xLinkThreadId, schedulerThreadName);
844 perror("Setting name for indexed scheduler thread failed");
847 pthread_detach(schedulerState[idx].xLinkThreadId);
850 sc = pthread_attr_destroy(&attr);
852 perror("Thread attr destroy failed");
855 sem_post(&addSchedulerSem);
861 * @brief Initialize dispatcher functions and reset all schedulers
863 int dispatcherInitialize(struct dispatcherControlFunctions* controlFunc) {
866 !controlFunc->eventReceive ||
867 !controlFunc->eventSend ||
868 !controlFunc->localGetResponse ||
869 !controlFunc->remoteGetResponse)
874 glControlFunc = controlFunc;
875 if (sem_init(&addSchedulerSem, 0, 1)) {
876 perror("Can't create semaphore\n");
879 for (i = 0; i < MAX_SCHEDULERS; i++){
880 schedulerState[i].schedulerId = -1;
885 int dispatcherClean(void* xLinkFD)
887 xLinkSchedulerState_t* curr = findCorrespondingScheduler(xLinkFD);
888 ASSERT_X_LINK(curr != NULL);
890 CHECK_MUTEX_SUCCESS_RC(pthread_mutex_lock(&reset_mutex), 1);
891 if(!isAvailableScheduler(curr)) {
892 CHECK_MUTEX_SUCCESS(pthread_mutex_unlock(&reset_mutex));
895 mvLog(MVLOG_INFO, "Start Clean Dispatcher...");
896 closeDeviceFdAndResetScheduler(curr);
897 mvLog(MVLOG_INFO, "Clean Dispatcher Successfully...");
898 CHECK_MUTEX_SUCCESS(pthread_mutex_unlock(&reset_mutex));