[IE][VPU][XLink]: XLink semaphore wrappers impl (#3079)
[platform/upstream/dldt.git] / inference-engine / thirdparty / movidius / XLink / shared / src / XLinkDispatcher.c
1 // Copyright (C) 2018-2020 Intel Corporation
2 // SPDX-License-Identifier: Apache-2.0
3 //
4
5 ///
6 /// @file
7 ///
/// @brief     XLink dispatcher: per-device event scheduler, event queues and event reader thread
9 ///
10 #ifndef _GNU_SOURCE
11 #define _GNU_SOURCE // fix for warning: implicit declaration of function ‘pthread_setname_np’
12 #endif
13
14 #include "stdio.h"
15 #include "stdint.h"
16 #include "stdlib.h"
17 #include "string.h"
18 #include <assert.h>
19 #include <stdlib.h>
20
21 #include "XLinkDispatcher.h"
22 #include "XLinkMacros.h"
23 #include "XLinkPrivateDefines.h"
24 #include "XLinkPrivateFields.h"
25 #include "XLink.h"
26 #include "XLinkErrorUtils.h"
27
28 #define MVLOG_UNIT_NAME xLink
29 #include "XLinkLog.h"
30
31 // ------------------------------------
32 // Data structures declaration. Begin.
33 // ------------------------------------
34
// State of one slot in an event queue (stored in xLinkEventPriv_t::isServed).
typedef enum {
    EVENT_ALLOCATED,    // slot was just filled by addNextQueueElemToProc() and waits for the dispatcher
    EVENT_PENDING,      // request was acknowledged (ack=1, nack=0); waits for the matching response
    EVENT_BLOCKED,      // request carries the "block" flag; stays here until DispatcherUnblockEvent()
    EVENT_READY,        // a blocked event released by DispatcherUnblockEvent(); picked up by searchForReadyEvent()
    EVENT_SERVED,       // event is finished; the slot is free and may be reused for a new event
} xLinkEventState_t;
42
// One queue slot: the event itself plus the bookkeeping the dispatcher needs for it.
typedef struct xLinkEventPriv_t {
    xLinkEvent_t packet;            // private copy of the event handed to DispatcherAddEvent()
    xLinkEvent_t *retEv;            // caller's event buffer (local events only, NULL otherwise);
                                    // receives a copy of `packet` when the event is served
    xLinkEventState_t isServed;     // current slot state, see xLinkEventState_t
    xLinkEventOrigin_t origin;      // origin passed to DispatcherAddEvent() (e.g. EVENT_LOCAL / EVENT_REMOTE)
    XLink_sem_t* sem;               // semaphore posted when the event is served; may be NULL
    void* data;                     // NOTE(review): not referenced in the visible part of this file
} xLinkEventPriv_t;
51
// Per-thread completion semaphore: createSem() binds `sem` to the calling thread,
// getSem() looks it up again by thread id.
typedef struct {
    XLink_sem_t sem;        // semaphore the owning thread waits on in DispatcherWaitEventComplete()
    pthread_t threadId;     // thread that created (owns) this semaphore
} localSem_t;
56
// Circular event queue backed by the fixed-size array `q`.
typedef struct{
    xLinkEventPriv_t* end;      // one past the last element of `q` (set to &q[MAX_EVENTS])
    xLinkEventPriv_t* base;     // first element of `q`

    xLinkEventPriv_t* curProc;  // where getNextQueueElemToProc() starts looking for the next event to process
    xLinkEventPriv_t* cur;      // where addNextQueueElemToProc() starts looking for a free slot
    XLINK_ALIGN_TO_BOUNDARY(64) xLinkEventPriv_t q[MAX_EVENTS];     // slot storage

}eventQueueHandler_t;
/**
 * @brief Scheduler for each device
 *
 * One entry of the global schedulerState[] table. A slot is considered free
 * when schedulerId == -1.
 */
typedef struct {
    xLinkDeviceHandle_t deviceHandle; // copy of the device handle passed to DispatcherStart()
    int schedulerId;                  // index of this entry in schedulerState[], or -1 when unused

    int queueProcPriority;            // toggled on every dispatcherGetNextEvent() call to alternate
                                      // which of lQueue / rQueue is polled first

    XLink_sem_t addEventSem;          // initialised to 1; serialises DispatcherAddEvent()
    XLink_sem_t notifyDispatcherSem;  // initialised to 0; posted to wake up dispatcherGetNextEvent()
    volatile uint32_t resetXLink;     // non-zero stops the reader and scheduler loops
    uint32_t semaphores;              // number of currently initialised entries in eventSemaphores
    pthread_t xLinkThreadId;          // scheduler thread created by DispatcherStart()

    eventQueueHandler_t lQueue; //local queue
    eventQueueHandler_t rQueue; //remote queue
    localSem_t eventSemaphores[MAXIMUM_SEMAPHORES]; // per-thread completion semaphores
} xLinkSchedulerState_t;
85
86
87 // ------------------------------------
// Data structures declaration. End.
89 // ------------------------------------
90
91
92
93 // ------------------------------------
94 // Global fields declaration. Begin.
95 // ------------------------------------
96
// These are shared by all schedulers and are initialized only once, in DispatcherInitialize().
DispatcherControlFunctions* glControlFunc;              // callbacks supplied by the XLink layer
int numSchedulers;                                      // number of schedulers currently running
xLinkSchedulerState_t schedulerState[MAX_SCHEDULERS];   // scheduler table; schedulerId == -1 marks a free slot
sem_t addSchedulerSem;                                  // binary semaphore held while a scheduler thread is being started

// Serialises dispatcherClean() so that a scheduler is torn down only once.
static pthread_mutex_t clean_mutex = PTHREAD_MUTEX_INITIALIZER;
104
105 // ------------------------------------
106 // Global fields declaration. End.
107 // ------------------------------------
108
109
110
111 // ------------------------------------
112 // Helpers declaration. Begin.
113 // ------------------------------------
114
//below workaround for "C2088 '==': illegal for struct" error
static int pthread_t_compare(pthread_t a, pthread_t b);

// Per-thread completion semaphore management.
static XLink_sem_t* createSem(xLinkSchedulerState_t* curr);
static XLink_sem_t* getSem(pthread_t threadId, xLinkSchedulerState_t *curr);

// Thread entry points (explicit calling convention on Windows).
#if (defined(_WIN32) || defined(_WIN64))
static void* __cdecl eventReader(void* ctx);
static void* __cdecl eventSchedulerRun(void* ctx);
#else
static void* eventReader(void* ctx);
static void* eventSchedulerRun(void* ctx);
#endif

// Small utilities.
static int isEventTypeRequest(xLinkEventPriv_t* event);
static void postAndMarkEventServed(xLinkEventPriv_t *event);
static int createUniqueID();
static int findAvailableScheduler();
static xLinkSchedulerState_t* findCorrespondingScheduler(void* xLinkFD);

// Post-processing of an event after it has been handled (request vs. response).
static int dispatcherRequestServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr);
static int dispatcherResponseServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr);

// Circular-queue helpers.
static inline xLinkEventPriv_t* getNextElementWithState(xLinkEventPriv_t* base, xLinkEventPriv_t* end,
                                                        xLinkEventPriv_t* start, xLinkEventState_t state);

static xLinkEventPriv_t* searchForReadyEvent(xLinkSchedulerState_t* curr);

static xLinkEventPriv_t* getNextQueueElemToProc(eventQueueHandler_t *q );
static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
                                            eventQueueHandler_t *q, xLinkEvent_t* event,
                                            XLink_sem_t* sem, xLinkEventOrigin_t o);

static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr);

// Scheduler tear-down.
static int dispatcherClean(xLinkSchedulerState_t* curr);
static int dispatcherReset(xLinkSchedulerState_t* curr);
static void dispatcherFreeEvents(eventQueueHandler_t *queue, xLinkEventState_t state);

// Main loop of the scheduler thread.
static XLinkError_t sendEvents(xLinkSchedulerState_t* curr);
155
156 // ------------------------------------
157 // Helpers declaration. End.
158 // ------------------------------------
159
160
161
162 // ------------------------------------
163 // XLinkDispatcher.h implementation. Begin.
164 // ------------------------------------
165
166 XLinkError_t DispatcherInitialize(DispatcherControlFunctions *controlFunc) {
167     ASSERT_XLINK(controlFunc != NULL);
168
169     if (!controlFunc->eventReceive ||
170         !controlFunc->eventSend ||
171         !controlFunc->localGetResponse ||
172         !controlFunc->remoteGetResponse) {
173         return X_LINK_ERROR;
174     }
175
176     glControlFunc = controlFunc;
177     numSchedulers = 0;
178
179     if (sem_init(&addSchedulerSem, 0, 1)) {
180         mvLog(MVLOG_ERROR, "Can't create semaphore\n");
181         return X_LINK_ERROR;
182     }
183
184     for (int i = 0; i < MAX_SCHEDULERS; i++){
185         schedulerState[i].schedulerId = -1;
186     }
187
188     return X_LINK_SUCCESS;
189 }
190
191 XLinkError_t DispatcherStart(xLinkDeviceHandle_t *deviceHandle)
192 {
193     ASSERT_XLINK(deviceHandle);
194 #ifdef __PC__
195     ASSERT_XLINK(deviceHandle->xLinkFD != NULL);
196 #endif
197
198     pthread_attr_t attr;
199     int eventIdx;
200     if (numSchedulers >= MAX_SCHEDULERS)
201     {
202         mvLog(MVLOG_ERROR,"Max number Schedulers reached!\n");
203         return -1;
204     }
205     int idx = findAvailableScheduler();
206     if (idx == -1) {
207         mvLog(MVLOG_ERROR,"Max number Schedulers reached!\n");
208         return -1;
209     }
210
211     memset(&schedulerState[idx], 0, sizeof(xLinkSchedulerState_t));
212
213     schedulerState[idx].semaphores = 0;
214     schedulerState[idx].queueProcPriority = 0;
215
216     schedulerState[idx].resetXLink = 0;
217     schedulerState[idx].deviceHandle = *deviceHandle;
218     schedulerState[idx].schedulerId = idx;
219
220     schedulerState[idx].lQueue.cur = schedulerState[idx].lQueue.q;
221     schedulerState[idx].lQueue.curProc = schedulerState[idx].lQueue.q;
222     schedulerState[idx].lQueue.base = schedulerState[idx].lQueue.q;
223     schedulerState[idx].lQueue.end = &schedulerState[idx].lQueue.q[MAX_EVENTS];
224
225     schedulerState[idx].rQueue.cur = schedulerState[idx].rQueue.q;
226     schedulerState[idx].rQueue.curProc = schedulerState[idx].rQueue.q;
227     schedulerState[idx].rQueue.base = schedulerState[idx].rQueue.q;
228     schedulerState[idx].rQueue.end = &schedulerState[idx].rQueue.q[MAX_EVENTS];
229
230     for (eventIdx = 0 ; eventIdx < MAX_EVENTS; eventIdx++)
231     {
232         schedulerState[idx].rQueue.q[eventIdx].isServed = EVENT_SERVED;
233         schedulerState[idx].lQueue.q[eventIdx].isServed = EVENT_SERVED;
234     }
235
236     if (XLink_sem_init(&schedulerState[idx].addEventSem, 0, 1)) {
237         perror("Can't create semaphore\n");
238         return -1;
239     }
240     if (XLink_sem_init(&schedulerState[idx].notifyDispatcherSem, 0, 0)) {
241         perror("Can't create semaphore\n");
242     }
243     localSem_t* temp = schedulerState[idx].eventSemaphores;
244     while (temp < schedulerState[idx].eventSemaphores + MAXIMUM_SEMAPHORES) {
245         XLink_sem_set_refs(&temp->sem, -1);
246         temp++;
247     }
248     if (pthread_attr_init(&attr) != 0) {
249         mvLog(MVLOG_ERROR,"pthread_attr_init error");
250         return X_LINK_ERROR;
251     }
252
253 #ifndef __PC__
254     if (pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED) != 0) {
255         mvLog(MVLOG_ERROR,"pthread_attr_setinheritsched error");
256         pthread_attr_destroy(&attr);
257     }
258     if (pthread_attr_setschedpolicy(&attr, SCHED_RR) != 0) {
259         mvLog(MVLOG_ERROR,"pthread_attr_setschedpolicy error");
260         pthread_attr_destroy(&attr);
261     }
262 #endif
263
264     sem_wait(&addSchedulerSem);
265     mvLog(MVLOG_DEBUG,"%s() starting a new thread - schedulerId %d \n", __func__, idx);
266     int sc = pthread_create(&schedulerState[idx].xLinkThreadId,
267                             &attr,
268                             eventSchedulerRun,
269                             (void*)&schedulerState[idx].schedulerId);
270     if (sc) {
271         mvLog(MVLOG_ERROR,"Thread creation failed with error: %d", sc);
272         if (pthread_attr_destroy(&attr) != 0) {
273             perror("Thread attr destroy failed\n");
274         }
275         return X_LINK_ERROR;
276     }
277
278 #ifndef __APPLE__
279     char schedulerThreadName[MVLOG_MAXIMUM_THREAD_NAME_SIZE];
280     snprintf(schedulerThreadName, sizeof(schedulerThreadName), "Scheduler%.2dThr", schedulerState[idx].schedulerId);
281     sc = pthread_setname_np(schedulerState[idx].xLinkThreadId, schedulerThreadName);
282     if (sc != 0) {
283         perror("Setting name for indexed scheduler thread failed");
284     }
285 #endif
286
287     pthread_detach(schedulerState[idx].xLinkThreadId);
288
289     numSchedulers++;
290     if (pthread_attr_destroy(&attr) != 0) {
291         mvLog(MVLOG_ERROR,"pthread_attr_destroy error");
292     }
293
294     sem_post(&addSchedulerSem);
295
296     return 0;
297 }
298
299 int DispatcherClean(xLinkDeviceHandle_t *deviceHandle) {
300     XLINK_RET_IF(deviceHandle == NULL);
301
302     xLinkSchedulerState_t* curr = findCorrespondingScheduler(deviceHandle->xLinkFD);
303     XLINK_RET_IF(curr == NULL);
304
305     return dispatcherClean(curr);
306 }
307
308 xLinkEvent_t* DispatcherAddEvent(xLinkEventOrigin_t origin, xLinkEvent_t *event)
309 {
310     xLinkSchedulerState_t* curr = findCorrespondingScheduler(event->deviceHandle.xLinkFD);
311     XLINK_RET_ERR_IF(curr == NULL, NULL);
312
313     if(curr->resetXLink) {
314         return NULL;
315     }
316     mvLog(MVLOG_DEBUG, "Receiving event %s %d\n", TypeToStr(event->header.type), origin);
317     if (XLink_sem_wait(&curr->addEventSem)) {
318         mvLog(MVLOG_ERROR,"can't wait semaphore\n");
319         return NULL;
320     }
321
322     XLink_sem_t *sem = NULL;
323     xLinkEvent_t* ev;
324     if (origin == EVENT_LOCAL) {
325         event->header.id = createUniqueID();
326         sem = getSem(pthread_self(), curr);
327         if (!sem) {
328             sem = createSem(curr);
329         }
330         if (!sem) {
331             mvLog(MVLOG_WARN,"No more semaphores. Increase XLink or OS resources\n");
332             if (XLink_sem_post(&curr->addEventSem)) {
333                 mvLog(MVLOG_ERROR,"can't post semaphore\n");
334             }
335
336             return NULL;
337         }
338         event->header.flags.raw = 0;
339         ev = addNextQueueElemToProc(curr, &curr->lQueue, event, sem, origin);
340     } else {
341         ev = addNextQueueElemToProc(curr, &curr->rQueue, event, NULL, origin);
342     }
343     if (XLink_sem_post(&curr->addEventSem)) {
344         mvLog(MVLOG_ERROR,"can't post semaphore\n");
345     }
346     if (XLink_sem_post(&curr->notifyDispatcherSem)) {
347         mvLog(MVLOG_ERROR, "can't post semaphore\n");
348     }
349     return ev;
350 }
351
/**
 * @brief Blocks the calling thread until its last queued local event is served.
 *
 * Waits on the completion semaphore bound to the calling thread. On PC builds
 * a failed wait triggers a recovery sequence: an XLINK_RESET_REQ is queued and
 * waited for, and if that fails too the scheduler is reset locally.
 *
 * @param deviceHandle Handle whose xLinkFD identifies the scheduler.
 * @return -1 when the calling thread has no semaphore, otherwise the result of
 *         the first XLink_sem_wait() (non-zero on failure).
 */
int DispatcherWaitEventComplete(xLinkDeviceHandle_t *deviceHandle)
{
    xLinkSchedulerState_t* curr = findCorrespondingScheduler(deviceHandle->xLinkFD);
    ASSERT_XLINK(curr != NULL);

    // The semaphore exists only if this thread queued a local event before.
    XLink_sem_t* id = getSem(pthread_self(), curr);
    if (id == NULL) {
        return -1;
    }

    int rc = XLink_sem_wait(id);
#ifdef __PC__
    if (rc) {
        // Wait failed (presumably a timeout, per the log message): ask the remote side to reset.
        xLinkEvent_t event = {0};
        event.header.type = XLINK_RESET_REQ;
        event.deviceHandle = *deviceHandle;
        mvLog(MVLOG_ERROR,"waiting is timeout, sending reset remote event");
        DispatcherAddEvent(EVENT_LOCAL, &event);
        // Look the semaphore up again before waiting for the reset request to complete.
        id = getSem(pthread_self(), curr);
        if (id == NULL || XLink_sem_wait(id)) {
            // The reset request could not be completed either: reset locally.
            dispatcherReset(curr);
        }
    }
#endif

    // The status of the original wait is reported, not that of the recovery.
    return rc;
}
379
380 char* TypeToStr(int type)
381 {
382     switch(type)
383     {
384         case XLINK_WRITE_REQ:     return "XLINK_WRITE_REQ";
385         case XLINK_READ_REQ:      return "XLINK_READ_REQ";
386         case XLINK_READ_REL_REQ:  return "XLINK_READ_REL_REQ";
387         case XLINK_CREATE_STREAM_REQ:return "XLINK_CREATE_STREAM_REQ";
388         case XLINK_CLOSE_STREAM_REQ: return "XLINK_CLOSE_STREAM_REQ";
389         case XLINK_PING_REQ:         return "XLINK_PING_REQ";
390         case XLINK_RESET_REQ:        return "XLINK_RESET_REQ";
391         case XLINK_REQUEST_LAST:     return "XLINK_REQUEST_LAST";
392         case XLINK_WRITE_RESP:   return "XLINK_WRITE_RESP";
393         case XLINK_READ_RESP:     return "XLINK_READ_RESP";
394         case XLINK_READ_REL_RESP: return "XLINK_READ_REL_RESP";
395         case XLINK_CREATE_STREAM_RESP: return "XLINK_CREATE_STREAM_RESP";
396         case XLINK_CLOSE_STREAM_RESP:  return "XLINK_CLOSE_STREAM_RESP";
397         case XLINK_PING_RESP:  return "XLINK_PING_RESP";
398         case XLINK_RESET_RESP: return "XLINK_RESET_RESP";
399         case XLINK_RESP_LAST:  return "XLINK_RESP_LAST";
400         default:
401             break;
402     }
403     return "";
404 }
405
406 int DispatcherUnblockEvent(eventId_t id, xLinkEventType_t type, streamId_t stream, void *xlinkFD)
407 {
408     xLinkSchedulerState_t* curr = findCorrespondingScheduler(xlinkFD);
409     ASSERT_XLINK(curr != NULL);
410
411     mvLog(MVLOG_DEBUG,"unblock\n");
412     xLinkEventPriv_t* blockedEvent;
413     for (blockedEvent = curr->lQueue.q;
414          blockedEvent < curr->lQueue.q + MAX_EVENTS;
415          blockedEvent++)
416     {
417         if (blockedEvent->isServed == EVENT_BLOCKED &&
418             ((blockedEvent->packet.header.id == id || id == -1)
419              && blockedEvent->packet.header.type == type
420              && blockedEvent->packet.header.streamId == stream))
421         {
422             mvLog(MVLOG_DEBUG,"unblocked**************** %d %s\n",
423                   (int)blockedEvent->packet.header.id,
424                   TypeToStr((int)blockedEvent->packet.header.type));
425             blockedEvent->isServed = EVENT_READY;
426             if (XLink_sem_post(&curr->notifyDispatcherSem)){
427                 mvLog(MVLOG_ERROR, "can't post semaphore\n");
428             }
429             return 1;
430         } else {
431             mvLog(MVLOG_DEBUG,"%d %s\n",
432                   (int)blockedEvent->packet.header.id,
433                   TypeToStr((int)blockedEvent->packet.header.type));
434         }
435     }
436     return 0;
437 }
438
439 // ------------------------------------
440 // XLinkDispatcher.h implementation. End.
441 // ------------------------------------
442
443
444
445 // ------------------------------------
446 // Helpers implementation. Begin.
447 // ------------------------------------
448
/**
 * @brief Tells whether two thread ids refer to the same thread.
 *
 * On Windows builds pthread_t is a struct in this code base, so its `tid`
 * member is compared. Elsewhere pthread_equal() is used, because POSIX does
 * not guarantee that pthread_t can be compared with `==`.
 *
 * @return Non-zero when both ids denote the same thread, 0 otherwise.
 */
static int pthread_t_compare(pthread_t a, pthread_t b)
{
#if (defined(_WIN32) || defined(_WIN64) )
    return ((a.tid == b.tid));
#else
    return pthread_equal(a, b) != 0;
#endif
}
457
/**
 * @brief Creates a completion semaphore for the calling thread.
 *
 * Walks the scheduler's semaphore table and initialises the first slot whose
 * reference count is -1 (DispatcherStart() sets every slot to -1). When the
 * table is full, a slot whose reference count is 0 is destroyed first and
 * then reused.
 *
 * @param curr Scheduler that owns the semaphore table.
 * @return The new semaphore, or NULL when the calling thread already has one,
 *         no slot could be obtained, or a semaphore operation failed.
 */
static XLink_sem_t* createSem(xLinkSchedulerState_t* curr)
{
    XLINK_RET_ERR_IF(curr == NULL, NULL);

    XLink_sem_t* sem = getSem(pthread_self(), curr);
    if (sem) {// it already exists, error
        return NULL;
    }

    if (curr->semaphores <= MAXIMUM_SEMAPHORES) {
        localSem_t* temp = curr->eventSemaphores;

        while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
            int refs = 0;
            XLINK_RET_ERR_IF(XLink_sem_get_refs(&temp->sem, &refs), NULL);
            // Consider the slot if it is unused (refs < 0) or if the table is full
            // and an existing slot may have to be recycled.
            if (refs < 0 || curr->semaphores == MAXIMUM_SEMAPHORES) {
                if (curr->semaphores == MAXIMUM_SEMAPHORES && refs == 0) {
                    // Table is full and this semaphore has a reference count of 0: recycle it.
                    XLINK_RET_ERR_IF(XLink_sem_destroy(&temp->sem), NULL);
                    // Re-read the count; the check below expects -1 after a successful destroy.
                    XLINK_RET_ERR_IF(XLink_sem_get_refs(&temp->sem, &refs), NULL);
                    curr->semaphores --;
#if (defined(_WIN32) || defined(_WIN64))
                    memset(&temp->threadId, 0, sizeof(temp->threadId));
#else
                    temp->threadId = 0;
#endif
                }

                if (refs == -1) {
                    // Free slot: initialise it and bind it to the calling thread.
                    sem = &temp->sem;
                    if (XLink_sem_init(sem, 0, 0)){
                        mvLog(MVLOG_ERROR, "Error: Can't create semaphore\n");
                        return NULL;
                    }
                    curr->semaphores++;
                    temp->threadId = pthread_self();
                    break;
                }
            }
            temp++;
        }
        if (!sem) {
            return NULL; //shouldn't happen
        }
    }
    else {
        mvLog(MVLOG_ERROR, "Error: cached semaphores %d exceeds the MAXIMUM_SEMAPHORES %d", curr->semaphores, MAXIMUM_SEMAPHORES);
        return NULL;
    }

    return sem;
}
509
510 static XLink_sem_t* getSem(pthread_t threadId, xLinkSchedulerState_t *curr)
511 {
512     XLINK_RET_ERR_IF(curr == NULL, NULL);
513
514     localSem_t* temp = curr->eventSemaphores;
515     while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
516         int refs = 0;
517         XLINK_RET_ERR_IF(XLink_sem_get_refs(&temp->sem, &refs), NULL);
518         if (pthread_t_compare(temp->threadId, threadId) && refs >= 0) {
519             return &temp->sem;
520         }
521         temp++;
522     }
523     return NULL;
524 }
525
526 #if (defined(_WIN32) || defined(_WIN64))
527 static void* __cdecl eventReader(void* ctx)
528 #else
529 static void* eventReader(void* ctx)
530 #endif
531 {
532     xLinkSchedulerState_t *curr = (xLinkSchedulerState_t*)ctx;
533     XLINK_RET_ERR_IF(curr == NULL, NULL);
534
535     xLinkEvent_t event = { 0 };// to fix error C4700 in win
536     event.header.id = -1;
537     event.deviceHandle = curr->deviceHandle;
538
539     mvLog(MVLOG_INFO,"eventReader thread started");
540
541     while (!curr->resetXLink) {
542         int sc = glControlFunc->eventReceive(&event);
543
544         mvLog(MVLOG_DEBUG,"Reading %s (scheduler %d, fd %p, event id %d, event stream_id %u, event size %u)\n",
545               TypeToStr(event.header.type), curr->schedulerId, event.deviceHandle.xLinkFD, event.header.id, event.header.streamId, event.header.size);
546
547         if (sc) {
548             mvLog(MVLOG_DEBUG,"Failed to receive event (err %d)", sc);
549             dispatcherFreeEvents(&curr->lQueue, EVENT_PENDING);
550             dispatcherFreeEvents(&curr->lQueue, EVENT_BLOCKED);
551             continue;
552         }
553
554         DispatcherAddEvent(EVENT_REMOTE, &event);
555
556         if (event.header.type == XLINK_RESET_REQ) {
557             curr->resetXLink = 1;
558             mvLog(MVLOG_DEBUG,"Read XLINK_RESET_REQ, stopping eventReader thread.");
559         }
560     }
561
562     return 0;
563 }
564
565 #if (defined(_WIN32) || defined(_WIN64))
566 static void* __cdecl eventSchedulerRun(void* ctx)
567 #else
568 static void* eventSchedulerRun(void* ctx)
569 #endif
570 {
571     int schedulerId = *((int*) ctx);
572     mvLog(MVLOG_DEBUG,"%s() schedulerId %d\n", __func__, schedulerId);
573     XLINK_RET_ERR_IF(schedulerId >= MAX_SCHEDULERS, NULL);
574
575     xLinkSchedulerState_t* curr = &schedulerState[schedulerId];
576     pthread_t readerThreadId;        /* Create thread for reader.
577                         This thread will notify the dispatcher of any incoming packets*/
578     pthread_attr_t attr;
579     int sc;
580     if (pthread_attr_init(&attr) != 0) {
581         mvLog(MVLOG_ERROR,"pthread_attr_init error");
582         return NULL;
583     }
584 #ifndef __PC__
585     if (pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED) != 0) {
586         pthread_attr_destroy(&attr);
587         mvLog(MVLOG_ERROR,"pthread_attr_setinheritsched error");
588         return NULL;
589     }
590     if (pthread_attr_setschedpolicy(&attr, SCHED_RR) != 0) {
591         pthread_attr_destroy(&attr);
592         mvLog(MVLOG_ERROR,"pthread_attr_setschedpolicy error");
593         return NULL;
594     }
595 #endif
596     sc = pthread_create(&readerThreadId, &attr, eventReader, curr);
597     if (sc) {
598         mvLog(MVLOG_ERROR, "Thread creation failed");
599         if (pthread_attr_destroy(&attr) != 0) {
600             perror("Thread attr destroy failed\n");
601         }
602         return NULL;
603     }
604 #ifndef __APPLE__
605     char eventReaderThreadName[MVLOG_MAXIMUM_THREAD_NAME_SIZE];
606     snprintf(eventReaderThreadName, sizeof(eventReaderThreadName), "EventRead%.2dThr", schedulerId);
607     sc = pthread_setname_np(readerThreadId, eventReaderThreadName);
608     if (sc != 0) {
609         perror("Setting name for event reader thread failed");
610     }
611 #endif
612     mvLog(MVLOG_INFO,"Scheduler thread started");
613
614     XLinkError_t rc = sendEvents(curr);
615     if(rc) {
616         mvLog(MVLOG_ERROR, "sendEvents method finished with an error: %s", XLinkErrorToStr(rc));
617     }
618
619     sc = pthread_join(readerThreadId, NULL);
620     if (sc) {
621         mvLog(MVLOG_ERROR, "Waiting for thread failed");
622     }
623
624     sc = pthread_attr_destroy(&attr);
625     if (sc) {
626         mvLog(MVLOG_WARN, "Thread attr destroy failed");
627     }
628
629     if (dispatcherReset(curr) != 0) {
630         mvLog(MVLOG_WARN, "Failed to reset");
631     }
632
633     if (curr->resetXLink != 1) {
634         mvLog(MVLOG_ERROR,"Scheduler thread stopped");
635     } else {
636         mvLog(MVLOG_INFO,"Scheduler thread stopped");
637     }
638
639     return NULL;
640 }
641
642 static int isEventTypeRequest(xLinkEventPriv_t* event)
643 {
644     return event->packet.header.type < XLINK_REQUEST_LAST;
645 }
646
647 static void postAndMarkEventServed(xLinkEventPriv_t *event)
648 {
649     if (event->retEv){
650         // the xLinkEventPriv_t slot pointed by "event" will be
651         // re-cycled as soon as we mark it as EVENT_SERVED,
652         // so before that, we copy the result event into XLink API layer
653         *(event->retEv) = event->packet;
654     }
655     if(event->sem){
656         if (XLink_sem_post(event->sem)) {
657             mvLog(MVLOG_ERROR,"can't post semaphore\n");
658         }
659     }
660
661     event->isServed = EVENT_SERVED;
662 }
663
/**
 * @brief Hands out consecutive event ids, starting at 0xa.
 *
 * @return The next id; each call returns a value one greater than the previous call.
 */
static int createUniqueID()
{
    static int nextId = 0xa;

    const int assigned = nextId;
    nextId += 1;
    return assigned;
}
669
670 int findAvailableScheduler()
671 {
672     int i;
673     for (i = 0; i < MAX_SCHEDULERS; i++)
674         if (schedulerState[i].schedulerId == -1)
675             return i;
676     return -1;
677 }
678
679 static xLinkSchedulerState_t* findCorrespondingScheduler(void* xLinkFD)
680 {
681     int i;
682     if (xLinkFD == NULL) { //in case of myriad there should be one scheduler
683         if (numSchedulers == 1)
684             return &schedulerState[0];
685         else
686             NULL;
687     }
688     for (i=0; i < MAX_SCHEDULERS; i++)
689         if (schedulerState[i].schedulerId != -1 &&
690             schedulerState[i].deviceHandle.xLinkFD == xLinkFD)
691             return &schedulerState[i];
692
693     return NULL;
694 }
695
696 static int dispatcherRequestServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr){
697     XLINK_RET_IF(curr == NULL);
698     XLINK_RET_IF(!isEventTypeRequest(event));
699     xLinkEventHeader_t *header = &event->packet.header;
700     if (header->flags.bitField.block){ //block is requested
701         event->isServed = EVENT_BLOCKED;
702     } else if(header->flags.bitField.localServe == 1 ||
703               (header->flags.bitField.ack == 0
704                && header->flags.bitField.nack == 1)){ //this event is served locally, or it is failed
705         postAndMarkEventServed(event);
706     }else if (header->flags.bitField.ack == 1
707               && header->flags.bitField.nack == 0){
708         event->isServed = EVENT_PENDING;
709         mvLog(MVLOG_DEBUG,"------------------------UNserved %s\n",
710               TypeToStr(event->packet.header.type));
711     }else{
712         return 1;
713     }
714     return 0;
715 }
716
717 static int dispatcherResponseServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr)
718 {
719     XLINK_RET_ERR_IF(curr == NULL, 1);
720     XLINK_RET_ERR_IF(isEventTypeRequest(event), 1);
721     int i = 0;
722     for (i = 0; i < MAX_EVENTS; i++)
723     {
724         xLinkEventHeader_t *header = &curr->lQueue.q[i].packet.header;
725         xLinkEventHeader_t *evHeader = &event->packet.header;
726
727         if (curr->lQueue.q[i].isServed == EVENT_PENDING &&
728             header->id == evHeader->id &&
729             header->type == evHeader->type - XLINK_REQUEST_LAST -1)
730         {
731             mvLog(MVLOG_DEBUG,"----------------------ISserved %s\n",
732                   TypeToStr(header->type));
733             //propagate back flags
734             header->flags = evHeader->flags;
735             postAndMarkEventServed(&curr->lQueue.q[i]);
736             break;
737         }
738     }
739     if (i == MAX_EVENTS) {
740         mvLog(MVLOG_FATAL,"no request for this response: %s %d\n", TypeToStr(event->packet.header.type), event->origin);
741         mvLog(MVLOG_DEBUG,"#### (i == MAX_EVENTS) %s %d %d\n", TypeToStr(event->packet.header.type), event->origin, (int)event->packet.header.id);
742         for (i = 0; i < MAX_EVENTS; i++)
743         {
744             xLinkEventHeader_t *header = &curr->lQueue.q[i].packet.header;
745
746             mvLog(MVLOG_DEBUG,"%d) header->id %i, header->type %s(%i), curr->lQueue.q[i].isServed %i, EVENT_PENDING %i\n", i, (int)header->id
747             , TypeToStr(header->type), header->type, curr->lQueue.q[i].isServed, EVENT_PENDING);
748
749         }
750         return 1;
751     }
752     return 0;
753 }
754
/**
 * @brief Searches a circular queue for the first element in a given state.
 *
 * Starts at @p start and advances with CIRCULAR_INCREMENT_BASE until an
 * element in @p state is found or the search is back at @p start.
 *
 * @param base  First element of the queue storage.
 * @param end   End marker passed to CIRCULAR_INCREMENT_BASE
 *              (callers pass the one-past-the-last element).
 * @param start Element to begin the search at.
 * @param state State to look for.
 * @return The first matching element, or NULL when none is in @p state.
 */
static inline xLinkEventPriv_t* getNextElementWithState(xLinkEventPriv_t* base, xLinkEventPriv_t* end,
                                                        xLinkEventPriv_t* start, xLinkEventState_t state){
    xLinkEventPriv_t* tmp = start;  // remembers where the search began
    while (start->isServed != state){
        CIRCULAR_INCREMENT_BASE(start, end, base);
        if(tmp == start){
            // went once around the queue without a match
            break;
        }
    }
    // Re-check: the loop may have ended because of the wrap-around.
    if(start->isServed == state){
        return start;
    }else{
        return NULL;
    }
}
770
771 static xLinkEventPriv_t* searchForReadyEvent(xLinkSchedulerState_t* curr)
772 {
773     XLINK_RET_ERR_IF(curr == NULL, NULL);
774     xLinkEventPriv_t* ev = NULL;
775
776     ev = getNextElementWithState(curr->lQueue.base, curr->lQueue.end, curr->lQueue.base, EVENT_READY);
777     if(ev){
778         mvLog(MVLOG_DEBUG,"ready %s %d \n",
779               TypeToStr((int)ev->packet.header.type),
780               (int)ev->packet.header.id);
781     }
782     return ev;
783 }
784
785 static xLinkEventPriv_t* getNextQueueElemToProc(eventQueueHandler_t *q ){
786     xLinkEventPriv_t* event = NULL;
787     if (q->cur != q->curProc) {
788         event = getNextElementWithState(q->base, q->end, q->curProc, EVENT_ALLOCATED);
789         q->curProc = event;
790         CIRCULAR_INCREMENT_BASE(q->curProc, q->end, q->base);
791     }
792     return event;
793 }
794
/**
 * @brief Add event to Queue
 * @note It is called from DispatcherAddEvent
 *
 * Copies the event into the first free (EVENT_SERVED) slot found from the
 * insertion cursor onwards and marks that slot EVENT_ALLOCATED.
 *
 * @param curr  Owning scheduler (currently unused).
 * @param q     Queue to insert into.
 * @param event Event to copy; for local events it is also remembered as the
 *              buffer that receives the final result.
 * @param sem   Semaphore to post once the event is served; may be NULL.
 * @param o     Origin of the event.
 * @return Pointer to the queued copy of the event, or NULL when the queue has no free slot.
 */
static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
                                            eventQueueHandler_t *q, xLinkEvent_t* event,
                                            XLink_sem_t* sem, xLinkEventOrigin_t o){
    xLinkEvent_t* ev;
    xLinkEventPriv_t* eventP = getNextElementWithState(q->base, q->end, q->cur, EVENT_SERVED);
    if (eventP == NULL) {
        mvLog(MVLOG_ERROR, "getNextElementWithState returned NULL");
        return NULL;
    }
    mvLog(MVLOG_DEBUG, "Received event %s %d", TypeToStr(event->header.type), o);
    ev = &eventP->packet;

    (void)curr;
    eventP->sem = sem;
    eventP->packet = *event;
    eventP->origin = o;
    if (o == EVENT_LOCAL) {
        // XLink API caller provided buffer for return the final result to
        eventP->retEv = event;
    }else{
        eventP->retEv = NULL;
    }
    q->cur = eventP;
    // The state is written only after all other fields of the slot have been filled in.
    eventP->isServed = EVENT_ALLOCATED;
    // Advance the insertion cursor to the element after the one just used.
    CIRCULAR_INCREMENT_BASE(q->cur, q->end, q->base);
    return ev;
}
826
827 static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr)
828 {
829     XLINK_RET_ERR_IF(curr == NULL, NULL);
830
831     if (XLink_sem_wait(&curr->notifyDispatcherSem)) {
832         mvLog(MVLOG_ERROR,"can't post semaphore\n");
833     }
834
835     xLinkEventPriv_t* event = NULL;
836     event = searchForReadyEvent(curr);
837     if (event) {
838         return event;
839     }
840
841     eventQueueHandler_t* hPriorityQueue = curr->queueProcPriority ? &curr->lQueue : &curr->rQueue;
842     eventQueueHandler_t* lPriorityQueue = curr->queueProcPriority ? &curr->rQueue : &curr->lQueue;
843     curr->queueProcPriority = curr->queueProcPriority ? 0 : 1;
844
845     event = getNextQueueElemToProc(hPriorityQueue);
846     if (event) {
847         return event;
848     }
849     event = getNextQueueElemToProc(lPriorityQueue);
850
851     return event;
852 }
853
/**
 * @brief Drops all outstanding events of a scheduler and releases its resources.
 *
 * Runs under clean_mutex. Every event still obtainable from the queues is
 * completed, pending and blocked local events are released, the scheduler slot
 * is marked free, its semaphores are destroyed and numSchedulers is decremented.
 *
 * @param curr Scheduler to clean.
 * @return 0 on success, 1 when the mutex cannot be locked or the scheduler
 *         has already been cleaned.
 */
static int dispatcherClean(xLinkSchedulerState_t* curr)
{
    XLINK_RET_ERR_IF(pthread_mutex_lock(&clean_mutex), 1);
    // schedulerId == -1 marks a slot that is already free.
    if (curr->schedulerId == -1) {
        mvLog(MVLOG_WARN,"Scheduler has already been reset or cleaned");
        if(pthread_mutex_unlock(&clean_mutex) != 0) {
            mvLog(MVLOG_ERROR, "Failed to unlock clean_mutex");
        }

        return 1;
    }

    mvLog(MVLOG_INFO, "Start Clean Dispatcher...");

    if (XLink_sem_post(&curr->notifyDispatcherSem)) {
        mvLog(MVLOG_ERROR,"can't post semaphore\n"); //to allow us to get a NULL event
    }
    // Drain the queues: complete every event that can still be fetched.
    xLinkEventPriv_t* event = dispatcherGetNextEvent(curr);
    while (event != NULL) {
        mvLog(MVLOG_INFO, "dropped event is %s, status %d\n",
              TypeToStr(event->packet.header.type), event->isServed);

        postAndMarkEventServed(event);
        event = dispatcherGetNextEvent(curr);
    }

    // Release local events that were still waiting for a response or an unblock.
    dispatcherFreeEvents(&curr->lQueue, EVENT_PENDING);
    dispatcherFreeEvents(&curr->lQueue, EVENT_BLOCKED);

    // Free the slot and stop the reader/scheduler loops.
    curr->schedulerId = -1;
    curr->resetXLink = 1;
    XLink_sem_destroy(&curr->addEventSem);
    XLink_sem_destroy(&curr->notifyDispatcherSem);
    localSem_t* temp = curr->eventSemaphores;
    while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
        // unblock potentially blocked event semaphores
        XLink_sem_post(&temp->sem);
        XLink_sem_destroy(&temp->sem);
        temp++;
    }
    numSchedulers--;

    mvLog(MVLOG_INFO, "Clean Dispatcher Successfully...");
    if(pthread_mutex_unlock(&clean_mutex) != 0) {
        mvLog(MVLOG_ERROR, "Failed to unlock clean_mutex after clearing dispatcher");
    }
    return 0;
}
902
903 static int dispatcherReset(xLinkSchedulerState_t* curr)
904 {
905     ASSERT_XLINK(curr != NULL);
906
907     glControlFunc->closeDeviceFd(&curr->deviceHandle);
908     if(dispatcherClean(curr)) {
909         mvLog(MVLOG_INFO, "Failed to clean dispatcher");
910     }
911
912     xLinkDesc_t* link = getLink(curr->deviceHandle.xLinkFD);
913     if(link == NULL || XLink_sem_post(&link->dispatcherClosedSem)) {
914         mvLog(MVLOG_DEBUG,"can't post dispatcherClosedSem\n");
915     }
916
917     glControlFunc->closeLink(curr->deviceHandle.xLinkFD, 1);
918     mvLog(MVLOG_DEBUG,"Reset Successfully\n");
919     return 0;
920 }
921
922 static XLinkError_t sendEvents(xLinkSchedulerState_t* curr) {
923     int res;
924     xLinkEventPriv_t* event;
925     xLinkEventPriv_t response;
926
927     while (!curr->resetXLink) {
928         event = dispatcherGetNextEvent(curr);
929         if(event == NULL) {
930             mvLog(MVLOG_ERROR,"Dispatcher received NULL event!");
931 #ifdef __PC__
932             break; //Mean that user reset XLink.
933 #else
934             continue;
935 #endif
936         }
937
938         if(event->packet.deviceHandle.xLinkFD
939            != curr->deviceHandle.xLinkFD) {
940             mvLog(MVLOG_FATAL,"The file descriptor mismatch between the event and the scheduler.\n"
941                               "    Event: id=%d, fd=%p"
942                               "    Scheduler fd=%p",
943                               event->packet.header.id, event->packet.deviceHandle.xLinkFD,
944                               curr->deviceHandle.xLinkFD);
945             event->packet.header.flags.bitField.nack = 1;
946             event->packet.header.flags.bitField.ack = 0;
947
948             if (event->origin == EVENT_LOCAL){
949                 dispatcherRequestServe(event, curr);
950             } else {
951                 dispatcherResponseServe(event, curr);
952             }
953
954             continue;
955         }
956
957         getRespFunction getResp;
958         xLinkEvent_t* toSend;
959         if (event->origin == EVENT_LOCAL){
960             getResp = glControlFunc->localGetResponse;
961             toSend = &event->packet;
962         }else{
963             getResp = glControlFunc->remoteGetResponse;
964             toSend = &response.packet;
965         }
966
967         res = getResp(&event->packet, &response.packet);
968         if (isEventTypeRequest(event)){
969             if (event->origin == EVENT_LOCAL){ //we need to do this for locals only
970                 if(dispatcherRequestServe(event, curr)) {
971                     mvLog(MVLOG_ERROR, "Failed to serve local event. "
972                                        "Event: id=%d, type=%s, streamId=%u, streamName=%s",
973                                        event->packet.header.id,  TypeToStr(event->packet.header.type),
974                                        event->packet.header.streamId, event->packet.header.streamName);
975                 }
976             }
977
978             if (res == 0 && event->packet.header.flags.bitField.localServe == 0) {
979 #ifdef __PC__
980                 if (toSend->header.type == XLINK_RESET_REQ) {
981                     curr->resetXLink = 1;
982                     mvLog(MVLOG_DEBUG,"Send XLINK_RESET_REQ, stopping sendEvents thread.");
983                     if(toSend->deviceHandle.protocol == X_LINK_PCIE) {
984                         toSend->header.type = XLINK_PING_REQ;
985                         mvLog(MVLOG_DEBUG, "Request for reboot not sent, only ping event");
986                     } else {
987 #if defined(NO_BOOT)
988                         toSend->header.type = XLINK_PING_REQ;
989                         mvLog(MVLOG_INFO, "Request for reboot not sent, only ping event");
990 #endif // defined(NO_BOOT)
991
992                     }
993                 }
994 #endif // __PC__
995                 if (glControlFunc->eventSend(toSend) != 0) {
996                     dispatcherFreeEvents(&curr->lQueue, EVENT_PENDING);
997                     dispatcherFreeEvents(&curr->lQueue, EVENT_BLOCKED);
998                     mvLog(MVLOG_ERROR, "Event sending failed");
999                 }
1000             }
1001         } else {
1002             if (event->origin == EVENT_REMOTE){ // match remote response with the local request
1003                 dispatcherResponseServe(event, curr);
1004             }
1005         }
1006
1007         if (event->origin == EVENT_REMOTE){
1008             event->isServed = EVENT_SERVED;
1009         }
1010     }
1011
1012     return X_LINK_SUCCESS;
1013 }
1014
1015 static void dispatcherFreeEvents(eventQueueHandler_t *queue, xLinkEventState_t state) {
1016     if(queue == NULL) {
1017         return;
1018     }
1019
1020     xLinkEventPriv_t* event = getNextElementWithState(queue->base, queue->end, queue->base, state);
1021     while (event != NULL) {
1022         mvLog(MVLOG_DEBUG, "Event is %s, size is %d, Mark it served\n", TypeToStr(event->packet.header.type), event->packet.header.size);
1023         postAndMarkEventServed(event);
1024         event = getNextElementWithState(queue->base, queue->end, queue->base, state);
1025     }
1026 }
1027
1028
1029 // ------------------------------------
1030 // Helpers implementation. End.
1031 // ------------------------------------