1 // Copyright (C) 2018-2020 Intel Corporation
2 // SPDX-License-Identifier: Apache-2.0
8 /// @brief Application configuration Leon header
11 #define _GNU_SOURCE // fix for warning: implicit declaration of function ‘pthread_setname_np’
21 #include "XLinkDispatcher.h"
22 #include "XLinkMacros.h"
23 #include "XLinkPrivateDefines.h"
24 #include "XLinkPrivateFields.h"
26 #include "XLinkErrorUtils.h"
28 #define MVLOG_UNIT_NAME xLink
31 // ------------------------------------
32 // Data structures declaration. Begin.
33 // ------------------------------------
// Dispatcher-private wrapper for one queued event: the code below reads its
// `packet`, `retEv`, `sem`, `isServed` and `origin` members.
43 typedef struct xLinkEventPriv_t {
// Serving state of this queue slot (compared against EVENT_SERVED,
// EVENT_ALLOCATED, EVENT_PENDING, EVENT_BLOCKED and EVENT_READY below).
46 xLinkEventState_t isServed;
// Who queued the event (compared against EVENT_LOCAL / EVENT_REMOTE below).
47 xLinkEventOrigin_t origin;
// Fields of the circular event queue used as lQueue / rQueue below.
// One past the last slot of q (DispatcherStart sets it to &q[MAX_EVENTS]).
58 xLinkEventPriv_t* end;
// First slot of q.
59 xLinkEventPriv_t* base;
// Cursor advanced by getNextQueueElemToProc when a slot is handed out for processing.
61 xLinkEventPriv_t* curProc;
// Cursor advanced by addNextQueueElemToProc after a slot has been filled.
62 xLinkEventPriv_t* cur;
// Backing storage for the queue slots.
63 XLINK_ALIGN_TO_BOUNDARY(64) xLinkEventPriv_t q[MAX_EVENTS];
67 * @brief Scheduler for each device
// Copy of the handle passed to DispatcherStart; its xLinkFD is the key used by
// findCorrespondingScheduler.
70 xLinkDeviceHandle_t deviceHandle; //will be device handler
// Flipped on every dispatcherGetNextEvent call to alternate which queue is polled first.
73 int queueProcPriority;
// Held by DispatcherAddEvent while it inserts into a queue (initial value 1).
75 XLink_sem_t addEventSem;
// Posted whenever the scheduler thread has something to look at (initial value 0).
76 XLink_sem_t notifyDispatcherSem;
// Non-zero once a reset has been requested; ends the eventReader and sendEvents loops.
77 volatile uint32_t resetXLink;
// Scheduler thread created (and detached) by DispatcherStart.
79 pthread_t xLinkThreadId;
81 eventQueueHandler_t lQueue; //local queue
82 eventQueueHandler_t rQueue; //remote queue
// Per-thread semaphores managed by createSem / getSem.
83 localSem_t eventSemaphores[MAXIMUM_SEMAPHORES];
84 } xLinkSchedulerState_t;
87 // ------------------------------------
88 // Data structures declaration. End.
89 // ------------------------------------
93 // ------------------------------------
94 // Global fields declaration. Begin.
95 // ------------------------------------
97 //These will be common for all, Initialized only once
// Callback table stored by DispatcherInitialize and used by the reader/sender threads.
98 DispatcherControlFunctions* glControlFunc;
// One state slot per scheduler; schedulerId == -1 marks a slot as free.
100 xLinkSchedulerState_t schedulerState[MAX_SCHEDULERS];
// Taken by DispatcherStart around creation of the scheduler thread.
101 sem_t addSchedulerSem;
// Serialises dispatcherClean so a scheduler is only torn down once.
103 static pthread_mutex_t clean_mutex = PTHREAD_MUTEX_INITIALIZER;
105 // ------------------------------------
106 // Global fields declaration. End.
107 // ------------------------------------
111 // ------------------------------------
112 // Helpers declaration. Begin.
113 // ------------------------------------
115 //below workaround for "C2088 '==': illegal for struct" error
116 static int pthread_t_compare(pthread_t a, pthread_t b);
// Per-thread semaphore bookkeeping inside a scheduler's eventSemaphores array.
118 static XLink_sem_t* createSem(xLinkSchedulerState_t* curr);
119 static XLink_sem_t* getSem(pthread_t threadId, xLinkSchedulerState_t *curr);
// Thread entry points; the Windows build declares them with the __cdecl calling convention.
121 #if (defined(_WIN32) || defined(_WIN64))
122 static void* __cdecl eventReader(void* ctx);
123 static void* __cdecl eventSchedulerRun(void* ctx);
125 static void* eventReader(void* ctx);
126 static void* eventSchedulerRun(void* ctx);
// Small predicates / utilities.
129 static int isEventTypeRequest(xLinkEventPriv_t* event);
130 static void postAndMarkEventServed(xLinkEventPriv_t *event);
131 static int createUniqueID();
132 static int findAvailableScheduler();
133 static xLinkSchedulerState_t* findCorrespondingScheduler(void* xLinkFD);
// State transitions applied after the response callback has run.
135 static int dispatcherRequestServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr);
136 static int dispatcherResponseServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr);
// Circular-queue helpers.
138 static inline xLinkEventPriv_t* getNextElementWithState(xLinkEventPriv_t* base, xLinkEventPriv_t* end,
139 xLinkEventPriv_t* start, xLinkEventState_t state);
141 static xLinkEventPriv_t* searchForReadyEvent(xLinkSchedulerState_t* curr);
143 static xLinkEventPriv_t* getNextQueueElemToProc(eventQueueHandler_t *q );
144 static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
145 eventQueueHandler_t *q, xLinkEvent_t* event,
146 XLink_sem_t* sem, xLinkEventOrigin_t o);
148 static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr);
// Teardown helpers.
150 static int dispatcherClean(xLinkSchedulerState_t* curr);
151 static int dispatcherReset(xLinkSchedulerState_t* curr);
152 static void dispatcherFreeEvents(eventQueueHandler_t *queue, xLinkEventState_t state);
// Main loop of the scheduler thread.
154 static XLinkError_t sendEvents(xLinkSchedulerState_t* curr);
156 // ------------------------------------
157 // Helpers declaration. End.
158 // ------------------------------------
162 // ------------------------------------
163 // XLinkDispatcher.h implementation. Begin.
164 // ------------------------------------
// Registers the control-function table and prepares the scheduler slots.
// controlFunc must be non-NULL; its eventReceive, eventSend, localGetResponse
// and remoteGetResponse callbacks are checked for NULL before it is stored.
// Returns X_LINK_SUCCESS when initialisation completes.
166 XLinkError_t DispatcherInitialize(DispatcherControlFunctions *controlFunc) {
167 ASSERT_XLINK(controlFunc != NULL);
169 if (!controlFunc->eventReceive ||
170 !controlFunc->eventSend ||
171 !controlFunc->localGetResponse ||
172 !controlFunc->remoteGetResponse) {
176 glControlFunc = controlFunc;
// Initial value 1: DispatcherStart uses this semaphore as a lock.
179 if (sem_init(&addSchedulerSem, 0, 1)) {
180 mvLog(MVLOG_ERROR, "Can't create semaphore\n");
// -1 marks every scheduler slot as free (see findAvailableScheduler).
184 for (int i = 0; i < MAX_SCHEDULERS; i++){
185 schedulerState[i].schedulerId = -1;
188 return X_LINK_SUCCESS;
// Claims a free scheduler slot for deviceHandle, resets its queues and
// semaphores, and starts a detached scheduler thread (eventSchedulerRun is the
// entry point named in this file) for it.
// deviceHandle and deviceHandle->xLinkFD must both be non-NULL.
191 XLinkError_t DispatcherStart(xLinkDeviceHandle_t *deviceHandle)
193 ASSERT_XLINK(deviceHandle);
195 ASSERT_XLINK(deviceHandle->xLinkFD != NULL);
200 if (numSchedulers >= MAX_SCHEDULERS)
202 mvLog(MVLOG_ERROR,"Max number Schedulers reached!\n");
205 int idx = findAvailableScheduler();
207 mvLog(MVLOG_ERROR,"Max number Schedulers reached!\n");
// Start from a zeroed slot, then fill in the fields explicitly.
211 memset(&schedulerState[idx], 0, sizeof(xLinkSchedulerState_t));
213 schedulerState[idx].semaphores = 0;
214 schedulerState[idx].queueProcPriority = 0;
216 schedulerState[idx].resetXLink = 0;
217 schedulerState[idx].deviceHandle = *deviceHandle;
218 schedulerState[idx].schedulerId = idx;
// Both circular queues start empty: every cursor points at the first slot and
// `end` is one past the last slot.
220 schedulerState[idx].lQueue.cur = schedulerState[idx].lQueue.q;
221 schedulerState[idx].lQueue.curProc = schedulerState[idx].lQueue.q;
222 schedulerState[idx].lQueue.base = schedulerState[idx].lQueue.q;
223 schedulerState[idx].lQueue.end = &schedulerState[idx].lQueue.q[MAX_EVENTS];
225 schedulerState[idx].rQueue.cur = schedulerState[idx].rQueue.q;
226 schedulerState[idx].rQueue.curProc = schedulerState[idx].rQueue.q;
227 schedulerState[idx].rQueue.base = schedulerState[idx].rQueue.q;
228 schedulerState[idx].rQueue.end = &schedulerState[idx].rQueue.q[MAX_EVENTS];
// EVENT_SERVED is the state addNextQueueElemToProc searches for when it needs
// a slot to reuse, so it doubles as "free".
230 for (eventIdx = 0 ; eventIdx < MAX_EVENTS; eventIdx++)
232 schedulerState[idx].rQueue.q[eventIdx].isServed = EVENT_SERVED;
233 schedulerState[idx].lQueue.q[eventIdx].isServed = EVENT_SERVED;
// addEventSem starts at 1 (lock), notifyDispatcherSem at 0 (work counter).
236 if (XLink_sem_init(&schedulerState[idx].addEventSem, 0, 1)) {
237 perror("Can't create semaphore\n");
240 if (XLink_sem_init(&schedulerState[idx].notifyDispatcherSem, 0, 0)) {
241 perror("Can't create semaphore\n");
// A reference count of -1 marks a per-thread semaphore entry as unused
// (createSem looks for refs < 0, getSem only matches refs >= 0).
243 localSem_t* temp = schedulerState[idx].eventSemaphores;
244 while (temp < schedulerState[idx].eventSemaphores + MAXIMUM_SEMAPHORES) {
245 XLink_sem_set_refs(&temp->sem, -1);
248 if (pthread_attr_init(&attr) != 0) {
249 mvLog(MVLOG_ERROR,"pthread_attr_init error");
// Explicit scheduling with the SCHED_RR policy is requested for the new thread.
254 if (pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED) != 0) {
255 mvLog(MVLOG_ERROR,"pthread_attr_setinheritsched error");
256 pthread_attr_destroy(&attr);
258 if (pthread_attr_setschedpolicy(&attr, SCHED_RR) != 0) {
259 mvLog(MVLOG_ERROR,"pthread_attr_setschedpolicy error");
260 pthread_attr_destroy(&attr);
// addSchedulerSem is held from here until the sem_post at the end.
264 sem_wait(&addSchedulerSem);
265 mvLog(MVLOG_DEBUG,"%s() starting a new thread - schedulerId %d \n", __func__, idx);
// The thread receives a pointer to the slot's schedulerId field, not a copy of its value.
266 int sc = pthread_create(&schedulerState[idx].xLinkThreadId,
269 (void*)&schedulerState[idx].schedulerId);
271 mvLog(MVLOG_ERROR,"Thread creation failed with error: %d", sc);
272 if (pthread_attr_destroy(&attr) != 0) {
273 perror("Thread attr destroy failed\n");
279 char schedulerThreadName[MVLOG_MAXIMUM_THREAD_NAME_SIZE];
280 snprintf(schedulerThreadName, sizeof(schedulerThreadName), "Scheduler%.2dThr", schedulerState[idx].schedulerId);
281 sc = pthread_setname_np(schedulerState[idx].xLinkThreadId, schedulerThreadName);
283 perror("Setting name for indexed scheduler thread failed");
// Detached: nobody joins the scheduler thread.
287 pthread_detach(schedulerState[idx].xLinkThreadId);
290 if (pthread_attr_destroy(&attr) != 0) {
291 mvLog(MVLOG_ERROR,"pthread_attr_destroy error");
294 sem_post(&addSchedulerSem);
// Public wrapper: looks up the scheduler that owns deviceHandle->xLinkFD and
// cleans it with dispatcherClean. A NULL handle or an unknown descriptor is
// rejected through XLINK_RET_IF before any cleanup is attempted.
299 int DispatcherClean(xLinkDeviceHandle_t *deviceHandle) {
300 XLINK_RET_IF(deviceHandle == NULL);
302 xLinkSchedulerState_t* curr = findCorrespondingScheduler(deviceHandle->xLinkFD);
303 XLINK_RET_IF(curr == NULL);
305 return dispatcherClean(curr);
// Queues `event` on the scheduler that owns event->deviceHandle.xLinkFD and
// wakes the scheduler thread.
//   origin - EVENT_LOCAL events go to the local queue together with the calling
//            thread's semaphore; the other path uses the remote queue and no semaphore.
//   event  - event to copy into the queue.
// NOTE(review): `event` is dereferenced without a NULL check - confirm callers
// never pass NULL.
308 xLinkEvent_t* DispatcherAddEvent(xLinkEventOrigin_t origin, xLinkEvent_t *event)
310 xLinkSchedulerState_t* curr = findCorrespondingScheduler(event->deviceHandle.xLinkFD);
311 XLINK_RET_ERR_IF(curr == NULL, NULL);
313 if(curr->resetXLink) {
316 mvLog(MVLOG_DEBUG, "Receiving event %s %d\n", TypeToStr(event->header.type), origin);
// addEventSem serialises queue insertion between threads.
317 if (XLink_sem_wait(&curr->addEventSem)) {
318 mvLog(MVLOG_ERROR,"can't wait semaphore\n");
322 XLink_sem_t *sem = NULL;
324 if (origin == EVENT_LOCAL) {
325 event->header.id = createUniqueID();
// Reuse the calling thread's semaphore when one exists, otherwise try to create it.
326 sem = getSem(pthread_self(), curr);
328 sem = createSem(curr);
331 mvLog(MVLOG_WARN,"No more semaphores. Increase XLink or OS resources\n");
// Release the insertion lock on this failure path as well.
332 if (XLink_sem_post(&curr->addEventSem)) {
333 mvLog(MVLOG_ERROR,"can't post semaphore\n");
338 event->header.flags.raw = 0;
339 ev = addNextQueueElemToProc(curr, &curr->lQueue, event, sem, origin);
341 ev = addNextQueueElemToProc(curr, &curr->rQueue, event, NULL, origin);
343 if (XLink_sem_post(&curr->addEventSem)) {
344 mvLog(MVLOG_ERROR,"can't post semaphore\n");
// Tell the scheduler thread there is a new event to pick up.
346 if (XLink_sem_post(&curr->notifyDispatcherSem)) {
347 mvLog(MVLOG_ERROR, "can't post semaphore\n");
// Blocks the calling thread on its per-thread semaphore for the scheduler that
// owns deviceHandle->xLinkFD. On the recovery path below a local
// XLINK_RESET_REQ is queued and, if waiting for that fails too, the dispatcher
// is reset.
// NOTE(review): deviceHandle is dereferenced without a NULL check, unlike
// DispatcherClean which guards it with XLINK_RET_IF - confirm callers.
352 int DispatcherWaitEventComplete(xLinkDeviceHandle_t *deviceHandle)
354 xLinkSchedulerState_t* curr = findCorrespondingScheduler(deviceHandle->xLinkFD);
355 ASSERT_XLINK(curr != NULL);
357 XLink_sem_t* id = getSem(pthread_self(), curr);
362 int rc = XLink_sem_wait(id);
365 xLinkEvent_t event = {0};
366 event.header.type = XLINK_RESET_REQ;
367 event.deviceHandle = *deviceHandle;
368 mvLog(MVLOG_ERROR,"waiting is timeout, sending reset remote event");
369 DispatcherAddEvent(EVENT_LOCAL, &event);
// Look the semaphore up again before waiting for the reset event.
370 id = getSem(pthread_self(), curr);
371 if (id == NULL || XLink_sem_wait(id)) {
372 dispatcherReset(curr);
// Returns the enumerator name of an event type for use in log messages.
// The returned pointers are string literals and must not be modified or freed.
380 char* TypeToStr(int type)
// Request types.
384 case XLINK_WRITE_REQ: return "XLINK_WRITE_REQ";
385 case XLINK_READ_REQ: return "XLINK_READ_REQ";
386 case XLINK_READ_REL_REQ: return "XLINK_READ_REL_REQ";
387 case XLINK_CREATE_STREAM_REQ:return "XLINK_CREATE_STREAM_REQ";
388 case XLINK_CLOSE_STREAM_REQ: return "XLINK_CLOSE_STREAM_REQ";
389 case XLINK_PING_REQ: return "XLINK_PING_REQ";
390 case XLINK_RESET_REQ: return "XLINK_RESET_REQ";
391 case XLINK_REQUEST_LAST: return "XLINK_REQUEST_LAST";
// Response types.
392 case XLINK_WRITE_RESP: return "XLINK_WRITE_RESP";
393 case XLINK_READ_RESP: return "XLINK_READ_RESP";
394 case XLINK_READ_REL_RESP: return "XLINK_READ_REL_RESP";
395 case XLINK_CREATE_STREAM_RESP: return "XLINK_CREATE_STREAM_RESP";
396 case XLINK_CLOSE_STREAM_RESP: return "XLINK_CLOSE_STREAM_RESP";
397 case XLINK_PING_RESP: return "XLINK_PING_RESP";
398 case XLINK_RESET_RESP: return "XLINK_RESET_RESP";
399 case XLINK_RESP_LAST: return "XLINK_RESP_LAST";
// Scans the local queue of the scheduler owning xlinkFD for an EVENT_BLOCKED
// entry matching (id, type, stream), marks it EVENT_READY and wakes the
// scheduler thread.
//   id     - event id to match; -1 matches any id.
//   type   - event type to match.
//   stream - stream id to match.
406 int DispatcherUnblockEvent(eventId_t id, xLinkEventType_t type, streamId_t stream, void *xlinkFD)
408 xLinkSchedulerState_t* curr = findCorrespondingScheduler(xlinkFD);
409 ASSERT_XLINK(curr != NULL);
411 mvLog(MVLOG_DEBUG,"unblock\n");
412 xLinkEventPriv_t* blockedEvent;
413 for (blockedEvent = curr->lQueue.q;
414 blockedEvent < curr->lQueue.q + MAX_EVENTS;
417 if (blockedEvent->isServed == EVENT_BLOCKED &&
418 ((blockedEvent->packet.header.id == id || id == -1)
419 && blockedEvent->packet.header.type == type
420 && blockedEvent->packet.header.streamId == stream))
422 mvLog(MVLOG_DEBUG,"unblocked**************** %d %s\n",
423 (int)blockedEvent->packet.header.id,
424 TypeToStr((int)blockedEvent->packet.header.type));
// EVENT_READY entries are what searchForReadyEvent looks for in the local queue.
425 blockedEvent->isServed = EVENT_READY;
426 if (XLink_sem_post(&curr->notifyDispatcherSem)){
427 mvLog(MVLOG_ERROR, "can't post semaphore\n");
// Debug trace of the queue entry being inspected.
431 mvLog(MVLOG_DEBUG,"%d %s\n",
432 (int)blockedEvent->packet.header.id,
433 TypeToStr((int)blockedEvent->packet.header.type));
439 // ------------------------------------
440 // XLinkDispatcher.h implementation. End.
441 // ------------------------------------
445 // ------------------------------------
446 // Helpers implementation. Begin.
447 // ------------------------------------
// Thread-id equality test. On Windows the ids are compared through their `tid`
// member, because pthread_t is a struct there and `==` cannot be applied to it.
449 int pthread_t_compare(pthread_t a, pthread_t b)
451 #if (defined(_WIN32) || defined(_WIN64) )
452 return ((a.tid == b.tid));
// Allocates a per-thread semaphore entry for the calling thread in
// curr->eventSemaphores. It is an error if the thread already owns one.
// Returns NULL when curr is NULL or on any of the failure paths below.
458 static XLink_sem_t* createSem(xLinkSchedulerState_t* curr)
460 XLINK_RET_ERR_IF(curr == NULL, NULL);
462 XLink_sem_t* sem = getSem(pthread_self(), curr);
463 if (sem) {// it already exists, error
467 if (curr->semaphores <= MAXIMUM_SEMAPHORES) {
468 localSem_t* temp = curr->eventSemaphores;
470 while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
472 XLINK_RET_ERR_IF(XLink_sem_get_refs(&temp->sem, &refs), NULL);
// An entry is usable when it is unused (refs < 0); when the table is full,
// entries with refs == 0 are destroyed so they can be recycled.
473 if (refs < 0 || curr->semaphores == MAXIMUM_SEMAPHORES) {
474 if (curr->semaphores == MAXIMUM_SEMAPHORES && refs == 0) {
475 XLINK_RET_ERR_IF(XLink_sem_destroy(&temp->sem), NULL);
// Re-read the reference count after the destroy.
476 XLINK_RET_ERR_IF(XLink_sem_get_refs(&temp->sem, &refs), NULL);
// On Windows the owner id is cleared with memset (pthread_t is a struct there).
478 #if (defined(_WIN32) || defined(_WIN64))
479 memset(&temp->threadId, 0, sizeof(temp->threadId));
// New semaphores start at 0.
487 if (XLink_sem_init(sem, 0, 0)){
488 mvLog(MVLOG_ERROR, "Error: Can't create semaphore\n");
// Record the calling thread as the owner so getSem can find the entry later.
492 temp->threadId = pthread_self();
499 return NULL; //shouldn't happen
503 mvLog(MVLOG_ERROR, "Error: cached semaphores %d exceeds the MAXIMUM_SEMAPHORES %d", curr->semaphores, MAXIMUM_SEMAPHORES);
// Looks through curr->eventSemaphores for the entry owned by threadId.
// Only entries whose reference count is >= 0 (i.e. in use) are considered.
// Returns NULL when curr is NULL or when reading a reference count fails.
510 static XLink_sem_t* getSem(pthread_t threadId, xLinkSchedulerState_t *curr)
512 XLINK_RET_ERR_IF(curr == NULL, NULL);
514 localSem_t* temp = curr->eventSemaphores;
515 while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
517 XLINK_RET_ERR_IF(XLink_sem_get_refs(&temp->sem, &refs), NULL);
518 if (pthread_t_compare(temp->threadId, threadId) && refs >= 0) {
// Reader thread entry point. ctx is the xLinkSchedulerState_t of the scheduler
// this reader belongs to. The loop receives events through
// glControlFunc->eventReceive and queues them as EVENT_REMOTE until
// curr->resetXLink becomes non-zero.
526 #if (defined(_WIN32) || defined(_WIN64))
527 static void* __cdecl eventReader(void* ctx)
529 static void* eventReader(void* ctx)
532 xLinkSchedulerState_t *curr = (xLinkSchedulerState_t*)ctx;
533 XLINK_RET_ERR_IF(curr == NULL, NULL);
535 xLinkEvent_t event = { 0 };// to fix error C4700 in win
536 event.header.id = -1;
// The receive callback gets the device handle through the event itself.
537 event.deviceHandle = curr->deviceHandle;
539 mvLog(MVLOG_INFO,"eventReader thread started");
541 while (!curr->resetXLink) {
542 int sc = glControlFunc->eventReceive(&event);
544 mvLog(MVLOG_DEBUG,"Reading %s (scheduler %d, fd %p, event id %d, event stream_id %u, event size %u)\n",
545 TypeToStr(event.header.type), curr->schedulerId, event.deviceHandle.xLinkFD, event.header.id, event.header.streamId, event.header.size);
// Receive failure: release every local event that is still pending or blocked
// so its waiter is posted.
548 mvLog(MVLOG_DEBUG,"Failed to receive event (err %d)", sc);
549 dispatcherFreeEvents(&curr->lQueue, EVENT_PENDING);
550 dispatcherFreeEvents(&curr->lQueue, EVENT_BLOCKED);
554 DispatcherAddEvent(EVENT_REMOTE, &event);
// A reset request from the remote side ends the reader loop.
556 if (event.header.type == XLINK_RESET_REQ) {
557 curr->resetXLink = 1;
558 mvLog(MVLOG_DEBUG,"Read XLINK_RESET_REQ, stopping eventReader thread.");
// Scheduler thread entry point. ctx points at the schedulerId field of the
// scheduler slot to run (see the pthread_create call in DispatcherStart).
// The thread starts an eventReader thread for the same slot, runs sendEvents
// until it returns, joins the reader and finally resets the dispatcher.
565 #if (defined(_WIN32) || defined(_WIN64))
566 static void* __cdecl eventSchedulerRun(void* ctx)
568 static void* eventSchedulerRun(void* ctx)
571 int schedulerId = *((int*) ctx);
572 mvLog(MVLOG_DEBUG,"%s() schedulerId %d\n", __func__, schedulerId);
// ctx refers to the live schedulerId field, which dispatcherClean sets to -1
// (the free-slot marker). Reject negative values as well as too-large ones so
// schedulerState is never indexed out of bounds.
573 XLINK_RET_ERR_IF(schedulerId < 0 || schedulerId >= MAX_SCHEDULERS, NULL);
575 xLinkSchedulerState_t* curr = &schedulerState[schedulerId];
576 pthread_t readerThreadId; /* Create thread for reader.
577 This thread will notify the dispatcher of any incoming packets*/
580 if (pthread_attr_init(&attr) != 0) {
581 mvLog(MVLOG_ERROR,"pthread_attr_init error");
// Same attributes as the scheduler thread: explicit scheduling, SCHED_RR policy.
585 if (pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED) != 0) {
586 pthread_attr_destroy(&attr);
587 mvLog(MVLOG_ERROR,"pthread_attr_setinheritsched error");
590 if (pthread_attr_setschedpolicy(&attr, SCHED_RR) != 0) {
591 pthread_attr_destroy(&attr);
592 mvLog(MVLOG_ERROR,"pthread_attr_setschedpolicy error");
596 sc = pthread_create(&readerThreadId, &attr, eventReader, curr);
598 mvLog(MVLOG_ERROR, "Thread creation failed");
599 if (pthread_attr_destroy(&attr) != 0) {
600 perror("Thread attr destroy failed\n");
605 char eventReaderThreadName[MVLOG_MAXIMUM_THREAD_NAME_SIZE];
606 snprintf(eventReaderThreadName, sizeof(eventReaderThreadName), "EventRead%.2dThr", schedulerId);
607 sc = pthread_setname_np(readerThreadId, eventReaderThreadName);
609 perror("Setting name for event reader thread failed");
612 mvLog(MVLOG_INFO,"Scheduler thread started");
// Runs until a reset is requested or sending fails.
614 XLinkError_t rc = sendEvents(curr);
616 mvLog(MVLOG_ERROR, "sendEvents method finished with an error: %s", XLinkErrorToStr(rc));
// The reader is joined before the scheduler state is torn down.
619 sc = pthread_join(readerThreadId, NULL);
621 mvLog(MVLOG_ERROR, "Waiting for thread failed");
624 sc = pthread_attr_destroy(&attr);
626 mvLog(MVLOG_WARN, "Thread attr destroy failed");
629 if (dispatcherReset(curr) != 0) {
630 mvLog(MVLOG_WARN, "Failed to reset");
// Stopping without a reset request is reported as an error, otherwise as info.
633 if (curr->resetXLink != 1) {
634 mvLog(MVLOG_ERROR,"Scheduler thread stopped");
636 mvLog(MVLOG_INFO,"Scheduler thread stopped");
// Non-zero when the event's type is below XLINK_REQUEST_LAST, i.e. it is
// treated as a request rather than a response.
642 static int isEventTypeRequest(xLinkEventPriv_t* event)
644 return event->packet.header.type < XLINK_REQUEST_LAST;
// Completes an event: copies the result back to the caller-provided buffer,
// posts the event's semaphore and releases the queue slot.
// The order matters - the slot may be reused as soon as it is marked EVENT_SERVED.
647 static void postAndMarkEventServed(xLinkEventPriv_t *event)
650 // the xLinkEventPriv_t slot pointed by "event" will be
651 // re-cycled as soon as we mark it as EVENT_SERVED,
652 // so before that, we copy the result event into XLink API layer
653 *(event->retEv) = event->packet;
656 if (XLink_sem_post(event->sem)) {
657 mvLog(MVLOG_ERROR,"can't post semaphore\n");
661 event->isServed = EVENT_SERVED;
// Produces the value DispatcherAddEvent stores in header.id of local events.
// NOTE(review): the body is not visible here - how the id is generated is to be confirmed.
664 static int createUniqueID()
// Scans schedulerState for a free slot, i.e. one whose schedulerId is -1.
670 int findAvailableScheduler()
673 for (i = 0; i < MAX_SCHEDULERS; i++)
674 if (schedulerState[i].schedulerId == -1)
// Returns the active scheduler whose device handle carries xLinkFD.
// A NULL xLinkFD is resolved to schedulerState[0] when exactly one scheduler exists.
679 static xLinkSchedulerState_t* findCorrespondingScheduler(void* xLinkFD)
682 if (xLinkFD == NULL) { //in case of myriad there should be one scheduler
683 if (numSchedulers == 1)
684 return &schedulerState[0];
// Slots with schedulerId == -1 are free and are skipped.
688 for (i=0; i < MAX_SCHEDULERS; i++)
689 if (schedulerState[i].schedulerId != -1 &&
690 schedulerState[i].deviceHandle.xLinkFD == xLinkFD)
691 return &schedulerState[i];
// Decides what happens to a request event from its header flags:
//   block set                     -> the event becomes EVENT_BLOCKED
//   localServe set, or nack only  -> the event is completed immediately
//   ack only                      -> the event becomes EVENT_PENDING
// Rejects a NULL scheduler and non-request events through XLINK_RET_IF.
696 static int dispatcherRequestServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr){
697 XLINK_RET_IF(curr == NULL);
698 XLINK_RET_IF(!isEventTypeRequest(event));
699 xLinkEventHeader_t *header = &event->packet.header;
700 if (header->flags.bitField.block){ //block is requested
701 event->isServed = EVENT_BLOCKED;
702 } else if(header->flags.bitField.localServe == 1 ||
703 (header->flags.bitField.ack == 0
704 && header->flags.bitField.nack == 1)){ //this event is served locally, or it is failed
705 postAndMarkEventServed(event);
706 }else if (header->flags.bitField.ack == 1
707 && header->flags.bitField.nack == 0){
// Pending events are completed later by dispatcherResponseServe.
708 event->isServed = EVENT_PENDING;
709 mvLog(MVLOG_DEBUG,"------------------------UNserved %s\n",
710 TypeToStr(event->packet.header.type));
// Matches a response event with the EVENT_PENDING local request that has the
// same id and the corresponding request type, copies the response flags into
// that request and completes it. Returns 1 when curr is NULL or `event` is a
// request.
717 static int dispatcherResponseServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr)
719 XLINK_RET_ERR_IF(curr == NULL, 1);
720 XLINK_RET_ERR_IF(isEventTypeRequest(event), 1);
722 for (i = 0; i < MAX_EVENTS; i++)
724 xLinkEventHeader_t *header = &curr->lQueue.q[i].packet.header;
725 xLinkEventHeader_t *evHeader = &event->packet.header;
// The request type is derived from the response type by subtracting
// XLINK_REQUEST_LAST + 1 (presumably the offset between the two groups of
// enumerators - confirm against the enum definition).
727 if (curr->lQueue.q[i].isServed == EVENT_PENDING &&
728 header->id == evHeader->id &&
729 header->type == evHeader->type - XLINK_REQUEST_LAST -1)
731 mvLog(MVLOG_DEBUG,"----------------------ISserved %s\n",
732 TypeToStr(header->type));
733 //propagate back flags
734 header->flags = evHeader->flags;
735 postAndMarkEventServed(&curr->lQueue.q[i]);
// No matching request: report it and dump the local queue for debugging.
739 if (i == MAX_EVENTS) {
740 mvLog(MVLOG_FATAL,"no request for this response: %s %d\n", TypeToStr(event->packet.header.type), event->origin);
741 mvLog(MVLOG_DEBUG,"#### (i == MAX_EVENTS) %s %d %d\n", TypeToStr(event->packet.header.type), event->origin, (int)event->packet.header.id);
742 for (i = 0; i < MAX_EVENTS; i++)
744 xLinkEventHeader_t *header = &curr->lQueue.q[i].packet.header;
746 mvLog(MVLOG_DEBUG,"%d) header->id %i, header->type %s(%i), curr->lQueue.q[i].isServed %i, EVENT_PENDING %i\n", i, (int)header->id
747 , TypeToStr(header->type), header->type, curr->lQueue.q[i].isServed, EVENT_PENDING);
// Walks the circular buffer [base, end) starting at `start`, looking for a
// slot whose isServed equals `state`. `tmp` remembers the starting slot.
755 static inline xLinkEventPriv_t* getNextElementWithState(xLinkEventPriv_t* base, xLinkEventPriv_t* end,
756 xLinkEventPriv_t* start, xLinkEventState_t state){
757 xLinkEventPriv_t* tmp = start;
758 while (start->isServed != state){
759 CIRCULAR_INCREMENT_BASE(start, end, base);
// Final check of the slot the walk stopped on.
764 if(start->isServed == state){
// Searches the local queue, from its base, for an event in state EVENT_READY
// (the state DispatcherUnblockEvent assigns). Returns NULL when curr is NULL.
771 static xLinkEventPriv_t* searchForReadyEvent(xLinkSchedulerState_t* curr)
773 XLINK_RET_ERR_IF(curr == NULL, NULL);
774 xLinkEventPriv_t* ev = NULL;
776 ev = getNextElementWithState(curr->lQueue.base, curr->lQueue.end, curr->lQueue.base, EVENT_READY);
778 mvLog(MVLOG_DEBUG,"ready %s %d \n",
779 TypeToStr((int)ev->packet.header.type),
780 (int)ev->packet.header.id);
// Picks the next EVENT_ALLOCATED slot of queue q for processing.
// Nothing is searched when the insert cursor and the processing cursor are equal.
785 static xLinkEventPriv_t* getNextQueueElemToProc(eventQueueHandler_t *q ){
786 xLinkEventPriv_t* event = NULL;
787 if (q->cur != q->curProc) {
788 event = getNextElementWithState(q->base, q->end, q->curProc, EVENT_ALLOCATED);
// Advance the processing cursor, wrapping from end back to base.
790 CIRCULAR_INCREMENT_BASE(q->curProc, q->end, q->base);
796 * @brief Add event to Queue
797 * @note It is called from DispatcherAddEvent
// Copies `event` into the next reusable (EVENT_SERVED) slot of queue q and
// marks that slot EVENT_ALLOCATED.
//   sem - semaphore associated with the event (NULL for remote events, see DispatcherAddEvent).
//   o   - origin of the event; only local events keep a pointer to the caller's buffer.
799 static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
800 eventQueueHandler_t *q, xLinkEvent_t* event,
801 XLink_sem_t* sem, xLinkEventOrigin_t o){
803 xLinkEventPriv_t* eventP = getNextElementWithState(q->base, q->end, q->cur, EVENT_SERVED);
804 if (eventP == NULL) {
805 mvLog(MVLOG_ERROR, "getNextElementWithState returned NULL");
808 mvLog(MVLOG_DEBUG, "Received event %s %d", TypeToStr(event->header.type), o);
809 ev = &eventP->packet;
813 eventP->packet = *event;
815 if (o == EVENT_LOCAL) {
816 // XLink API caller provided buffer for return the final result to
817 eventP->retEv = event;
819 eventP->retEv = NULL;
822 eventP->isServed = EVENT_ALLOCATED;
// Advance the insert cursor, wrapping from end back to base.
823 CIRCULAR_INCREMENT_BASE(q->cur, q->end, q->base);
// Waits for a notification on curr->notifyDispatcherSem and then picks the
// next event to process: an unblocked (EVENT_READY) local event is looked for
// first, then the two queues are polled, with their priority alternating on
// every call. Returns NULL when curr is NULL.
827 static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr)
829 XLINK_RET_ERR_IF(curr == NULL, NULL);
// This is a wait, not a post: the log text now says so, matching the message
// DispatcherAddEvent uses for a failed XLink_sem_wait.
831 if (XLink_sem_wait(&curr->notifyDispatcherSem)) {
832 mvLog(MVLOG_ERROR,"can't wait semaphore\n");
835 xLinkEventPriv_t* event = NULL;
836 event = searchForReadyEvent(curr);
// queueProcPriority != 0 polls the local queue first, otherwise the remote one;
// the flag is flipped so that neither queue is always polled first.
841 eventQueueHandler_t* hPriorityQueue = curr->queueProcPriority ? &curr->lQueue : &curr->rQueue;
842 eventQueueHandler_t* lPriorityQueue = curr->queueProcPriority ? &curr->rQueue : &curr->lQueue;
843 curr->queueProcPriority = curr->queueProcPriority ? 0 : 1;
845 event = getNextQueueElemToProc(hPriorityQueue);
849 event = getNextQueueElemToProc(lPriorityQueue);
// Tears down one scheduler: drains and completes all queued events, releases
// pending/blocked local events, marks the slot free and destroys its
// semaphores. The whole operation runs under clean_mutex; a slot whose
// schedulerId is already -1 is only reported.
854 static int dispatcherClean(xLinkSchedulerState_t* curr)
856 XLINK_RET_ERR_IF(pthread_mutex_lock(&clean_mutex), 1);
857 if (curr->schedulerId == -1) {
858 mvLog(MVLOG_WARN,"Scheduler has already been reset or cleaned");
859 if(pthread_mutex_unlock(&clean_mutex) != 0) {
860 mvLog(MVLOG_ERROR, "Failed to unlock clean_mutex");
866 mvLog(MVLOG_INFO, "Start Clean Dispatcher...");
// dispatcherGetNextEvent waits on notifyDispatcherSem, so one extra post is
// needed for the drain loop's final call to return.
868 if (XLink_sem_post(&curr->notifyDispatcherSem)) {
869 mvLog(MVLOG_ERROR,"can't post semaphore\n"); //to allow us to get a NULL event
871 xLinkEventPriv_t* event = dispatcherGetNextEvent(curr);
872 while (event != NULL) {
873 mvLog(MVLOG_INFO, "dropped event is %s, status %d\n",
874 TypeToStr(event->packet.header.type), event->isServed);
876 postAndMarkEventServed(event);
877 event = dispatcherGetNextEvent(curr);
880 dispatcherFreeEvents(&curr->lQueue, EVENT_PENDING);
881 dispatcherFreeEvents(&curr->lQueue, EVENT_BLOCKED);
// Mark the slot free and signal the worker loops to stop.
883 curr->schedulerId = -1;
884 curr->resetXLink = 1;
885 XLink_sem_destroy(&curr->addEventSem);
886 XLink_sem_destroy(&curr->notifyDispatcherSem);
887 localSem_t* temp = curr->eventSemaphores;
888 while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
889 // unblock potentially blocked event semaphores
890 XLink_sem_post(&temp->sem);
891 XLink_sem_destroy(&temp->sem);
896 mvLog(MVLOG_INFO, "Clean Dispatcher Successfully...");
897 if(pthread_mutex_unlock(&clean_mutex) != 0) {
898 mvLog(MVLOG_ERROR, "Failed to unlock clean_mutex after clearing dispatcher");
// Full reset of one scheduler: closes the device descriptor through the
// control callbacks, cleans the dispatcher state, posts the link's
// dispatcherClosedSem and closes the link.
903 static int dispatcherReset(xLinkSchedulerState_t* curr)
905 ASSERT_XLINK(curr != NULL);
907 glControlFunc->closeDeviceFd(&curr->deviceHandle);
908 if(dispatcherClean(curr)) {
909 mvLog(MVLOG_INFO, "Failed to clean dispatcher");
// Post dispatcherClosedSem on the link that belongs to this descriptor.
912 xLinkDesc_t* link = getLink(curr->deviceHandle.xLinkFD);
913 if(link == NULL || XLink_sem_post(&link->dispatcherClosedSem)) {
914 mvLog(MVLOG_DEBUG,"can't post dispatcherClosedSem\n");
917 glControlFunc->closeLink(curr->deviceHandle.xLinkFD, 1);
918 mvLog(MVLOG_DEBUG,"Reset Successfully\n");
// Main loop of the scheduler thread. Until curr->resetXLink is set it fetches
// the next event, obtains the response through the local/remote callback,
// sends what has to go over the link and updates the state of the event.
// Returns X_LINK_SUCCESS at the end of the function.
922 static XLinkError_t sendEvents(xLinkSchedulerState_t* curr) {
924 xLinkEventPriv_t* event;
925 xLinkEventPriv_t response;
927 while (!curr->resetXLink) {
928 event = dispatcherGetNextEvent(curr);
930 mvLog(MVLOG_ERROR,"Dispatcher received NULL event!");
932 break; //Mean that user reset XLink.
// An event carrying a different descriptor than this scheduler is marked as
// failed (nack) and handed to the serve helpers.
938 if(event->packet.deviceHandle.xLinkFD
939 != curr->deviceHandle.xLinkFD) {
940 mvLog(MVLOG_FATAL,"The file descriptor mismatch between the event and the scheduler.\n"
941 " Event: id=%d, fd=%p"
943 event->packet.header.id, event->packet.deviceHandle.xLinkFD,
944 curr->deviceHandle.xLinkFD);
945 event->packet.header.flags.bitField.nack = 1;
946 event->packet.header.flags.bitField.ack = 0;
948 if (event->origin == EVENT_LOCAL){
949 dispatcherRequestServe(event, curr);
951 dispatcherResponseServe(event, curr);
// Local events are sent as they are; for remote events the generated response is sent.
957 getRespFunction getResp;
958 xLinkEvent_t* toSend;
959 if (event->origin == EVENT_LOCAL){
960 getResp = glControlFunc->localGetResponse;
961 toSend = &event->packet;
963 getResp = glControlFunc->remoteGetResponse;
964 toSend = &response.packet;
967 res = getResp(&event->packet, &response.packet);
968 if (isEventTypeRequest(event)){
969 if (event->origin == EVENT_LOCAL){ //we need to do this for locals only
970 if(dispatcherRequestServe(event, curr)) {
971 mvLog(MVLOG_ERROR, "Failed to serve local event. "
972 "Event: id=%d, type=%s, streamId=%u, streamName=%s",
973 event->packet.header.id, TypeToStr(event->packet.header.type),
974 event->packet.header.streamId, event->packet.header.streamName);
// Only events that are not served locally go over the link.
978 if (res == 0 && event->packet.header.flags.bitField.localServe == 0) {
// Sending a reset request stops this loop; on the paths below the request is
// downgraded to a ping instead of being sent as a reset.
980 if (toSend->header.type == XLINK_RESET_REQ) {
981 curr->resetXLink = 1;
982 mvLog(MVLOG_DEBUG,"Send XLINK_RESET_REQ, stopping sendEvents thread.");
983 if(toSend->deviceHandle.protocol == X_LINK_PCIE) {
984 toSend->header.type = XLINK_PING_REQ;
985 mvLog(MVLOG_DEBUG, "Request for reboot not sent, only ping event");
988 toSend->header.type = XLINK_PING_REQ;
989 mvLog(MVLOG_INFO, "Request for reboot not sent, only ping event");
990 #endif // defined(NO_BOOT)
// A send failure releases every pending and blocked local event.
995 if (glControlFunc->eventSend(toSend) != 0) {
996 dispatcherFreeEvents(&curr->lQueue, EVENT_PENDING);
997 dispatcherFreeEvents(&curr->lQueue, EVENT_BLOCKED);
998 mvLog(MVLOG_ERROR, "Event sending failed");
1002 if (event->origin == EVENT_REMOTE){ // match remote response with the local request
1003 dispatcherResponseServe(event, curr);
// Remote events have no waiter: their slot is simply released.
1007 if (event->origin == EVENT_REMOTE){
1008 event->isServed = EVENT_SERVED;
1012 return X_LINK_SUCCESS;
// Completes every event of `queue` that is currently in `state`: each one is
// passed to postAndMarkEventServed, which posts its semaphore and marks the
// slot EVENT_SERVED. The search restarts from the queue base after every hit.
1015 static void dispatcherFreeEvents(eventQueueHandler_t *queue, xLinkEventState_t state) {
1020 xLinkEventPriv_t* event = getNextElementWithState(queue->base, queue->end, queue->base, state);
1021 while (event != NULL) {
1022 mvLog(MVLOG_DEBUG, "Event is %s, size is %d, Mark it served\n", TypeToStr(event->packet.header.type), event->packet.header.size);
1023 postAndMarkEventServed(event);
1024 event = getNextElementWithState(queue->base, queue->end, queue->base, state);
1029 // ------------------------------------
1030 // Helpers implementation. End.
1031 // ------------------------------------