fc0a660e5e8cd815191ad1f4bc50b155d12e35ab
[platform/upstream/dldt.git] / inference-engine / thirdparty / movidius / XLink / shared / src / XLinkDispatcher.c
1 // Copyright (C) 2018-2020 Intel Corporation
2 // SPDX-License-Identifier: Apache-2.0
3 //
4
5 ///
6 /// @file
7 ///
/// @brief     XLink event dispatcher implementation
9 ///
10 #ifndef _GNU_SOURCE
11 #define _GNU_SOURCE // fix for warning: implicit declaration of function ‘pthread_setname_np’
12 #endif
13
14 #include "stdio.h"
15 #include "stdint.h"
16 #include "stdlib.h"
17 #include "string.h"
18 #include <assert.h>
19 #include <stdlib.h>
20
21 #if (defined(_WIN32) || defined(_WIN64))
22 # include "win_pthread.h"
23 # include "win_semaphore.h"
24 #else
25 # include <pthread.h>
26 # ifndef __APPLE__
27 #  include <semaphore.h>
28 # endif
29 #endif
30
31 #include "XLinkDispatcher.h"
32 #include "XLinkMacros.h"
33 #include "XLinkPrivateDefines.h"
34 #include "XLinkPrivateFields.h"
35 #include "XLink.h"
36 #include "XLinkErrorUtils.h"
37
38 #define MVLOG_UNIT_NAME xLink
39 #include "XLinkLog.h"
40
41 // ------------------------------------
42 // Data structures declaration. Begin.
43 // ------------------------------------
44
/**
 * @brief State of a queue slot (stored in xLinkEventPriv_t::isServed).
 *
 * The numeric values are the implicit ones of the original declaration,
 * spelled out here because they are printed in debug logs.
 */
typedef enum {
    EVENT_ALLOCATED = 0,  // slot filled by addNextQueueElemToProc, not yet picked up
    EVENT_PENDING   = 1,  // request acknowledged, waiting for the matching response
    EVENT_BLOCKED   = 2,  // request parked until DispatcherUnblockEvent releases it
    EVENT_READY     = 3,  // previously blocked request that may now be processed
    EVENT_SERVED    = 4,  // slot is finished and may be reused
} xLinkEventState_t;
52
53 typedef struct xLinkEventPriv_t {
54     xLinkEvent_t packet;
55     xLinkEvent_t *retEv;
56     xLinkEventState_t isServed;
57     xLinkEventOrigin_t origin;
58     sem_t* sem;
59     void* data;
60 } xLinkEventPriv_t;
61
/**
 * @brief Entry of the per-scheduler table of event-completion semaphores.
 */
typedef struct {
    sem_t     sem;       // the semaphore handed out to the owning thread
    pthread_t threadId;  // thread the entry was created for
    int       refs;      // reference count; -1 marks an unused entry
} localSem_t;
67
68 typedef struct{
69     xLinkEventPriv_t* end;
70     xLinkEventPriv_t* base;
71
72     xLinkEventPriv_t* curProc;
73     xLinkEventPriv_t* cur;
74     XLINK_ALIGN_TO_BOUNDARY(64) xLinkEventPriv_t q[MAX_EVENTS];
75
76 }eventQueueHandler_t;
77 /**
78  * @brief Scheduler for each device
79  */
80 typedef struct {
81     xLinkDeviceHandle_t deviceHandle; //will be device handler
82     int schedulerId;
83
84     int queueProcPriority;
85
86     sem_t addEventSem;
87     sem_t notifyDispatcherSem;
88     volatile uint32_t resetXLink;
89     uint32_t semaphores;
90     pthread_t xLinkThreadId;
91
92     eventQueueHandler_t lQueue; //local queue
93     eventQueueHandler_t rQueue; //remote queue
94     localSem_t eventSemaphores[MAXIMUM_SEMAPHORES];
95 } xLinkSchedulerState_t;
96
97
98 // ------------------------------------
// Data structures declaration. End.
100 // ------------------------------------
101
102
103
104 // ------------------------------------
105 // Global fields declaration. Begin.
106 // ------------------------------------
107
108 //These will be common for all, Initialized only once
109 DispatcherControlFunctions* glControlFunc;
110 int numSchedulers;
111 xLinkSchedulerState_t schedulerState[MAX_SCHEDULERS];
112 sem_t addSchedulerSem;
113
114 static pthread_mutex_t clean_mutex = PTHREAD_MUTEX_INITIALIZER;
115
116 // ------------------------------------
117 // Global fields declaration. End.
118 // ------------------------------------
119
120
121
122 // ------------------------------------
123 // Helpers declaration. Begin.
124 // ------------------------------------
125
126 //below workaround for "C2088 '==': illegal for struct" error
127 static int pthread_t_compare(pthread_t a, pthread_t b);
128
129 static sem_t* createSem(xLinkSchedulerState_t* curr);
130 static sem_t* getAndRefSem(pthread_t threadId, xLinkSchedulerState_t *curr, int inc_ref);
131 static int unrefSem(sem_t* sem,  xLinkSchedulerState_t* curr);
132
133 #if (defined(_WIN32) || defined(_WIN64))
134 static void* __cdecl eventReader(void* ctx);
135 static void* __cdecl eventSchedulerRun(void* ctx);
136 #else
137 static void* eventReader(void* ctx);
138 static void* eventSchedulerRun(void* ctx);
139 #endif
140
141 static int isEventTypeRequest(xLinkEventPriv_t* event);
142 static void postAndMarkEventServed(xLinkEventPriv_t *event);
143 static int createUniqueID();
144 static int findAvailableScheduler();
145 static xLinkSchedulerState_t* findCorrespondingScheduler(void* xLinkFD);
146
147 static int dispatcherRequestServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr);
148 static int dispatcherResponseServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr);
149
150 static inline xLinkEventPriv_t* getNextElementWithState(xLinkEventPriv_t* base, xLinkEventPriv_t* end,
151                                                         xLinkEventPriv_t* start, xLinkEventState_t state);
152
153 static xLinkEventPriv_t* searchForReadyEvent(xLinkSchedulerState_t* curr);
154
155 static xLinkEventPriv_t* getNextQueueElemToProc(eventQueueHandler_t *q );
156 static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
157                                             eventQueueHandler_t *q, xLinkEvent_t* event,
158                                             sem_t* sem, xLinkEventOrigin_t o);
159
160 static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr);
161
162 static int dispatcherClean(xLinkSchedulerState_t* curr);
163 static int dispatcherReset(xLinkSchedulerState_t* curr);
164 static void dispatcherFreeEvents(eventQueueHandler_t *queue, xLinkEventState_t state);
165
166 static XLinkError_t sendEvents(xLinkSchedulerState_t* curr);
167
168 // ------------------------------------
169 // Helpers declaration. End.
170 // ------------------------------------
171
172
173
174 // ------------------------------------
175 // XLinkDispatcher.h implementation. Begin.
176 // ------------------------------------
177
178 XLinkError_t DispatcherInitialize(DispatcherControlFunctions *controlFunc) {
179     ASSERT_XLINK(controlFunc != NULL);
180
181     if (!controlFunc->eventReceive ||
182         !controlFunc->eventSend ||
183         !controlFunc->localGetResponse ||
184         !controlFunc->remoteGetResponse) {
185         return X_LINK_ERROR;
186     }
187
188     glControlFunc = controlFunc;
189     numSchedulers = 0;
190
191     if (sem_init(&addSchedulerSem, 0, 1)) {
192         mvLog(MVLOG_ERROR, "Can't create semaphore\n");
193         return X_LINK_ERROR;
194     }
195
196     for (int i = 0; i < MAX_SCHEDULERS; i++){
197         schedulerState[i].schedulerId = -1;
198     }
199
200     return X_LINK_SUCCESS;
201 }
202
203 XLinkError_t DispatcherStart(xLinkDeviceHandle_t *deviceHandle)
204 {
205     ASSERT_XLINK(deviceHandle);
206 #ifdef __PC__
207     ASSERT_XLINK(deviceHandle->xLinkFD != NULL);
208 #endif
209
210     pthread_attr_t attr;
211     int eventIdx;
212     if (numSchedulers >= MAX_SCHEDULERS)
213     {
214         mvLog(MVLOG_ERROR,"Max number Schedulers reached!\n");
215         return -1;
216     }
217     int idx = findAvailableScheduler();
218     if (idx == -1) {
219         mvLog(MVLOG_ERROR,"Max number Schedulers reached!\n");
220         return -1;
221     }
222
223     memset(&schedulerState[idx], 0, sizeof(xLinkSchedulerState_t));
224
225     schedulerState[idx].semaphores = 0;
226     schedulerState[idx].queueProcPriority = 0;
227
228     schedulerState[idx].resetXLink = 0;
229     schedulerState[idx].deviceHandle = *deviceHandle;
230     schedulerState[idx].schedulerId = idx;
231
232     schedulerState[idx].lQueue.cur = schedulerState[idx].lQueue.q;
233     schedulerState[idx].lQueue.curProc = schedulerState[idx].lQueue.q;
234     schedulerState[idx].lQueue.base = schedulerState[idx].lQueue.q;
235     schedulerState[idx].lQueue.end = &schedulerState[idx].lQueue.q[MAX_EVENTS];
236
237     schedulerState[idx].rQueue.cur = schedulerState[idx].rQueue.q;
238     schedulerState[idx].rQueue.curProc = schedulerState[idx].rQueue.q;
239     schedulerState[idx].rQueue.base = schedulerState[idx].rQueue.q;
240     schedulerState[idx].rQueue.end = &schedulerState[idx].rQueue.q[MAX_EVENTS];
241
242     for (eventIdx = 0 ; eventIdx < MAX_EVENTS; eventIdx++)
243     {
244         schedulerState[idx].rQueue.q[eventIdx].isServed = EVENT_SERVED;
245         schedulerState[idx].lQueue.q[eventIdx].isServed = EVENT_SERVED;
246     }
247
248     if (sem_init(&schedulerState[idx].addEventSem, 0, 1)) {
249         perror("Can't create semaphore\n");
250         return -1;
251     }
252     if (sem_init(&schedulerState[idx].notifyDispatcherSem, 0, 0)) {
253         perror("Can't create semaphore\n");
254     }
255     localSem_t* temp = schedulerState[idx].eventSemaphores;
256     while (temp < schedulerState[idx].eventSemaphores + MAXIMUM_SEMAPHORES) {
257         temp->refs = -1;
258         temp++;
259     }
260     if (pthread_attr_init(&attr) != 0) {
261         mvLog(MVLOG_ERROR,"pthread_attr_init error");
262         return X_LINK_ERROR;
263     }
264
265 #ifndef __PC__
266     if (pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED) != 0) {
267         mvLog(MVLOG_ERROR,"pthread_attr_setinheritsched error");
268         pthread_attr_destroy(&attr);
269     }
270     if (pthread_attr_setschedpolicy(&attr, SCHED_RR) != 0) {
271         mvLog(MVLOG_ERROR,"pthread_attr_setschedpolicy error");
272         pthread_attr_destroy(&attr);
273     }
274 #endif
275
276     sem_wait(&addSchedulerSem);
277     mvLog(MVLOG_DEBUG,"%s() starting a new thread - schedulerId %d \n", __func__, idx);
278     int sc = pthread_create(&schedulerState[idx].xLinkThreadId,
279                             &attr,
280                             eventSchedulerRun,
281                             (void*)&schedulerState[idx].schedulerId);
282     if (sc) {
283         mvLog(MVLOG_ERROR,"Thread creation failed with error: %d", sc);
284         if (pthread_attr_destroy(&attr) != 0) {
285             perror("Thread attr destroy failed\n");
286         }
287         return X_LINK_ERROR;
288     }
289
290 #ifndef __APPLE__
291     char schedulerThreadName[MVLOG_MAXIMUM_THREAD_NAME_SIZE];
292     snprintf(schedulerThreadName, sizeof(schedulerThreadName), "Scheduler%.2dThr", schedulerState[idx].schedulerId);
293     sc = pthread_setname_np(schedulerState[idx].xLinkThreadId, schedulerThreadName);
294     if (sc != 0) {
295         perror("Setting name for indexed scheduler thread failed");
296     }
297 #endif
298
299     pthread_detach(schedulerState[idx].xLinkThreadId);
300
301     numSchedulers++;
302     if (pthread_attr_destroy(&attr) != 0) {
303         mvLog(MVLOG_ERROR,"pthread_attr_destroy error");
304     }
305
306     sem_post(&addSchedulerSem);
307
308     return 0;
309 }
310
311 int DispatcherClean(xLinkDeviceHandle_t *deviceHandle) {
312     XLINK_RET_IF(deviceHandle == NULL);
313
314     xLinkSchedulerState_t* curr = findCorrespondingScheduler(deviceHandle->xLinkFD);
315     XLINK_RET_IF(curr == NULL);
316
317     return dispatcherClean(curr);
318 }
319
320 xLinkEvent_t* DispatcherAddEvent(xLinkEventOrigin_t origin, xLinkEvent_t *event)
321 {
322     xLinkSchedulerState_t* curr = findCorrespondingScheduler(event->deviceHandle.xLinkFD);
323     XLINK_RET_ERR_IF(curr == NULL, NULL);
324
325     if(curr->resetXLink) {
326         return NULL;
327     }
328     mvLog(MVLOG_DEBUG, "Receiving event %s %d\n", TypeToStr(event->header.type), origin);
329     if (sem_wait(&curr->addEventSem)) {
330         mvLog(MVLOG_ERROR,"can't wait semaphore\n");
331         return NULL;
332     }
333
334     sem_t *sem = NULL;
335     xLinkEvent_t* ev;
336     if (origin == EVENT_LOCAL) {
337         event->header.id = createUniqueID();
338         sem = getAndRefSem(pthread_self(), curr, 1);
339         if (!sem) {
340             sem = createSem(curr);
341         }
342         if (!sem) {
343             mvLog(MVLOG_WARN,"No more semaphores. Increase XLink or OS resources\n");
344             if (sem_post(&curr->addEventSem)) {
345                 mvLog(MVLOG_ERROR,"can't post semaphore\n");
346             }
347
348             return NULL;
349         }
350         event->header.flags.raw = 0;
351         ev = addNextQueueElemToProc(curr, &curr->lQueue, event, sem, origin);
352     } else {
353         ev = addNextQueueElemToProc(curr, &curr->rQueue, event, NULL, origin);
354     }
355     if (sem_post(&curr->addEventSem)) {
356         mvLog(MVLOG_ERROR,"can't post semaphore\n");
357     }
358     if (sem_post(&curr->notifyDispatcherSem)) {
359         mvLog(MVLOG_ERROR, "can't post semaphore\n");
360     }
361     return ev;
362 }
363
364 int DispatcherWaitEventComplete(xLinkDeviceHandle_t *deviceHandle)
365 {
366     xLinkSchedulerState_t* curr = findCorrespondingScheduler(deviceHandle->xLinkFD);
367     ASSERT_XLINK(curr != NULL);
368
369     sem_t* id = getAndRefSem(pthread_self(), curr, 0);
370     if (id == NULL) {
371         return -1;
372     }
373
374     int rc = sem_wait(id);
375 #ifdef __PC__
376     if (rc) {
377         xLinkEvent_t event = {0};
378         event.header.type = XLINK_RESET_REQ;
379         event.deviceHandle = *deviceHandle;
380         mvLog(MVLOG_ERROR,"waiting is timeout, sending reset remote event");
381         DispatcherAddEvent(EVENT_LOCAL, &event);
382         id = getAndRefSem(pthread_self(), curr, 0);
383         if (id == NULL || sem_wait(id)) {
384             dispatcherReset(curr);
385         }
386     }
387 #endif
388
389     if ((XLinkError_t)unrefSem(id, curr) == X_LINK_ERROR) {
390         mvLog(MVLOG_WARN, "Failed to unref sem");
391     }
392
393     return rc;
394 }
395
396 char* TypeToStr(int type)
397 {
398     switch(type)
399     {
400         case XLINK_WRITE_REQ:     return "XLINK_WRITE_REQ";
401         case XLINK_READ_REQ:      return "XLINK_READ_REQ";
402         case XLINK_READ_REL_REQ:  return "XLINK_READ_REL_REQ";
403         case XLINK_CREATE_STREAM_REQ:return "XLINK_CREATE_STREAM_REQ";
404         case XLINK_CLOSE_STREAM_REQ: return "XLINK_CLOSE_STREAM_REQ";
405         case XLINK_PING_REQ:         return "XLINK_PING_REQ";
406         case XLINK_RESET_REQ:        return "XLINK_RESET_REQ";
407         case XLINK_REQUEST_LAST:     return "XLINK_REQUEST_LAST";
408         case XLINK_WRITE_RESP:   return "XLINK_WRITE_RESP";
409         case XLINK_READ_RESP:     return "XLINK_READ_RESP";
410         case XLINK_READ_REL_RESP: return "XLINK_READ_REL_RESP";
411         case XLINK_CREATE_STREAM_RESP: return "XLINK_CREATE_STREAM_RESP";
412         case XLINK_CLOSE_STREAM_RESP:  return "XLINK_CLOSE_STREAM_RESP";
413         case XLINK_PING_RESP:  return "XLINK_PING_RESP";
414         case XLINK_RESET_RESP: return "XLINK_RESET_RESP";
415         case XLINK_RESP_LAST:  return "XLINK_RESP_LAST";
416         default:
417             break;
418     }
419     return "";
420 }
421
422 int DispatcherUnblockEvent(eventId_t id, xLinkEventType_t type, streamId_t stream, void *xlinkFD)
423 {
424     xLinkSchedulerState_t* curr = findCorrespondingScheduler(xlinkFD);
425     ASSERT_XLINK(curr != NULL);
426
427     mvLog(MVLOG_DEBUG,"unblock\n");
428     xLinkEventPriv_t* blockedEvent;
429     for (blockedEvent = curr->lQueue.q;
430          blockedEvent < curr->lQueue.q + MAX_EVENTS;
431          blockedEvent++)
432     {
433         if (blockedEvent->isServed == EVENT_BLOCKED &&
434             ((blockedEvent->packet.header.id == id || id == -1)
435              && blockedEvent->packet.header.type == type
436              && blockedEvent->packet.header.streamId == stream))
437         {
438             mvLog(MVLOG_DEBUG,"unblocked**************** %d %s\n",
439                   (int)blockedEvent->packet.header.id,
440                   TypeToStr((int)blockedEvent->packet.header.type));
441             blockedEvent->isServed = EVENT_READY;
442             if (sem_post(&curr->notifyDispatcherSem)){
443                 mvLog(MVLOG_ERROR, "can't post semaphore\n");
444             }
445             return 1;
446         } else {
447             mvLog(MVLOG_DEBUG,"%d %s\n",
448                   (int)blockedEvent->packet.header.id,
449                   TypeToStr((int)blockedEvent->packet.header.type));
450         }
451     }
452     return 0;
453 }
454
455 // ------------------------------------
456 // XLinkDispatcher.h implementation. End.
457 // ------------------------------------
458
459
460
461 // ------------------------------------
462 // Helpers implementation. Begin.
463 // ------------------------------------
464
/**
 * @brief Tells whether two thread handles identify the same thread.
 *
 * On Windows the handle is a struct, so its tid member is compared instead.
 *
 * @return Non-zero when the handles are equal, 0 otherwise.
 */
int pthread_t_compare(pthread_t a, pthread_t b)
{
#if (defined(_WIN32) || defined(_WIN64) )
    const int sameThread = (a.tid == b.tid);
#else
    const int sameThread = (a == b);
#endif
    return sameThread;
}
473
474 static sem_t* createSem(xLinkSchedulerState_t* curr)
475 {
476     XLINK_RET_ERR_IF(curr == NULL, NULL);
477
478     sem_t* sem = getAndRefSem(pthread_self(), curr, 0);
479     if (sem) {// it already exists, error
480         return NULL;
481     }
482
483     if (curr->semaphores <= MAXIMUM_SEMAPHORES) {
484         localSem_t* temp = curr->eventSemaphores;
485
486         while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
487             if (temp->refs < 0 || curr->semaphores == MAXIMUM_SEMAPHORES) {
488                 if (curr->semaphores == MAXIMUM_SEMAPHORES && !temp->refs) {
489                     XLINK_RET_ERR_IF(sem_destroy(&temp->sem) == -1, NULL);
490                     curr->semaphores --;
491                     temp->refs = -1;
492 #if (defined(_WIN32) || defined(_WIN64))
493                     memset(&temp->threadId, 0, sizeof(temp->threadId));
494 #else
495                     temp->threadId = 0;
496 #endif
497                 }
498
499                 if (temp->refs == -1) {
500                     sem = &temp->sem;
501                     if (sem_init(sem, 0, 0)){
502                         mvLog(MVLOG_ERROR, "Error: Can't create semaphore\n");
503                         return NULL;
504                     }
505                     curr->semaphores++;
506                     temp->refs = 1;
507                     temp->threadId = pthread_self();
508                     break;
509                 }
510             }
511             temp++;
512         }
513         if (!sem) {
514             return NULL; //shouldn't happen
515         }
516     }
517     else {
518         mvLog(MVLOG_ERROR, "Error: cached semaphores %d exceeds the MAXIMUM_SEMAPHORES %d", curr->semaphores, MAXIMUM_SEMAPHORES);
519         return NULL;
520     }
521
522     return sem;
523 }
524
525 static sem_t* getAndRefSem(pthread_t threadId, xLinkSchedulerState_t *curr, int inc_ref)
526 {
527     XLINK_RET_ERR_IF(curr == NULL, NULL);
528
529     localSem_t* sem = curr->eventSemaphores;
530     while (sem < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
531         if (pthread_t_compare(sem->threadId, threadId) && sem->refs >= 0) {
532             sem->refs += inc_ref;
533             return &sem->sem;
534         }
535         sem++;
536     }
537     return NULL;
538 }
539
540 static int unrefSem(sem_t* sem,  xLinkSchedulerState_t* curr) {
541     ASSERT_XLINK(curr != NULL);
542     localSem_t* temp = curr->eventSemaphores;
543     while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
544         if (&temp->sem == sem) {
545             temp->refs--;
546             return 1;
547         }
548         temp++;
549     }
550     mvLog(MVLOG_WARN,"unrefSem : sem wasn't found\n");
551     return 0;
552 }
553
554 #if (defined(_WIN32) || defined(_WIN64))
555 static void* __cdecl eventReader(void* ctx)
556 #else
557 static void* eventReader(void* ctx)
558 #endif
559 {
560     xLinkSchedulerState_t *curr = (xLinkSchedulerState_t*)ctx;
561     XLINK_RET_ERR_IF(curr == NULL, NULL);
562
563     xLinkEvent_t event = { 0 };// to fix error C4700 in win
564     event.header.id = -1;
565     event.deviceHandle = curr->deviceHandle;
566
567     mvLog(MVLOG_INFO,"eventReader thread started");
568
569     while (!curr->resetXLink) {
570         int sc = glControlFunc->eventReceive(&event);
571
572         mvLog(MVLOG_DEBUG,"Reading %s (scheduler %d, fd %p, event id %d, event stream_id %u, event size %u)\n",
573               TypeToStr(event.header.type), curr->schedulerId, event.deviceHandle.xLinkFD, event.header.id, event.header.streamId, event.header.size);
574
575         if (sc) {
576             mvLog(MVLOG_DEBUG,"Failed to receive event (err %d)", sc);
577             dispatcherFreeEvents(&curr->lQueue, EVENT_PENDING);
578             dispatcherFreeEvents(&curr->lQueue, EVENT_BLOCKED);
579             continue;
580         }
581
582         DispatcherAddEvent(EVENT_REMOTE, &event);
583
584         if (event.header.type == XLINK_RESET_REQ) {
585             curr->resetXLink = 1;
586             mvLog(MVLOG_DEBUG,"Read XLINK_RESET_REQ, stopping eventReader thread.");
587         }
588     }
589
590     return 0;
591 }
592
593 #if (defined(_WIN32) || defined(_WIN64))
594 static void* __cdecl eventSchedulerRun(void* ctx)
595 #else
596 static void* eventSchedulerRun(void* ctx)
597 #endif
598 {
599     int schedulerId = *((int*) ctx);
600     mvLog(MVLOG_DEBUG,"%s() schedulerId %d\n", __func__, schedulerId);
601     XLINK_RET_ERR_IF(schedulerId >= MAX_SCHEDULERS, NULL);
602
603     xLinkSchedulerState_t* curr = &schedulerState[schedulerId];
604     pthread_t readerThreadId;        /* Create thread for reader.
605                         This thread will notify the dispatcher of any incoming packets*/
606     pthread_attr_t attr;
607     int sc;
608     if (pthread_attr_init(&attr) != 0) {
609         mvLog(MVLOG_ERROR,"pthread_attr_init error");
610         return NULL;
611     }
612 #ifndef __PC__
613     if (pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED) != 0) {
614         pthread_attr_destroy(&attr);
615         mvLog(MVLOG_ERROR,"pthread_attr_setinheritsched error");
616         return NULL;
617     }
618     if (pthread_attr_setschedpolicy(&attr, SCHED_RR) != 0) {
619         pthread_attr_destroy(&attr);
620         mvLog(MVLOG_ERROR,"pthread_attr_setschedpolicy error");
621         return NULL;
622     }
623 #endif
624     sc = pthread_create(&readerThreadId, &attr, eventReader, curr);
625     if (sc) {
626         mvLog(MVLOG_ERROR, "Thread creation failed");
627         if (pthread_attr_destroy(&attr) != 0) {
628             perror("Thread attr destroy failed\n");
629         }
630         return NULL;
631     }
632 #ifndef __APPLE__
633     char eventReaderThreadName[MVLOG_MAXIMUM_THREAD_NAME_SIZE];
634     snprintf(eventReaderThreadName, sizeof(eventReaderThreadName), "EventRead%.2dThr", schedulerId);
635     sc = pthread_setname_np(readerThreadId, eventReaderThreadName);
636     if (sc != 0) {
637         perror("Setting name for event reader thread failed");
638     }
639 #endif
640     mvLog(MVLOG_INFO,"Scheduler thread started");
641
642     XLinkError_t rc = sendEvents(curr);
643     if(rc) {
644         mvLog(MVLOG_ERROR, "sendEvents method finished with an error: %s", XLinkErrorToStr(rc));
645     }
646
647     sc = pthread_join(readerThreadId, NULL);
648     if (sc) {
649         mvLog(MVLOG_ERROR, "Waiting for thread failed");
650     }
651
652     sc = pthread_attr_destroy(&attr);
653     if (sc) {
654         mvLog(MVLOG_WARN, "Thread attr destroy failed");
655     }
656
657     if (dispatcherReset(curr) != 0) {
658         mvLog(MVLOG_WARN, "Failed to reset");
659     }
660
661     if (curr->resetXLink != 1) {
662         mvLog(MVLOG_ERROR,"Scheduler thread stopped");
663     } else {
664         mvLog(MVLOG_INFO,"Scheduler thread stopped");
665     }
666
667     return NULL;
668 }
669
670 static int isEventTypeRequest(xLinkEventPriv_t* event)
671 {
672     return event->packet.header.type < XLINK_REQUEST_LAST;
673 }
674
675 static void postAndMarkEventServed(xLinkEventPriv_t *event)
676 {
677     if (event->retEv){
678         // the xLinkEventPriv_t slot pointed by "event" will be
679         // re-cycled as soon as we mark it as EVENT_SERVED,
680         // so before that, we copy the result event into XLink API layer
681         *(event->retEv) = event->packet;
682     }
683     if(event->sem){
684         if (sem_post(event->sem)) {
685             mvLog(MVLOG_ERROR,"can't post semaphore\n");
686         }
687     }
688
689     event->isServed = EVENT_SERVED;
690 }
691
/**
 * @brief Hands out consecutive event ids, starting at 0xa.
 *
 * @return The next id; each call returns the previous value plus one.
 */
static int createUniqueID()
{
    static int nextId = 0xa;

    const int issued = nextId;
    nextId++;
    return issued;
}
697
698 int findAvailableScheduler()
699 {
700     int i;
701     for (i = 0; i < MAX_SCHEDULERS; i++)
702         if (schedulerState[i].schedulerId == -1)
703             return i;
704     return -1;
705 }
706
707 static xLinkSchedulerState_t* findCorrespondingScheduler(void* xLinkFD)
708 {
709     int i;
710     if (xLinkFD == NULL) { //in case of myriad there should be one scheduler
711         if (numSchedulers == 1)
712             return &schedulerState[0];
713         else
714             NULL;
715     }
716     for (i=0; i < MAX_SCHEDULERS; i++)
717         if (schedulerState[i].schedulerId != -1 &&
718             schedulerState[i].deviceHandle.xLinkFD == xLinkFD)
719             return &schedulerState[i];
720
721     return NULL;
722 }
723
724 static int dispatcherRequestServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr){
725     XLINK_RET_IF(curr == NULL);
726     XLINK_RET_IF(!isEventTypeRequest(event));
727     xLinkEventHeader_t *header = &event->packet.header;
728     if (header->flags.bitField.block){ //block is requested
729         event->isServed = EVENT_BLOCKED;
730     } else if(header->flags.bitField.localServe == 1 ||
731               (header->flags.bitField.ack == 0
732                && header->flags.bitField.nack == 1)){ //this event is served locally, or it is failed
733         postAndMarkEventServed(event);
734     }else if (header->flags.bitField.ack == 1
735               && header->flags.bitField.nack == 0){
736         event->isServed = EVENT_PENDING;
737         mvLog(MVLOG_DEBUG,"------------------------UNserved %s\n",
738               TypeToStr(event->packet.header.type));
739     }else{
740         return 1;
741     }
742     return 0;
743 }
744
745 static int dispatcherResponseServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr)
746 {
747     XLINK_RET_ERR_IF(curr == NULL, 1);
748     XLINK_RET_ERR_IF(isEventTypeRequest(event), 1);
749     int i = 0;
750     for (i = 0; i < MAX_EVENTS; i++)
751     {
752         xLinkEventHeader_t *header = &curr->lQueue.q[i].packet.header;
753         xLinkEventHeader_t *evHeader = &event->packet.header;
754
755         if (curr->lQueue.q[i].isServed == EVENT_PENDING &&
756             header->id == evHeader->id &&
757             header->type == evHeader->type - XLINK_REQUEST_LAST -1)
758         {
759             mvLog(MVLOG_DEBUG,"----------------------ISserved %s\n",
760                   TypeToStr(header->type));
761             //propagate back flags
762             header->flags = evHeader->flags;
763             postAndMarkEventServed(&curr->lQueue.q[i]);
764             break;
765         }
766     }
767     if (i == MAX_EVENTS) {
768         mvLog(MVLOG_FATAL,"no request for this response: %s %d\n", TypeToStr(event->packet.header.type), event->origin);
769         mvLog(MVLOG_DEBUG,"#### (i == MAX_EVENTS) %s %d %d\n", TypeToStr(event->packet.header.type), event->origin, (int)event->packet.header.id);
770         for (i = 0; i < MAX_EVENTS; i++)
771         {
772             xLinkEventHeader_t *header = &curr->lQueue.q[i].packet.header;
773
774             mvLog(MVLOG_DEBUG,"%d) header->id %i, header->type %s(%i), curr->lQueue.q[i].isServed %i, EVENT_PENDING %i\n", i, (int)header->id
775             , TypeToStr(header->type), header->type, curr->lQueue.q[i].isServed, EVENT_PENDING);
776
777         }
778         return 1;
779     }
780     return 0;
781 }
782
783 static inline xLinkEventPriv_t* getNextElementWithState(xLinkEventPriv_t* base, xLinkEventPriv_t* end,
784                                                         xLinkEventPriv_t* start, xLinkEventState_t state){
785     xLinkEventPriv_t* tmp = start;
786     while (start->isServed != state){
787         CIRCULAR_INCREMENT_BASE(start, end, base);
788         if(tmp == start){
789             break;
790         }
791     }
792     if(start->isServed == state){
793         return start;
794     }else{
795         return NULL;
796     }
797 }
798
799 static xLinkEventPriv_t* searchForReadyEvent(xLinkSchedulerState_t* curr)
800 {
801     XLINK_RET_ERR_IF(curr == NULL, NULL);
802     xLinkEventPriv_t* ev = NULL;
803
804     ev = getNextElementWithState(curr->lQueue.base, curr->lQueue.end, curr->lQueue.base, EVENT_READY);
805     if(ev){
806         mvLog(MVLOG_DEBUG,"ready %s %d \n",
807               TypeToStr((int)ev->packet.header.type),
808               (int)ev->packet.header.id);
809     }
810     return ev;
811 }
812
813 static xLinkEventPriv_t* getNextQueueElemToProc(eventQueueHandler_t *q ){
814     xLinkEventPriv_t* event = NULL;
815     if (q->cur != q->curProc) {
816         event = getNextElementWithState(q->base, q->end, q->curProc, EVENT_ALLOCATED);
817         q->curProc = event;
818         CIRCULAR_INCREMENT_BASE(q->curProc, q->end, q->base);
819     }
820     return event;
821 }
822
823 /**
824  * @brief Add event to Queue
825  * @note It called from dispatcherAddEvent
826  */
827 static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
828                                             eventQueueHandler_t *q, xLinkEvent_t* event,
829                                             sem_t* sem, xLinkEventOrigin_t o){
830     xLinkEvent_t* ev;
831     xLinkEventPriv_t* eventP = getNextElementWithState(q->base, q->end, q->cur, EVENT_SERVED);
832     if (eventP == NULL) {
833         mvLog(MVLOG_ERROR, "getNextElementWithState returned NULL");
834         return NULL;
835     }
836     mvLog(MVLOG_DEBUG, "Received event %s %d", TypeToStr(event->header.type), o);
837     ev = &eventP->packet;
838
839     (void)curr;
840     eventP->sem = sem;
841     eventP->packet = *event;
842     eventP->origin = o;
843     if (o == EVENT_LOCAL) {
844         // XLink API caller provided buffer for return the final result to
845         eventP->retEv = event;
846     }else{
847         eventP->retEv = NULL;
848     }
849     q->cur = eventP;
850     eventP->isServed = EVENT_ALLOCATED;
851     CIRCULAR_INCREMENT_BASE(q->cur, q->end, q->base);
852     return ev;
853 }
854
855 static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr)
856 {
857     XLINK_RET_ERR_IF(curr == NULL, NULL);
858
859     if (sem_wait(&curr->notifyDispatcherSem)) {
860         mvLog(MVLOG_ERROR,"can't post semaphore\n");
861     }
862
863     xLinkEventPriv_t* event = NULL;
864     event = searchForReadyEvent(curr);
865     if (event) {
866         return event;
867     }
868
869     eventQueueHandler_t* hPriorityQueue = curr->queueProcPriority ? &curr->lQueue : &curr->rQueue;
870     eventQueueHandler_t* lPriorityQueue = curr->queueProcPriority ? &curr->rQueue : &curr->lQueue;
871     curr->queueProcPriority = curr->queueProcPriority ? 0 : 1;
872
873     event = getNextQueueElemToProc(hPriorityQueue);
874     if (event) {
875         return event;
876     }
877     event = getNextQueueElemToProc(lPriorityQueue);
878
879     return event;
880 }
881
882 static int dispatcherClean(xLinkSchedulerState_t* curr)
883 {
884     XLINK_RET_ERR_IF(pthread_mutex_lock(&clean_mutex), 1);
885     if (curr->schedulerId == -1) {
886         mvLog(MVLOG_WARN,"Scheduler has already been reset or cleaned");
887         if(pthread_mutex_unlock(&clean_mutex) != 0) {
888             mvLog(MVLOG_ERROR, "Failed to unlock clean_mutex");
889         }
890
891         return 1;
892     }
893
894     mvLog(MVLOG_INFO, "Start Clean Dispatcher...");
895
896     if (sem_post(&curr->notifyDispatcherSem)) {
897         mvLog(MVLOG_ERROR,"can't post semaphore\n"); //to allow us to get a NULL event
898     }
899     xLinkEventPriv_t* event = dispatcherGetNextEvent(curr);
900     while (event != NULL) {
901         mvLog(MVLOG_INFO, "dropped event is %s, status %d\n",
902               TypeToStr(event->packet.header.type), event->isServed);
903
904         postAndMarkEventServed(event);
905         event = dispatcherGetNextEvent(curr);
906     }
907
908     dispatcherFreeEvents(&curr->lQueue, EVENT_PENDING);
909     dispatcherFreeEvents(&curr->lQueue, EVENT_BLOCKED);
910
911     curr->schedulerId = -1;
912     curr->resetXLink = 1;
913     sem_destroy(&curr->addEventSem);
914     sem_destroy(&curr->notifyDispatcherSem);
915     localSem_t* temp = curr->eventSemaphores;
916     while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
917         // unblock potentially blocked event semaphores
918         sem_post(&temp->sem);
919         sem_destroy(&temp->sem);
920         temp->refs = -1;
921         temp++;
922     }
923     numSchedulers--;
924
925     mvLog(MVLOG_INFO, "Clean Dispatcher Successfully...");
926     if(pthread_mutex_unlock(&clean_mutex) != 0) {
927         mvLog(MVLOG_ERROR, "Failed to unlock clean_mutex after clearing dispatcher");
928     }
929     return 0;
930 }
931
932 static int dispatcherReset(xLinkSchedulerState_t* curr)
933 {
934     ASSERT_XLINK(curr != NULL);
935
936     glControlFunc->closeDeviceFd(&curr->deviceHandle);
937     if(dispatcherClean(curr)) {
938         mvLog(MVLOG_INFO, "Failed to clean dispatcher");
939     }
940
941     xLinkDesc_t* link = getLink(curr->deviceHandle.xLinkFD);
942     if(link == NULL || sem_post(&link->dispatcherClosedSem)) {
943         mvLog(MVLOG_DEBUG,"can't post dispatcherClosedSem\n");
944     }
945
946     glControlFunc->closeLink(curr->deviceHandle.xLinkFD, 1);
947     mvLog(MVLOG_DEBUG,"Reset Successfully\n");
948     return 0;
949 }
950
951 static XLinkError_t sendEvents(xLinkSchedulerState_t* curr) {
952     int res;
953     xLinkEventPriv_t* event;
954     xLinkEventPriv_t response;
955
956     while (!curr->resetXLink) {
957         event = dispatcherGetNextEvent(curr);
958         if(event == NULL) {
959             mvLog(MVLOG_ERROR,"Dispatcher received NULL event!");
960 #ifdef __PC__
961             break; //Mean that user reset XLink.
962 #else
963             continue;
964 #endif
965         }
966
967         if(event->packet.deviceHandle.xLinkFD
968            != curr->deviceHandle.xLinkFD) {
969             mvLog(MVLOG_FATAL,"The file descriptor mismatch between the event and the scheduler.\n"
970                               "    Event: id=%d, fd=%p"
971                               "    Scheduler fd=%p",
972                               event->packet.header.id, event->packet.deviceHandle.xLinkFD,
973                               curr->deviceHandle.xLinkFD);
974             event->packet.header.flags.bitField.nack = 1;
975             event->packet.header.flags.bitField.ack = 0;
976
977             if (event->origin == EVENT_LOCAL){
978                 dispatcherRequestServe(event, curr);
979             } else {
980                 dispatcherResponseServe(event, curr);
981             }
982
983             continue;
984         }
985
986         getRespFunction getResp;
987         xLinkEvent_t* toSend;
988         if (event->origin == EVENT_LOCAL){
989             getResp = glControlFunc->localGetResponse;
990             toSend = &event->packet;
991         }else{
992             getResp = glControlFunc->remoteGetResponse;
993             toSend = &response.packet;
994         }
995
996         res = getResp(&event->packet, &response.packet);
997         if (isEventTypeRequest(event)){
998             if (event->origin == EVENT_LOCAL){ //we need to do this for locals only
999                 if(dispatcherRequestServe(event, curr)) {
1000                     mvLog(MVLOG_ERROR, "Failed to serve local event. "
1001                                        "Event: id=%d, type=%s, streamId=%u, streamName=%s",
1002                                        event->packet.header.id,  TypeToStr(event->packet.header.type),
1003                                        event->packet.header.streamId, event->packet.header.streamName);
1004                 }
1005             }
1006
1007             if (res == 0 && event->packet.header.flags.bitField.localServe == 0) {
1008 #ifdef __PC__
1009                 if (toSend->header.type == XLINK_RESET_REQ) {
1010                     curr->resetXLink = 1;
1011                     mvLog(MVLOG_DEBUG,"Send XLINK_RESET_REQ, stopping sendEvents thread.");
1012                     if(toSend->deviceHandle.protocol == X_LINK_PCIE) {
1013                         toSend->header.type = XLINK_PING_REQ;
1014                         mvLog(MVLOG_DEBUG, "Request for reboot not sent, only ping event");
1015                     } else {
1016 #if defined(NO_BOOT)
1017                         toSend->header.type = XLINK_PING_REQ;
1018                         mvLog(MVLOG_INFO, "Request for reboot not sent, only ping event");
1019 #endif // defined(NO_BOOT)
1020
1021                     }
1022                 }
1023 #endif // __PC__
1024                 if (glControlFunc->eventSend(toSend) != 0) {
1025                     dispatcherFreeEvents(&curr->lQueue, EVENT_PENDING);
1026                     dispatcherFreeEvents(&curr->lQueue, EVENT_BLOCKED);
1027                     mvLog(MVLOG_ERROR, "Event sending failed");
1028                 }
1029             }
1030         } else {
1031             if (event->origin == EVENT_REMOTE){ // match remote response with the local request
1032                 dispatcherResponseServe(event, curr);
1033             }
1034         }
1035
1036         if (event->origin == EVENT_REMOTE){
1037             event->isServed = EVENT_SERVED;
1038         }
1039     }
1040
1041     return X_LINK_SUCCESS;
1042 }
1043
1044 static void dispatcherFreeEvents(eventQueueHandler_t *queue, xLinkEventState_t state) {
1045     if(queue == NULL) {
1046         return;
1047     }
1048
1049     xLinkEventPriv_t* event = getNextElementWithState(queue->base, queue->end, queue->base, state);
1050     while (event != NULL) {
1051         mvLog(MVLOG_DEBUG, "Event is %s, size is %d, Mark it served\n", TypeToStr(event->packet.header.type), event->packet.header.size);
1052         postAndMarkEventServed(event);
1053         event = getNextElementWithState(queue->base, queue->end, queue->base, state);
1054     }
1055 }
1056
1057
1058 // ------------------------------------
1059 // Helpers implementation. End.
1060 // ------------------------------------