1 // Copyright (C) 2018-2020 Intel Corporation
2 // SPDX-License-Identifier: Apache-2.0
8 /// @brief Application configuration Leon header
11 #define _GNU_SOURCE // fix for warning: implicit declaration of function ‘pthread_setname_np’
21 #if (defined(_WIN32) || defined(_WIN64))
22 # include "win_pthread.h"
23 # include "win_semaphore.h"
27 # include <semaphore.h>
31 #include "XLinkDispatcher.h"
32 #include "XLinkMacros.h"
33 #include "XLinkPrivateDefines.h"
34 #include "XLinkPrivateFields.h"
36 #include "XLinkErrorUtils.h"
38 #define MVLOG_UNIT_NAME xLink
41 // ------------------------------------
42 // Data structures declaration. Begin.
43 // ------------------------------------
53 typedef struct xLinkEventPriv_t {
56 xLinkEventState_t isServed;
57 xLinkEventOrigin_t origin;
69 xLinkEventPriv_t* end;
70 xLinkEventPriv_t* base;
72 xLinkEventPriv_t* curProc;
73 xLinkEventPriv_t* cur;
74 XLINK_ALIGN_TO_BOUNDARY(64) xLinkEventPriv_t q[MAX_EVENTS];
78 * @brief Scheduler for each device
81 xLinkDeviceHandle_t deviceHandle; //will be device handler
84 int queueProcPriority;
87 sem_t notifyDispatcherSem;
88 volatile uint32_t resetXLink;
90 pthread_t xLinkThreadId;
92 eventQueueHandler_t lQueue; //local queue
93 eventQueueHandler_t rQueue; //remote queue
94 localSem_t eventSemaphores[MAXIMUM_SEMAPHORES];
95 } xLinkSchedulerState_t;
98 // ------------------------------------
99 // Data structures declaration. Begin.
100 // ------------------------------------
104 // ------------------------------------
105 // Global fields declaration. Begin.
106 // ------------------------------------
108 //These will be common for all, Initialized only once
109 DispatcherControlFunctions* glControlFunc;
111 xLinkSchedulerState_t schedulerState[MAX_SCHEDULERS];
112 sem_t addSchedulerSem;
114 static pthread_mutex_t clean_mutex = PTHREAD_MUTEX_INITIALIZER;
116 // ------------------------------------
117 // Global fields declaration. End.
118 // ------------------------------------
122 // ------------------------------------
123 // Helpers declaration. Begin.
124 // ------------------------------------
126 //below workaround for "C2088 '==': illegal for struct" error
127 static int pthread_t_compare(pthread_t a, pthread_t b);
129 static sem_t* createSem(xLinkSchedulerState_t* curr);
130 static sem_t* getAndRefSem(pthread_t threadId, xLinkSchedulerState_t *curr, int inc_ref);
131 static int unrefSem(sem_t* sem, xLinkSchedulerState_t* curr);
133 #if (defined(_WIN32) || defined(_WIN64))
134 static void* __cdecl eventReader(void* ctx);
135 static void* __cdecl eventSchedulerRun(void* ctx);
137 static void* eventReader(void* ctx);
138 static void* eventSchedulerRun(void* ctx);
141 static int isEventTypeRequest(xLinkEventPriv_t* event);
142 static void postAndMarkEventServed(xLinkEventPriv_t *event);
143 static int createUniqueID();
144 static int findAvailableScheduler();
145 static xLinkSchedulerState_t* findCorrespondingScheduler(void* xLinkFD);
147 static int dispatcherRequestServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr);
148 static int dispatcherResponseServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr);
150 static inline xLinkEventPriv_t* getNextElementWithState(xLinkEventPriv_t* base, xLinkEventPriv_t* end,
151 xLinkEventPriv_t* start, xLinkEventState_t state);
153 static xLinkEventPriv_t* searchForReadyEvent(xLinkSchedulerState_t* curr);
155 static xLinkEventPriv_t* getNextQueueElemToProc(eventQueueHandler_t *q );
156 static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
157 eventQueueHandler_t *q, xLinkEvent_t* event,
158 sem_t* sem, xLinkEventOrigin_t o);
160 static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr);
162 static int dispatcherClean(xLinkSchedulerState_t* curr);
163 static int dispatcherReset(xLinkSchedulerState_t* curr);
164 static void dispatcherFreeEvents(eventQueueHandler_t *queue, xLinkEventState_t state);
166 static XLinkError_t sendEvents(xLinkSchedulerState_t* curr);
168 // ------------------------------------
169 // Helpers declaration. End.
170 // ------------------------------------
174 // ------------------------------------
175 // XLinkDispatcher.h implementation. Begin.
176 // ------------------------------------
178 XLinkError_t DispatcherInitialize(DispatcherControlFunctions *controlFunc) {
179 ASSERT_XLINK(controlFunc != NULL);
181 if (!controlFunc->eventReceive ||
182 !controlFunc->eventSend ||
183 !controlFunc->localGetResponse ||
184 !controlFunc->remoteGetResponse) {
188 glControlFunc = controlFunc;
191 if (sem_init(&addSchedulerSem, 0, 1)) {
192 mvLog(MVLOG_ERROR, "Can't create semaphore\n");
196 for (int i = 0; i < MAX_SCHEDULERS; i++){
197 schedulerState[i].schedulerId = -1;
200 return X_LINK_SUCCESS;
203 XLinkError_t DispatcherStart(xLinkDeviceHandle_t *deviceHandle)
205 ASSERT_XLINK(deviceHandle);
207 ASSERT_XLINK(deviceHandle->xLinkFD != NULL);
212 if (numSchedulers >= MAX_SCHEDULERS)
214 mvLog(MVLOG_ERROR,"Max number Schedulers reached!\n");
217 int idx = findAvailableScheduler();
219 mvLog(MVLOG_ERROR,"Max number Schedulers reached!\n");
223 memset(&schedulerState[idx], 0, sizeof(xLinkSchedulerState_t));
225 schedulerState[idx].semaphores = 0;
226 schedulerState[idx].queueProcPriority = 0;
228 schedulerState[idx].resetXLink = 0;
229 schedulerState[idx].deviceHandle = *deviceHandle;
230 schedulerState[idx].schedulerId = idx;
232 schedulerState[idx].lQueue.cur = schedulerState[idx].lQueue.q;
233 schedulerState[idx].lQueue.curProc = schedulerState[idx].lQueue.q;
234 schedulerState[idx].lQueue.base = schedulerState[idx].lQueue.q;
235 schedulerState[idx].lQueue.end = &schedulerState[idx].lQueue.q[MAX_EVENTS];
237 schedulerState[idx].rQueue.cur = schedulerState[idx].rQueue.q;
238 schedulerState[idx].rQueue.curProc = schedulerState[idx].rQueue.q;
239 schedulerState[idx].rQueue.base = schedulerState[idx].rQueue.q;
240 schedulerState[idx].rQueue.end = &schedulerState[idx].rQueue.q[MAX_EVENTS];
242 for (eventIdx = 0 ; eventIdx < MAX_EVENTS; eventIdx++)
244 schedulerState[idx].rQueue.q[eventIdx].isServed = EVENT_SERVED;
245 schedulerState[idx].lQueue.q[eventIdx].isServed = EVENT_SERVED;
248 if (sem_init(&schedulerState[idx].addEventSem, 0, 1)) {
249 perror("Can't create semaphore\n");
252 if (sem_init(&schedulerState[idx].notifyDispatcherSem, 0, 0)) {
253 perror("Can't create semaphore\n");
255 localSem_t* temp = schedulerState[idx].eventSemaphores;
256 while (temp < schedulerState[idx].eventSemaphores + MAXIMUM_SEMAPHORES) {
260 if (pthread_attr_init(&attr) != 0) {
261 mvLog(MVLOG_ERROR,"pthread_attr_init error");
266 if (pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED) != 0) {
267 mvLog(MVLOG_ERROR,"pthread_attr_setinheritsched error");
268 pthread_attr_destroy(&attr);
270 if (pthread_attr_setschedpolicy(&attr, SCHED_RR) != 0) {
271 mvLog(MVLOG_ERROR,"pthread_attr_setschedpolicy error");
272 pthread_attr_destroy(&attr);
276 sem_wait(&addSchedulerSem);
277 mvLog(MVLOG_DEBUG,"%s() starting a new thread - schedulerId %d \n", __func__, idx);
278 int sc = pthread_create(&schedulerState[idx].xLinkThreadId,
281 (void*)&schedulerState[idx].schedulerId);
283 mvLog(MVLOG_ERROR,"Thread creation failed with error: %d", sc);
284 if (pthread_attr_destroy(&attr) != 0) {
285 perror("Thread attr destroy failed\n");
291 char schedulerThreadName[MVLOG_MAXIMUM_THREAD_NAME_SIZE];
292 snprintf(schedulerThreadName, sizeof(schedulerThreadName), "Scheduler%.2dThr", schedulerState[idx].schedulerId);
293 sc = pthread_setname_np(schedulerState[idx].xLinkThreadId, schedulerThreadName);
295 perror("Setting name for indexed scheduler thread failed");
299 pthread_detach(schedulerState[idx].xLinkThreadId);
302 if (pthread_attr_destroy(&attr) != 0) {
303 mvLog(MVLOG_ERROR,"pthread_attr_destroy error");
306 sem_post(&addSchedulerSem);
311 int DispatcherClean(xLinkDeviceHandle_t *deviceHandle) {
312 XLINK_RET_IF(deviceHandle == NULL);
314 xLinkSchedulerState_t* curr = findCorrespondingScheduler(deviceHandle->xLinkFD);
315 XLINK_RET_IF(curr == NULL);
317 return dispatcherClean(curr);
320 xLinkEvent_t* DispatcherAddEvent(xLinkEventOrigin_t origin, xLinkEvent_t *event)
322 xLinkSchedulerState_t* curr = findCorrespondingScheduler(event->deviceHandle.xLinkFD);
323 XLINK_RET_ERR_IF(curr == NULL, NULL);
325 if(curr->resetXLink) {
328 mvLog(MVLOG_DEBUG, "Receiving event %s %d\n", TypeToStr(event->header.type), origin);
329 if (sem_wait(&curr->addEventSem)) {
330 mvLog(MVLOG_ERROR,"can't wait semaphore\n");
336 if (origin == EVENT_LOCAL) {
337 event->header.id = createUniqueID();
338 sem = getAndRefSem(pthread_self(), curr, 1);
340 sem = createSem(curr);
343 mvLog(MVLOG_WARN,"No more semaphores. Increase XLink or OS resources\n");
344 if (sem_post(&curr->addEventSem)) {
345 mvLog(MVLOG_ERROR,"can't post semaphore\n");
350 event->header.flags.raw = 0;
351 ev = addNextQueueElemToProc(curr, &curr->lQueue, event, sem, origin);
353 ev = addNextQueueElemToProc(curr, &curr->rQueue, event, NULL, origin);
355 if (sem_post(&curr->addEventSem)) {
356 mvLog(MVLOG_ERROR,"can't post semaphore\n");
358 if (sem_post(&curr->notifyDispatcherSem)) {
359 mvLog(MVLOG_ERROR, "can't post semaphore\n");
364 int DispatcherWaitEventComplete(xLinkDeviceHandle_t *deviceHandle)
366 xLinkSchedulerState_t* curr = findCorrespondingScheduler(deviceHandle->xLinkFD);
367 ASSERT_XLINK(curr != NULL);
369 sem_t* id = getAndRefSem(pthread_self(), curr, 0);
374 int rc = sem_wait(id);
377 xLinkEvent_t event = {0};
378 event.header.type = XLINK_RESET_REQ;
379 event.deviceHandle = *deviceHandle;
380 mvLog(MVLOG_ERROR,"waiting is timeout, sending reset remote event");
381 DispatcherAddEvent(EVENT_LOCAL, &event);
382 id = getAndRefSem(pthread_self(), curr, 0);
383 if (id == NULL || sem_wait(id)) {
384 dispatcherReset(curr);
389 if ((XLinkError_t)unrefSem(id, curr) == X_LINK_ERROR) {
390 mvLog(MVLOG_WARN, "Failed to unref sem");
396 char* TypeToStr(int type)
400 case XLINK_WRITE_REQ: return "XLINK_WRITE_REQ";
401 case XLINK_READ_REQ: return "XLINK_READ_REQ";
402 case XLINK_READ_REL_REQ: return "XLINK_READ_REL_REQ";
403 case XLINK_CREATE_STREAM_REQ:return "XLINK_CREATE_STREAM_REQ";
404 case XLINK_CLOSE_STREAM_REQ: return "XLINK_CLOSE_STREAM_REQ";
405 case XLINK_PING_REQ: return "XLINK_PING_REQ";
406 case XLINK_RESET_REQ: return "XLINK_RESET_REQ";
407 case XLINK_REQUEST_LAST: return "XLINK_REQUEST_LAST";
408 case XLINK_WRITE_RESP: return "XLINK_WRITE_RESP";
409 case XLINK_READ_RESP: return "XLINK_READ_RESP";
410 case XLINK_READ_REL_RESP: return "XLINK_READ_REL_RESP";
411 case XLINK_CREATE_STREAM_RESP: return "XLINK_CREATE_STREAM_RESP";
412 case XLINK_CLOSE_STREAM_RESP: return "XLINK_CLOSE_STREAM_RESP";
413 case XLINK_PING_RESP: return "XLINK_PING_RESP";
414 case XLINK_RESET_RESP: return "XLINK_RESET_RESP";
415 case XLINK_RESP_LAST: return "XLINK_RESP_LAST";
422 int DispatcherUnblockEvent(eventId_t id, xLinkEventType_t type, streamId_t stream, void *xlinkFD)
424 xLinkSchedulerState_t* curr = findCorrespondingScheduler(xlinkFD);
425 ASSERT_XLINK(curr != NULL);
427 mvLog(MVLOG_DEBUG,"unblock\n");
428 xLinkEventPriv_t* blockedEvent;
429 for (blockedEvent = curr->lQueue.q;
430 blockedEvent < curr->lQueue.q + MAX_EVENTS;
433 if (blockedEvent->isServed == EVENT_BLOCKED &&
434 ((blockedEvent->packet.header.id == id || id == -1)
435 && blockedEvent->packet.header.type == type
436 && blockedEvent->packet.header.streamId == stream))
438 mvLog(MVLOG_DEBUG,"unblocked**************** %d %s\n",
439 (int)blockedEvent->packet.header.id,
440 TypeToStr((int)blockedEvent->packet.header.type));
441 blockedEvent->isServed = EVENT_READY;
442 if (sem_post(&curr->notifyDispatcherSem)){
443 mvLog(MVLOG_ERROR, "can't post semaphore\n");
447 mvLog(MVLOG_DEBUG,"%d %s\n",
448 (int)blockedEvent->packet.header.id,
449 TypeToStr((int)blockedEvent->packet.header.type));
455 // ------------------------------------
456 // XLinkDispatcher.h implementation. End.
457 // ------------------------------------
461 // ------------------------------------
462 // Helpers implementation. Begin.
463 // ------------------------------------
465 int pthread_t_compare(pthread_t a, pthread_t b)
467 #if (defined(_WIN32) || defined(_WIN64) )
468 return ((a.tid == b.tid));
474 static sem_t* createSem(xLinkSchedulerState_t* curr)
476 XLINK_RET_ERR_IF(curr == NULL, NULL);
478 sem_t* sem = getAndRefSem(pthread_self(), curr, 0);
479 if (sem) {// it already exists, error
483 if (curr->semaphores <= MAXIMUM_SEMAPHORES) {
484 localSem_t* temp = curr->eventSemaphores;
486 while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
487 if (temp->refs < 0 || curr->semaphores == MAXIMUM_SEMAPHORES) {
488 if (curr->semaphores == MAXIMUM_SEMAPHORES && !temp->refs) {
489 XLINK_RET_ERR_IF(sem_destroy(&temp->sem) == -1, NULL);
492 #if (defined(_WIN32) || defined(_WIN64))
493 memset(&temp->threadId, 0, sizeof(temp->threadId));
499 if (temp->refs == -1) {
501 if (sem_init(sem, 0, 0)){
502 mvLog(MVLOG_ERROR, "Error: Can't create semaphore\n");
507 temp->threadId = pthread_self();
514 return NULL; //shouldn't happen
518 mvLog(MVLOG_ERROR, "Error: cached semaphores %d exceeds the MAXIMUM_SEMAPHORES %d", curr->semaphores, MAXIMUM_SEMAPHORES);
525 static sem_t* getAndRefSem(pthread_t threadId, xLinkSchedulerState_t *curr, int inc_ref)
527 XLINK_RET_ERR_IF(curr == NULL, NULL);
529 localSem_t* sem = curr->eventSemaphores;
530 while (sem < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
531 if (pthread_t_compare(sem->threadId, threadId) && sem->refs >= 0) {
532 sem->refs += inc_ref;
540 static int unrefSem(sem_t* sem, xLinkSchedulerState_t* curr) {
541 ASSERT_XLINK(curr != NULL);
542 localSem_t* temp = curr->eventSemaphores;
543 while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
544 if (&temp->sem == sem) {
550 mvLog(MVLOG_WARN,"unrefSem : sem wasn't found\n");
554 #if (defined(_WIN32) || defined(_WIN64))
555 static void* __cdecl eventReader(void* ctx)
557 static void* eventReader(void* ctx)
560 xLinkSchedulerState_t *curr = (xLinkSchedulerState_t*)ctx;
561 XLINK_RET_ERR_IF(curr == NULL, NULL);
563 xLinkEvent_t event = { 0 };// to fix error C4700 in win
564 event.header.id = -1;
565 event.deviceHandle = curr->deviceHandle;
567 mvLog(MVLOG_INFO,"eventReader thread started");
569 while (!curr->resetXLink) {
570 int sc = glControlFunc->eventReceive(&event);
572 mvLog(MVLOG_DEBUG,"Reading %s (scheduler %d, fd %p, event id %d, event stream_id %u, event size %u)\n",
573 TypeToStr(event.header.type), curr->schedulerId, event.deviceHandle.xLinkFD, event.header.id, event.header.streamId, event.header.size);
576 mvLog(MVLOG_DEBUG,"Failed to receive event (err %d)", sc);
577 dispatcherFreeEvents(&curr->lQueue, EVENT_PENDING);
578 dispatcherFreeEvents(&curr->lQueue, EVENT_BLOCKED);
582 DispatcherAddEvent(EVENT_REMOTE, &event);
584 if (event.header.type == XLINK_RESET_REQ) {
585 curr->resetXLink = 1;
586 mvLog(MVLOG_DEBUG,"Read XLINK_RESET_REQ, stopping eventReader thread.");
593 #if (defined(_WIN32) || defined(_WIN64))
594 static void* __cdecl eventSchedulerRun(void* ctx)
596 static void* eventSchedulerRun(void* ctx)
599 int schedulerId = *((int*) ctx);
600 mvLog(MVLOG_DEBUG,"%s() schedulerId %d\n", __func__, schedulerId);
601 XLINK_RET_ERR_IF(schedulerId >= MAX_SCHEDULERS, NULL);
603 xLinkSchedulerState_t* curr = &schedulerState[schedulerId];
604 pthread_t readerThreadId; /* Create thread for reader.
605 This thread will notify the dispatcher of any incoming packets*/
608 if (pthread_attr_init(&attr) != 0) {
609 mvLog(MVLOG_ERROR,"pthread_attr_init error");
613 if (pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED) != 0) {
614 pthread_attr_destroy(&attr);
615 mvLog(MVLOG_ERROR,"pthread_attr_setinheritsched error");
618 if (pthread_attr_setschedpolicy(&attr, SCHED_RR) != 0) {
619 pthread_attr_destroy(&attr);
620 mvLog(MVLOG_ERROR,"pthread_attr_setschedpolicy error");
624 sc = pthread_create(&readerThreadId, &attr, eventReader, curr);
626 mvLog(MVLOG_ERROR, "Thread creation failed");
627 if (pthread_attr_destroy(&attr) != 0) {
628 perror("Thread attr destroy failed\n");
633 char eventReaderThreadName[MVLOG_MAXIMUM_THREAD_NAME_SIZE];
634 snprintf(eventReaderThreadName, sizeof(eventReaderThreadName), "EventRead%.2dThr", schedulerId);
635 sc = pthread_setname_np(readerThreadId, eventReaderThreadName);
637 perror("Setting name for event reader thread failed");
640 mvLog(MVLOG_INFO,"Scheduler thread started");
642 XLinkError_t rc = sendEvents(curr);
644 mvLog(MVLOG_ERROR, "sendEvents method finished with an error: %s", XLinkErrorToStr(rc));
647 sc = pthread_join(readerThreadId, NULL);
649 mvLog(MVLOG_ERROR, "Waiting for thread failed");
652 sc = pthread_attr_destroy(&attr);
654 mvLog(MVLOG_WARN, "Thread attr destroy failed");
657 if (dispatcherReset(curr) != 0) {
658 mvLog(MVLOG_WARN, "Failed to reset");
661 if (curr->resetXLink != 1) {
662 mvLog(MVLOG_ERROR,"Scheduler thread stopped");
664 mvLog(MVLOG_INFO,"Scheduler thread stopped");
670 static int isEventTypeRequest(xLinkEventPriv_t* event)
672 return event->packet.header.type < XLINK_REQUEST_LAST;
675 static void postAndMarkEventServed(xLinkEventPriv_t *event)
678 // the xLinkEventPriv_t slot pointed by "event" will be
679 // re-cycled as soon as we mark it as EVENT_SERVED,
680 // so before that, we copy the result event into XLink API layer
681 *(event->retEv) = event->packet;
684 if (sem_post(event->sem)) {
685 mvLog(MVLOG_ERROR,"can't post semaphore\n");
689 event->isServed = EVENT_SERVED;
692 static int createUniqueID()
698 int findAvailableScheduler()
701 for (i = 0; i < MAX_SCHEDULERS; i++)
702 if (schedulerState[i].schedulerId == -1)
707 static xLinkSchedulerState_t* findCorrespondingScheduler(void* xLinkFD)
710 if (xLinkFD == NULL) { //in case of myriad there should be one scheduler
711 if (numSchedulers == 1)
712 return &schedulerState[0];
716 for (i=0; i < MAX_SCHEDULERS; i++)
717 if (schedulerState[i].schedulerId != -1 &&
718 schedulerState[i].deviceHandle.xLinkFD == xLinkFD)
719 return &schedulerState[i];
724 static int dispatcherRequestServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr){
725 XLINK_RET_IF(curr == NULL);
726 XLINK_RET_IF(!isEventTypeRequest(event));
727 xLinkEventHeader_t *header = &event->packet.header;
728 if (header->flags.bitField.block){ //block is requested
729 event->isServed = EVENT_BLOCKED;
730 } else if(header->flags.bitField.localServe == 1 ||
731 (header->flags.bitField.ack == 0
732 && header->flags.bitField.nack == 1)){ //this event is served locally, or it is failed
733 postAndMarkEventServed(event);
734 }else if (header->flags.bitField.ack == 1
735 && header->flags.bitField.nack == 0){
736 event->isServed = EVENT_PENDING;
737 mvLog(MVLOG_DEBUG,"------------------------UNserved %s\n",
738 TypeToStr(event->packet.header.type));
745 static int dispatcherResponseServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr)
747 XLINK_RET_ERR_IF(curr == NULL, 1);
748 XLINK_RET_ERR_IF(isEventTypeRequest(event), 1);
750 for (i = 0; i < MAX_EVENTS; i++)
752 xLinkEventHeader_t *header = &curr->lQueue.q[i].packet.header;
753 xLinkEventHeader_t *evHeader = &event->packet.header;
755 if (curr->lQueue.q[i].isServed == EVENT_PENDING &&
756 header->id == evHeader->id &&
757 header->type == evHeader->type - XLINK_REQUEST_LAST -1)
759 mvLog(MVLOG_DEBUG,"----------------------ISserved %s\n",
760 TypeToStr(header->type));
761 //propagate back flags
762 header->flags = evHeader->flags;
763 postAndMarkEventServed(&curr->lQueue.q[i]);
767 if (i == MAX_EVENTS) {
768 mvLog(MVLOG_FATAL,"no request for this response: %s %d\n", TypeToStr(event->packet.header.type), event->origin);
769 mvLog(MVLOG_DEBUG,"#### (i == MAX_EVENTS) %s %d %d\n", TypeToStr(event->packet.header.type), event->origin, (int)event->packet.header.id);
770 for (i = 0; i < MAX_EVENTS; i++)
772 xLinkEventHeader_t *header = &curr->lQueue.q[i].packet.header;
774 mvLog(MVLOG_DEBUG,"%d) header->id %i, header->type %s(%i), curr->lQueue.q[i].isServed %i, EVENT_PENDING %i\n", i, (int)header->id
775 , TypeToStr(header->type), header->type, curr->lQueue.q[i].isServed, EVENT_PENDING);
783 static inline xLinkEventPriv_t* getNextElementWithState(xLinkEventPriv_t* base, xLinkEventPriv_t* end,
784 xLinkEventPriv_t* start, xLinkEventState_t state){
785 xLinkEventPriv_t* tmp = start;
786 while (start->isServed != state){
787 CIRCULAR_INCREMENT_BASE(start, end, base);
792 if(start->isServed == state){
799 static xLinkEventPriv_t* searchForReadyEvent(xLinkSchedulerState_t* curr)
801 XLINK_RET_ERR_IF(curr == NULL, NULL);
802 xLinkEventPriv_t* ev = NULL;
804 ev = getNextElementWithState(curr->lQueue.base, curr->lQueue.end, curr->lQueue.base, EVENT_READY);
806 mvLog(MVLOG_DEBUG,"ready %s %d \n",
807 TypeToStr((int)ev->packet.header.type),
808 (int)ev->packet.header.id);
813 static xLinkEventPriv_t* getNextQueueElemToProc(eventQueueHandler_t *q ){
814 xLinkEventPriv_t* event = NULL;
815 if (q->cur != q->curProc) {
816 event = getNextElementWithState(q->base, q->end, q->curProc, EVENT_ALLOCATED);
818 CIRCULAR_INCREMENT_BASE(q->curProc, q->end, q->base);
824 * @brief Add event to Queue
825 * @note It called from dispatcherAddEvent
827 static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
828 eventQueueHandler_t *q, xLinkEvent_t* event,
829 sem_t* sem, xLinkEventOrigin_t o){
831 xLinkEventPriv_t* eventP = getNextElementWithState(q->base, q->end, q->cur, EVENT_SERVED);
832 if (eventP == NULL) {
833 mvLog(MVLOG_ERROR, "getNextElementWithState returned NULL");
836 mvLog(MVLOG_DEBUG, "Received event %s %d", TypeToStr(event->header.type), o);
837 ev = &eventP->packet;
841 eventP->packet = *event;
843 if (o == EVENT_LOCAL) {
844 // XLink API caller provided buffer for return the final result to
845 eventP->retEv = event;
847 eventP->retEv = NULL;
850 eventP->isServed = EVENT_ALLOCATED;
851 CIRCULAR_INCREMENT_BASE(q->cur, q->end, q->base);
855 static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr)
857 XLINK_RET_ERR_IF(curr == NULL, NULL);
859 if (sem_wait(&curr->notifyDispatcherSem)) {
860 mvLog(MVLOG_ERROR,"can't post semaphore\n");
863 xLinkEventPriv_t* event = NULL;
864 event = searchForReadyEvent(curr);
869 eventQueueHandler_t* hPriorityQueue = curr->queueProcPriority ? &curr->lQueue : &curr->rQueue;
870 eventQueueHandler_t* lPriorityQueue = curr->queueProcPriority ? &curr->rQueue : &curr->lQueue;
871 curr->queueProcPriority = curr->queueProcPriority ? 0 : 1;
873 event = getNextQueueElemToProc(hPriorityQueue);
877 event = getNextQueueElemToProc(lPriorityQueue);
882 static int dispatcherClean(xLinkSchedulerState_t* curr)
884 XLINK_RET_ERR_IF(pthread_mutex_lock(&clean_mutex), 1);
885 if (curr->schedulerId == -1) {
886 mvLog(MVLOG_WARN,"Scheduler has already been reset or cleaned");
887 if(pthread_mutex_unlock(&clean_mutex) != 0) {
888 mvLog(MVLOG_ERROR, "Failed to unlock clean_mutex");
894 mvLog(MVLOG_INFO, "Start Clean Dispatcher...");
896 if (sem_post(&curr->notifyDispatcherSem)) {
897 mvLog(MVLOG_ERROR,"can't post semaphore\n"); //to allow us to get a NULL event
899 xLinkEventPriv_t* event = dispatcherGetNextEvent(curr);
900 while (event != NULL) {
901 mvLog(MVLOG_INFO, "dropped event is %s, status %d\n",
902 TypeToStr(event->packet.header.type), event->isServed);
904 postAndMarkEventServed(event);
905 event = dispatcherGetNextEvent(curr);
908 dispatcherFreeEvents(&curr->lQueue, EVENT_PENDING);
909 dispatcherFreeEvents(&curr->lQueue, EVENT_BLOCKED);
911 curr->schedulerId = -1;
912 curr->resetXLink = 1;
913 sem_destroy(&curr->addEventSem);
914 sem_destroy(&curr->notifyDispatcherSem);
915 localSem_t* temp = curr->eventSemaphores;
916 while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
917 // unblock potentially blocked event semaphores
918 sem_post(&temp->sem);
919 sem_destroy(&temp->sem);
925 mvLog(MVLOG_INFO, "Clean Dispatcher Successfully...");
926 if(pthread_mutex_unlock(&clean_mutex) != 0) {
927 mvLog(MVLOG_ERROR, "Failed to unlock clean_mutex after clearing dispatcher");
932 static int dispatcherReset(xLinkSchedulerState_t* curr)
934 ASSERT_XLINK(curr != NULL);
936 glControlFunc->closeDeviceFd(&curr->deviceHandle);
937 if(dispatcherClean(curr)) {
938 mvLog(MVLOG_INFO, "Failed to clean dispatcher");
941 xLinkDesc_t* link = getLink(curr->deviceHandle.xLinkFD);
942 if(link == NULL || sem_post(&link->dispatcherClosedSem)) {
943 mvLog(MVLOG_DEBUG,"can't post dispatcherClosedSem\n");
946 glControlFunc->closeLink(curr->deviceHandle.xLinkFD, 1);
947 mvLog(MVLOG_DEBUG,"Reset Successfully\n");
951 static XLinkError_t sendEvents(xLinkSchedulerState_t* curr) {
953 xLinkEventPriv_t* event;
954 xLinkEventPriv_t response;
956 while (!curr->resetXLink) {
957 event = dispatcherGetNextEvent(curr);
959 mvLog(MVLOG_ERROR,"Dispatcher received NULL event!");
961 break; //Mean that user reset XLink.
967 if(event->packet.deviceHandle.xLinkFD
968 != curr->deviceHandle.xLinkFD) {
969 mvLog(MVLOG_FATAL,"The file descriptor mismatch between the event and the scheduler.\n"
970 " Event: id=%d, fd=%p"
972 event->packet.header.id, event->packet.deviceHandle.xLinkFD,
973 curr->deviceHandle.xLinkFD);
974 event->packet.header.flags.bitField.nack = 1;
975 event->packet.header.flags.bitField.ack = 0;
977 if (event->origin == EVENT_LOCAL){
978 dispatcherRequestServe(event, curr);
980 dispatcherResponseServe(event, curr);
986 getRespFunction getResp;
987 xLinkEvent_t* toSend;
988 if (event->origin == EVENT_LOCAL){
989 getResp = glControlFunc->localGetResponse;
990 toSend = &event->packet;
992 getResp = glControlFunc->remoteGetResponse;
993 toSend = &response.packet;
996 res = getResp(&event->packet, &response.packet);
997 if (isEventTypeRequest(event)){
998 if (event->origin == EVENT_LOCAL){ //we need to do this for locals only
999 if(dispatcherRequestServe(event, curr)) {
1000 mvLog(MVLOG_ERROR, "Failed to serve local event. "
1001 "Event: id=%d, type=%s, streamId=%u, streamName=%s",
1002 event->packet.header.id, TypeToStr(event->packet.header.type),
1003 event->packet.header.streamId, event->packet.header.streamName);
1007 if (res == 0 && event->packet.header.flags.bitField.localServe == 0) {
1009 if (toSend->header.type == XLINK_RESET_REQ) {
1010 curr->resetXLink = 1;
1011 mvLog(MVLOG_DEBUG,"Send XLINK_RESET_REQ, stopping sendEvents thread.");
1012 if(toSend->deviceHandle.protocol == X_LINK_PCIE) {
1013 toSend->header.type = XLINK_PING_REQ;
1014 mvLog(MVLOG_DEBUG, "Request for reboot not sent, only ping event");
1016 #if defined(NO_BOOT)
1017 toSend->header.type = XLINK_PING_REQ;
1018 mvLog(MVLOG_INFO, "Request for reboot not sent, only ping event");
1019 #endif // defined(NO_BOOT)
1024 if (glControlFunc->eventSend(toSend) != 0) {
1025 dispatcherFreeEvents(&curr->lQueue, EVENT_PENDING);
1026 dispatcherFreeEvents(&curr->lQueue, EVENT_BLOCKED);
1027 mvLog(MVLOG_ERROR, "Event sending failed");
1031 if (event->origin == EVENT_REMOTE){ // match remote response with the local request
1032 dispatcherResponseServe(event, curr);
1036 if (event->origin == EVENT_REMOTE){
1037 event->isServed = EVENT_SERVED;
1041 return X_LINK_SUCCESS;
1044 static void dispatcherFreeEvents(eventQueueHandler_t *queue, xLinkEventState_t state) {
1049 xLinkEventPriv_t* event = getNextElementWithState(queue->base, queue->end, queue->base, state);
1050 while (event != NULL) {
1051 mvLog(MVLOG_DEBUG, "Event is %s, size is %d, Mark it served\n", TypeToStr(event->packet.header.type), event->packet.header.size);
1052 postAndMarkEventServed(event);
1053 event = getNextElementWithState(queue->base, queue->end, queue->base, state);
1058 // ------------------------------------
1059 // Helpers implementation. End.
1060 // ------------------------------------