252755e9c7f4756967f74f4668fc3233f395ca1c
[platform/upstream/dldt.git] / inference-engine / thirdparty / movidius / XLink / shared / XLinkDispatcher.c
1 // Copyright (C) 2018-2019 Intel Corporation
2 // SPDX-License-Identifier: Apache-2.0
3 //
4
5 ///
6 /// @file
7 ///
8 /// @brief     Application configuration Leon header
9 ///
10
11 #ifndef _GNU_SOURCE
12 #define _GNU_SOURCE // fix for warning: implicit declaration of function ‘pthread_setname_np’
13 #endif
14 #include "stdio.h"
15 #include "stdint.h"
16 #include "stdlib.h"
17 #include "string.h"
18
19 #include <assert.h>
20 #include <stdlib.h>
21 #if (defined(_WIN32) || defined(_WIN64))
22 #include "win_pthread.h"
23 #include "win_semaphore.h"
24 #else
25 #include <pthread.h>
26 #include <semaphore.h>
27 #endif
28 #include "XLinkDispatcher.h"
29 #include "XLinkPrivateDefines.h"
30 #include "XLink.h"
31
32 #define MVLOG_UNIT_NAME xLink
33 #include "mvLog.h"
34
35 typedef enum {
36     EVENT_ALLOCATED,
37     EVENT_PENDING,
38     EVENT_BLOCKED,
39     EVENT_READY,
40     EVENT_SERVED,
41 } xLinkEventState_t;
42
43 typedef struct xLinkEventPriv_t {
44     xLinkEvent_t packet;
45     xLinkEventState_t isServed;
46     xLinkEventOrigin_t origin;
47     sem_t* sem;
48     void* data;
49     xLinkEvent_t * retEv;
50     uint32_t pad;
51 } xLinkEventPriv_t;
52
53 typedef struct {
54     sem_t sem;
55     pthread_t threadId;
56     int refs;
57 } localSem_t;
58
59 typedef struct{
60     xLinkEventPriv_t* end;
61     xLinkEventPriv_t* base;
62
63     xLinkEventPriv_t* curProc;
64     xLinkEventPriv_t* cur;
65     __attribute__((aligned(64))) xLinkEventPriv_t q[MAX_EVENTS];
66
67 }eventQueueHandler_t;
68 /**
69  * @brief Scheduler for each device
70  */
71 typedef struct {
72     xLinkDeviceHandle_t deviceHandle; //will be device handler
73     int schedulerId;
74
75     sem_t addEventSem;
76     sem_t notifyDispatcherSem;
77     volatile uint32_t resetXLink;
78     uint32_t semaphores;
79     pthread_t xLinkThreadId;
80
81     eventQueueHandler_t lQueue; //local queue
82     eventQueueHandler_t rQueue; //remote queue
83     localSem_t eventSemaphores[MAXIMUM_SEMAPHORES];
84 } xLinkSchedulerState_t;
85
86
87 #define CIRCULAR_INCREMENT(x, maxVal, base) \
88     { \
89         x++; \
90         if (x == maxVal) \
91             x = base; \
92     }
93 //avoid problems with unsigned. first compare and then give the nuw value
94 #define CIRCULAR_DECREMENT(x, maxVal, base) \
95 { \
96     if (x == base) \
97         x = maxVal - 1; \
98     else \
99         x--; \
100 }
101
102 extern char* TypeToStr(int type);
103
104 #if (defined(_WIN32) || defined(_WIN64))
105 static void* __cdecl eventSchedulerRun(void* ctx);
106 #else
107 static void* eventSchedulerRun(void*);
108 #endif
109 //These will be common for all, Initialized only once
110 struct dispatcherControlFunctions* glControlFunc;
111 int numSchedulers;
112 xLinkSchedulerState_t schedulerState[MAX_SCHEDULERS];
113 sem_t addSchedulerSem;
114
115 int pthread_t_compare(pthread_t a, pthread_t b)
116 {
117 #if (defined(_WIN32) || defined(_WIN64) )
118         return ((a.tid == b.tid));
119 #else
120     return  (a == b);
121 #endif
122 }
123
124 static int unrefSem(sem_t* sem,  xLinkSchedulerState_t* curr) {
125     ASSERT_X_LINK(curr != NULL);
126     localSem_t* temp = curr->eventSemaphores;
127     while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
128         if (&temp->sem == sem) {
129             temp->refs--;
130             if (temp->refs == 0) {
131                 curr->semaphores--;
132                 ASSERT_X_LINK(sem_destroy(&temp->sem) != -1);
133                 temp->refs = -1;
134             }
135             return 1;
136         }
137         temp++;
138     }
139     mvLog(MVLOG_WARN,"unrefSem : sem wasn't found\n");
140     return 0;
141 }
142 static sem_t* getCurrentSem(pthread_t threadId, xLinkSchedulerState_t* curr, int inc_ref)
143 {
144     ASSERT_X_LINK_R(curr != NULL, NULL);
145
146     localSem_t* sem = curr->eventSemaphores;
147     while (sem < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
148         if (pthread_t_compare(sem->threadId, threadId) && sem->refs > 0) {
149             sem->refs += inc_ref;
150             return &sem->sem;
151         }
152         sem++;
153     }
154     return NULL;
155 }
156
157 static sem_t* createSem(xLinkSchedulerState_t* curr)
158 {
159     ASSERT_X_LINK_R(curr != NULL, NULL);
160
161
162     sem_t* sem = getCurrentSem(pthread_self(), curr, 0);
163     if (sem) // it already exists, error
164         return NULL;
165     else
166     {
167         if (curr->semaphores < MAXIMUM_SEMAPHORES) {
168             localSem_t* temp = curr->eventSemaphores;
169             while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
170                 if (temp->refs < 0) {
171                     sem = &temp->sem;
172                     if (temp->refs == -1) {
173                         if (sem_init(sem, 0, 0))
174                             perror("Can't create semaphore\n");
175                     }
176                     curr->semaphores++;
177                     temp->refs = 1;
178                     temp->threadId = pthread_self();
179
180                     break;
181                 }
182                 temp++;
183             }
184             if (!sem)
185                 return NULL;
186         }
187         else
188             return NULL;
189        return sem;
190     }
191 }
192
193 #if (defined(_WIN32) || defined(_WIN64))
194 static void* __cdecl eventReader(void* ctx)
195 #else
196 static void* eventReader(void* ctx)
197 #endif
198 {
199     xLinkSchedulerState_t *curr = (xLinkSchedulerState_t*)ctx;
200     ASSERT_X_LINK_R(curr, NULL);
201
202     xLinkEvent_t event = { 0 };
203     event.header.id = -1;
204     event.deviceHandle = curr->deviceHandle;
205
206     mvLog(MVLOG_INFO,"eventReader thread started");
207
208     while (!curr->resetXLink) {
209         int sc = glControlFunc->eventReceive(&event);
210         mvLog(MVLOG_DEBUG,"Reading %s (scheduler %d, fd %p, event id %d, event stream_id %u, event size %u)\n",
211             TypeToStr(event.header.type), curr->schedulerId, event.deviceHandle.xLinkFD, event.header.id, event.header.streamId, event.header.size);
212
213         if (event.header.type == XLINK_RESET_RESP) {
214             curr->resetXLink = 1;
215             mvLog(MVLOG_INFO,"eventReader thread stopped: reset");
216             break;
217         }
218
219         if (sc) {
220             if (sem_post(&curr->notifyDispatcherSem)) {
221                 mvLog(MVLOG_ERROR,"can't post semaphore\n"); // stop eventSchedulerRun thread
222             }
223             mvLog(MVLOG_ERROR,"eventReader thread stopped (err %d)", sc);
224             break;
225         }
226     }
227
228     return 0;
229 }
230
231
232
233 static int isEventTypeRequest(xLinkEventPriv_t* event)
234 {
235     if (event->packet.header.type < XLINK_REQUEST_LAST)
236         return 1;
237     else
238         return 0;
239 }
240
241 static void markEventBlocked(xLinkEventPriv_t* event)
242 {
243     event->isServed = EVENT_BLOCKED;
244 }
245
246 static void markEventReady(xLinkEventPriv_t* event)
247 {
248     event->isServed = EVENT_READY;
249 }
250
251 static void markEventServed(xLinkEventPriv_t* event)
252 {
253     if(event->retEv){
254         // the xLinkEventPriv_t slot pointed by "event" will be
255         // re-cycled as soon as we mark it as EVENT_SERVED,
256         // so before that, we copy the result event into XLink API layer
257         *(event->retEv) = event->packet;
258     }
259     if(event->sem){
260         if (sem_post(event->sem)) {
261             mvLog(MVLOG_ERROR,"can't post semaphore\n");
262         }
263     }
264     event->isServed = EVENT_SERVED;
265 }
266
267
268 static int dispatcherRequestServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr){
269     ASSERT_X_LINK(curr != NULL);
270     ASSERT_X_LINK(isEventTypeRequest(event));
271     xLinkEventHeader_t *header = &event->packet.header;
272     if (header->flags.bitField.block){ //block is requested
273         markEventBlocked(event);
274     }else if(header->flags.bitField.localServe == 1 ||
275              (header->flags.bitField.ack == 0
276              && header->flags.bitField.nack == 1)){ //this event is served locally, or it is failed
277         markEventServed(event);
278     }else if (header->flags.bitField.ack == 1
279               && header->flags.bitField.nack == 0){
280         event->isServed = EVENT_PENDING;
281         mvLog(MVLOG_DEBUG,"------------------------UNserved %s\n",
282               TypeToStr(event->packet.header.type));
283     }else{
284         ASSERT_X_LINK(0);
285     }
286     return 0;
287 }
288
289
290 static int dispatcherResponseServe(xLinkEventPriv_t * event, xLinkSchedulerState_t* curr)
291 {
292     int i = 0;
293     ASSERT_X_LINK(curr != NULL);
294     ASSERT_X_LINK(!isEventTypeRequest(event));
295     for (i = 0; i < MAX_EVENTS; i++)
296     {
297         xLinkEventHeader_t *header = &curr->lQueue.q[i].packet.header;
298         xLinkEventHeader_t *evHeader = &event->packet.header;
299
300         if (curr->lQueue.q[i].isServed == EVENT_PENDING &&
301                         header->id == evHeader->id &&
302                         header->type == evHeader->type - XLINK_REQUEST_LAST -1)
303         {
304             mvLog(MVLOG_DEBUG,"----------------------ISserved %s\n",
305                     TypeToStr(header->type));
306             //propagate back flags
307             header->flags = evHeader->flags;
308             markEventServed(&curr->lQueue.q[i]);
309             break;
310         }
311     }
312     if (i == MAX_EVENTS) {
313         mvLog(MVLOG_FATAL,"no request for this response: %s %d %d\n", TypeToStr(event->packet.header.type), event->origin, event->packet.header.id);
314         printf("#### (i == MAX_EVENTS) %s %d %d\n", TypeToStr(event->packet.header.type), event->origin, (int)event->packet.header.id);
315         for (i = 0; i < MAX_EVENTS; i++)
316         {
317             xLinkEventHeader_t *header = &curr->lQueue.q[i].packet.header;
318
319             printf("%d) header->id %i, header->type %s(%i), curr->lQueue.q[i].isServed %i, EVENT_PENDING %i\n", i, (int)header->id
320                      , TypeToStr(header->type), header->type, curr->lQueue.q[i].isServed, EVENT_PENDING);
321
322         }
323         ASSERT_X_LINK(0);
324     }
325     return 0;
326 }
327
328 static inline xLinkEventPriv_t* getNextElementWithState(xLinkEventPriv_t* base, xLinkEventPriv_t* end,
329                                                         xLinkEventPriv_t* start, xLinkEventState_t state){
330     xLinkEventPriv_t* tmp = start;
331     while (start->isServed != state){
332         CIRCULAR_INCREMENT(start, end, base);
333         if(tmp == start){
334             break;
335         }
336     }
337     if(start->isServed == state){
338         return start;
339     }else{
340         return NULL;
341     }
342 }
343
344 static xLinkEventPriv_t* searchForReadyEvent(xLinkSchedulerState_t* curr)
345 {
346     ASSERT_X_LINK_R(curr != NULL, NULL);
347     xLinkEventPriv_t* ev = NULL;
348
349     ev = getNextElementWithState(curr->lQueue.base, curr->lQueue.end, curr->lQueue.base, EVENT_READY);
350     if(ev){
351         mvLog(MVLOG_DEBUG,"ready %s %d \n",
352               TypeToStr((int)ev->packet.header.type),
353               (int)ev->packet.header.id);
354     }
355     return ev;
356 }
357
358 static xLinkEventPriv_t* getNextQueueElemToProc(eventQueueHandler_t *q ){
359     xLinkEventPriv_t* event = NULL;
360     event = getNextElementWithState(q->base, q->end, q->curProc, EVENT_ALLOCATED);
361     if(event != NULL) {
362         q->curProc = event;
363         CIRCULAR_INCREMENT(q->curProc, q->end, q->base);
364     }
365     return event;
366 }
367
368 /**
369  * @brief Add event to Queue
370  * @note It called from dispatcherAddEvent
371  */
372 static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
373                                             eventQueueHandler_t *q, xLinkEvent_t* event,
374                                             sem_t* sem, xLinkEventOrigin_t o){
375     xLinkEvent_t* ev;
376     xLinkEventPriv_t* eventP = getNextElementWithState(q->base, q->end, q->cur, EVENT_SERVED);
377     if (eventP == NULL) {
378         mvLog(MVLOG_ERROR, "Can not get next element");
379         return NULL;
380     }
381     mvLog(MVLOG_DEBUG, "Received event %s %d", TypeToStr(event->header.type), o);
382     ev = &eventP->packet;
383     if (eventP->sem) {
384         if ((XLinkError_t)unrefSem(eventP->sem,  curr) == X_LINK_ERROR) {
385             mvLog(MVLOG_WARN, "Failed to unref sem");
386         }
387     }
388
389     eventP->sem = sem;
390     eventP->packet = *event;
391     eventP->origin = o;
392     if (o == EVENT_LOCAL) {
393         // XLink API caller provided buffer for return the final result to
394         eventP->retEv = event;
395     }else{
396         eventP->retEv = NULL;
397     }
398     // Mark eventP as ALLOCATED to prevent it from being allocated again
399     eventP->isServed = EVENT_ALLOCATED;
400     q->cur = eventP;
401     CIRCULAR_INCREMENT(q->cur, q->end, q->base);
402     return ev;
403 }
404
405 static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr)
406 {
407     ASSERT_X_LINK_R(curr != NULL, NULL);
408
409     xLinkEventPriv_t* event = NULL;
410     event = searchForReadyEvent(curr);
411     if (event) {
412         return event;
413     }
414     if (XLinkWaitSem(&curr->notifyDispatcherSem)) {
415         mvLog(MVLOG_ERROR,"can't post semaphore\n");
416         return NULL;
417     }
418     event = getNextQueueElemToProc(&curr->lQueue);
419     if (event) {
420         return event;
421     }
422     event = getNextQueueElemToProc(&curr->rQueue);
423     return event;
424 }
425
426 static pthread_mutex_t reset_mutex = PTHREAD_MUTEX_INITIALIZER;
427
428 static int isAvailableScheduler(xLinkSchedulerState_t* curr)
429 {
430     if (curr->schedulerId == -1) {
431         mvLog(MVLOG_WARN,"Scheduler has already been reset or cleaned");
432         return 0; // resetted already
433     }
434     return 1;
435 }
436
437 static void closeDeviceFdAndResetScheduler(xLinkSchedulerState_t* curr)
438 {
439
440     mvLog(MVLOG_INFO, "Dispatcher Cleaning...");
441     glControlFunc->closeDeviceFd(&curr->deviceHandle);
442     curr->schedulerId = -1;
443     curr->resetXLink = 1;
444     sem_destroy(&curr->addEventSem);
445     sem_destroy(&curr->notifyDispatcherSem);
446     localSem_t* temp = curr->eventSemaphores;
447     while (temp < curr->eventSemaphores + MAXIMUM_SEMAPHORES) {
448         // unblock potentially blocked event semaphores
449         sem_post(&temp->sem);
450         sem_destroy(&temp->sem);
451         temp->refs = -1;
452         temp++;
453     }
454     numSchedulers--;
455     mvLog(MVLOG_INFO,"Cleaning Successfully\n");
456
457 }
458
459
460 static int dispatcherReset(xLinkSchedulerState_t* curr)
461 {
462     ASSERT_X_LINK(curr != NULL);
463     CHECK_MUTEX_SUCCESS_RC(pthread_mutex_lock(&reset_mutex), 1);
464
465     if(!isAvailableScheduler(curr)) {
466         CHECK_MUTEX_SUCCESS(pthread_mutex_unlock(&reset_mutex));
467         return 1;
468     }
469
470     mvLog(MVLOG_INFO, "Resetting...");
471
472     glControlFunc->closeLink(curr->deviceHandle.xLinkFD);
473
474     //notifyDispatcherSem +1 for NULL event, avoid dispatcher blocking.
475     if (sem_post(&curr->notifyDispatcherSem)) {
476         mvLog(MVLOG_ERROR,"can't post semaphore\n"); //to allow us to get a NULL event
477     }
478
479     xLinkEventPriv_t* event = dispatcherGetNextEvent(curr);
480     while (event != NULL) {
481         mvLog(MVLOG_INFO, "dropped event is %s, status %d\n",
482               TypeToStr(event->packet.header.type), event->isServed);
483         // although there is no no execution for this event, also mark it as being served without success
484         // caller will be informed and internal event memory slot will be de-allocated
485         markEventServed(event);
486         event = dispatcherGetNextEvent(curr);
487     }
488
489     event = getNextElementWithState(curr->lQueue.base, curr->lQueue.end, curr->lQueue.base, EVENT_PENDING);
490     while (event != NULL) {
491         mvLog(MVLOG_INFO,"Pending event is %s, size is %d, Mark it served\n", TypeToStr(event->packet.header.type), event->packet.header.size);
492         markEventServed(event);
493         event = getNextElementWithState(curr->lQueue.base, curr->lQueue.end, curr->lQueue.base, EVENT_PENDING);
494     }
495     closeDeviceFdAndResetScheduler(curr);
496     CHECK_MUTEX_SUCCESS(pthread_mutex_unlock(&reset_mutex));
497     return 0;
498 }
499 #if (defined(_WIN32) || defined(_WIN64))
500 static void* __cdecl eventSchedulerRun(void* ctx)
501 #else
502 static void* eventSchedulerRun(void* ctx)
503 #endif
504 {
505     int schedulerId = *((int*) ctx);
506     mvLog(MVLOG_DEBUG,"%s() schedulerId %d\n", __func__, schedulerId);
507     ASSERT_X_LINK_R(schedulerId < MAX_SCHEDULERS, NULL);
508
509     xLinkSchedulerState_t* curr = &schedulerState[schedulerId];
510     pthread_t readerThreadId;        /* Create thread for reader.
511                         This thread will notify the dispatcher of any incoming packets*/
512     pthread_attr_t attr;
513     int sc;
514     int res;
515     if (pthread_attr_init(&attr) !=0) {
516         mvLog(MVLOG_ERROR,"pthread_attr_init error");
517         return NULL;
518     }
519
520     sc = pthread_create(&readerThreadId, &attr, eventReader, curr);
521     if (sc) {
522         mvLog(MVLOG_ERROR, "Thread creation failed");
523         if (pthread_attr_destroy(&attr) != 0) {
524             perror("Thread attr destroy failed\n");
525         }
526         return NULL;
527     }
528     char eventReaderThreadName[20];
529     snprintf(eventReaderThreadName, sizeof(eventReaderThreadName), "EventRead%.2dThr", schedulerId);
530     sc = pthread_setname_np(readerThreadId, eventReaderThreadName);
531     if (sc != 0) {
532         perror("Setting name for event reader thread failed");
533     }
534     sc = pthread_attr_destroy(&attr);
535     if (sc) {
536         mvLog(MVLOG_WARN, "Thread attr destroy failed");
537     }
538     xLinkEventPriv_t* event;
539     xLinkEventPriv_t response;
540
541     mvLog(MVLOG_INFO,"Scheduler thread started");
542
543     while (!curr->resetXLink) {
544         event = dispatcherGetNextEvent(curr);
545         if (event == NULL) {
546             break;
547         }
548
549         ASSERT_X_LINK_R(event->packet.deviceHandle.xLinkFD == curr->deviceHandle.xLinkFD, NULL);
550         getRespFunction getResp;
551         xLinkEvent_t* toSend;
552
553         if (event->origin == EVENT_LOCAL){
554             getResp = glControlFunc->localGetResponse;
555             toSend = &event->packet;
556         }else{
557             getResp = glControlFunc->remoteGetResponse;
558             toSend = &response.packet;
559         }
560
561         res = getResp(&event->packet, &response.packet);
562         if (isEventTypeRequest(event)){
563             if (event->origin == EVENT_LOCAL){ //we need to do this for locals only
564                 dispatcherRequestServe(event, curr);
565             }
566             // For PCIE and in with Connect to booted option don't send reset request
567
568             if (res == 0 && event->packet.header.flags.bitField.localServe == 0){
569                 // FIXME We shouldn't send reset request for PCIE and with turned on "NO_BOOT" cmake option
570                 //  Also, we can't just close evenReader thread, as WinPthread don't have suitable function for this emergency exit,
571                 //  so, let's pretend that would be ping request, and then we can correctly close eventReader thread
572
573                 if (toSend->header.type == XLINK_RESET_REQ) {
574                     if(toSend->deviceHandle.protocol == X_LINK_PCIE) {
575                         toSend->header.type = XLINK_PING_REQ;
576                         curr->resetXLink = 1;
577                         mvLog(MVLOG_INFO, "Request for reboot not sent");
578                     } else {
579 #if defined(NO_BOOT)
580                         toSend->header.type = XLINK_PING_REQ;
581                         curr->resetXLink = 1;
582                         mvLog(MVLOG_INFO, "Request for reboot not sent");
583 #endif
584                     }
585                 }
586
587                 if (glControlFunc->eventSend(toSend) != 0) {
588                     mvLog(MVLOG_ERROR, "Event sending failed");
589                 }
590             }
591         } else {
592             if (event->origin == EVENT_REMOTE){ // match remote response with the local request
593                 dispatcherResponseServe(event, curr);
594             }
595         }
596
597         //TODO: dispatcher shouldn't know about this packet. Seems to be easily move-able to protocol
598         if (event->packet.header.type == XLINK_RESET_REQ) {
599             curr->resetXLink = 1;
600         }
601
602         // remote event is served in one round
603         if (event->origin == EVENT_REMOTE){
604             event->isServed = EVENT_SERVED;
605         }
606     }
607
608     sc = pthread_join(readerThreadId, NULL);
609     if (sc) {
610         mvLog(MVLOG_ERROR, "Waiting for thread failed");
611     }
612
613     if (dispatcherReset(curr) != 0) {
614         mvLog(MVLOG_WARN, "Failed to reset");
615     }
616
617     if (curr->resetXLink != 1) {
618         mvLog(MVLOG_ERROR,"Scheduler thread stopped");
619     } else {
620         mvLog(MVLOG_INFO,"Scheduler thread stopped");
621     }
622
623     return NULL;
624 }
625
626 static int createUniqueID()
627 {
628     static int id = 0xa;
629     return id++;
630 }
631
632 static xLinkSchedulerState_t* findCorrespondingScheduler(void* xLinkFD)
633 {
634     int i;
635     if (xLinkFD == NULL) { //in case of myriad there should be one scheduler
636         if (numSchedulers == 1)
637             return &schedulerState[0];
638         else
639             NULL;
640     }
641     for (i=0; i < MAX_SCHEDULERS; i++)
642         if (schedulerState[i].schedulerId != -1 &&
643             schedulerState[i].deviceHandle.xLinkFD == xLinkFD)
644             return &schedulerState[i];
645
646     return NULL;
647 }
648 ///////////////// External Interface //////////////////////////
649 /*Adds a new event with parameters and returns event id*/
650 xLinkEvent_t* dispatcherAddEvent(xLinkEventOrigin_t origin, xLinkEvent_t *event)
651 {
652     xLinkSchedulerState_t* curr = findCorrespondingScheduler(event->deviceHandle.xLinkFD);
653     ASSERT_X_LINK_R(curr != NULL, NULL);
654
655     if(curr->resetXLink) {
656         return NULL;
657     }
658
659     mvLog(MVLOG_DEBUG, "Receiving event %s %d\n", TypeToStr(event->header.type), origin);
660     if (XLinkWaitSem(&curr->addEventSem)) {
661         mvLog(MVLOG_ERROR,"can't wait semaphore\n");
662         return NULL;
663     }
664
665     sem_t *sem = NULL;
666     xLinkEvent_t* ev;
667     if (origin == EVENT_LOCAL) {
668         event->header.id = createUniqueID();
669         sem = getCurrentSem(pthread_self(), curr, 1);
670         if (!sem) {
671             sem = createSem(curr);
672         }
673         if (!sem) {
674             mvLog(MVLOG_WARN,"No more semaphores. Increase XLink or OS resources\n");
675             if (sem_post(&curr->addEventSem)) {
676                 mvLog(MVLOG_ERROR,"can't post semaphore\n");
677             }
678             return NULL;
679         }
680         event->header.flags.raw = 0;
681         event->header.flags.bitField.ack = 1;
682         ev = addNextQueueElemToProc(curr, &curr->lQueue, event, sem, origin);
683     } else {
684         ev = addNextQueueElemToProc(curr, &curr->rQueue, event, NULL, origin);
685     }
686     if (sem_post(&curr->addEventSem)) {
687         mvLog(MVLOG_ERROR,"can't post semaphore\n");
688     }
689     if (sem_post(&curr->notifyDispatcherSem)) {
690         mvLog(MVLOG_ERROR, "can't post semaphore\n");
691     }
692     return ev;
693 }
694
695 int dispatcherWaitEventComplete(xLinkDeviceHandle_t* deviceHandle, unsigned int timeout)
696 {
697     xLinkSchedulerState_t* curr = findCorrespondingScheduler(deviceHandle->xLinkFD);
698     ASSERT_X_LINK(curr != NULL);
699
700     sem_t* id = getCurrentSem(pthread_self(), curr, 0);
701     if (id == NULL) {
702         return -1;
703     }
704
705     int rc = XLinkWaitSemUserMode(id, timeout);
706     if (rc && deviceHandle->protocol != X_LINK_PCIE) {
707         xLinkEvent_t event = {0};
708         event.header.type = XLINK_RESET_REQ;
709         event.deviceHandle = *deviceHandle;
710         mvLog(MVLOG_ERROR,"waiting is timeout, sending reset remote event");
711         dispatcherAddEvent(EVENT_LOCAL, &event);
712         id = getCurrentSem(pthread_self(), curr, 0);
713         if (id == NULL || XLinkWaitSemUserMode(id, timeout)) {
714             dispatcherReset(curr);
715         }
716     }
717
718     return rc;
719 }
720
721 int dispatcherUnblockEvent(eventId_t id, xLinkEventType_t type, streamId_t stream, void* xLinkFD)
722 {
723     xLinkSchedulerState_t* curr = findCorrespondingScheduler(xLinkFD);
724     ASSERT_X_LINK(curr != NULL);
725
726     mvLog(MVLOG_DEBUG,"unblock\n");
727     xLinkEventPriv_t* blockedEvent;
728     for (blockedEvent = curr->lQueue.q;
729          blockedEvent < curr->lQueue.q + MAX_EVENTS;
730          blockedEvent++)
731     {
732         if (blockedEvent->isServed == EVENT_BLOCKED &&
733             ((blockedEvent->packet.header.id == id || id == -1)
734             && blockedEvent->packet.header.type == type
735             && blockedEvent->packet.header.streamId == stream))
736         {
737             mvLog(MVLOG_DEBUG,"unblocked**************** %d %s\n",
738                   (int)blockedEvent->packet.header.id,
739                   TypeToStr((int)blockedEvent->packet.header.type));
740             markEventReady(blockedEvent);
741             return 1;
742         } else {
743             mvLog(MVLOG_DEBUG,"%d %s\n",
744                   (int)blockedEvent->packet.header.id,
745                   TypeToStr((int)blockedEvent->packet.header.type));
746         }
747     }
748     return 0;
749 }
750
751 int findAvailableScheduler()
752 {
753     int i;
754     for (i = 0; i < MAX_SCHEDULERS; i++)
755         if (schedulerState[i].schedulerId == -1)
756             return i;
757     return -1;
758 }
759
760 /**
761  * Initialize scheduler for device
762  */
763 int dispatcherStart(xLinkDeviceHandle_t* deviceHandle)
764 {
765     if (deviceHandle->xLinkFD == NULL) {
766         mvLog(MVLOG_ERROR, "Invalid device filedescriptor");
767         return -1;
768     }
769
770     pthread_attr_t attr;
771     int eventIdx;
772     if (numSchedulers >= MAX_SCHEDULERS)
773     {
774         mvLog(MVLOG_ERROR,"Max number Schedulers reached!\n");
775         return -1;
776     }
777
778     int idx = findAvailableScheduler();
779     if (idx < 0) {
780         mvLog(MVLOG_ERROR,"Available sheduler not found");
781         return -1;
782     }
783
784     memset(&schedulerState[idx], 0, sizeof(xLinkSchedulerState_t));
785
786     schedulerState[idx].semaphores = 0;
787
788     schedulerState[idx].resetXLink = 0;
789     schedulerState[idx].deviceHandle = *deviceHandle;
790     schedulerState[idx].schedulerId = idx;
791
792     schedulerState[idx].lQueue.cur = schedulerState[idx].lQueue.q;
793     schedulerState[idx].lQueue.curProc = schedulerState[idx].lQueue.q;
794     schedulerState[idx].lQueue.base = schedulerState[idx].lQueue.q;
795     schedulerState[idx].lQueue.end = &schedulerState[idx].lQueue.q[MAX_EVENTS];
796
797     schedulerState[idx].rQueue.cur = schedulerState[idx].rQueue.q;
798     schedulerState[idx].rQueue.curProc = schedulerState[idx].rQueue.q;
799     schedulerState[idx].rQueue.base = schedulerState[idx].rQueue.q;
800     schedulerState[idx].rQueue.end = &schedulerState[idx].rQueue.q[MAX_EVENTS];
801
802     for (eventIdx = 0 ; eventIdx < MAX_EVENTS; eventIdx++)
803     {
804         schedulerState[idx].rQueue.q[eventIdx].isServed = EVENT_SERVED;
805         schedulerState[idx].lQueue.q[eventIdx].isServed = EVENT_SERVED;
806     }
807
808     if (sem_init(&schedulerState[idx].addEventSem, 0, 1)) {
809         perror("Can't create semaphore\n");
810         return -1;
811     }
812     if (sem_init(&schedulerState[idx].notifyDispatcherSem, 0, 0)) {
813         perror("Can't create semaphore\n");
814         return -1;
815     }
816     localSem_t* temp = schedulerState[idx].eventSemaphores;
817     while (temp < schedulerState[idx].eventSemaphores + MAXIMUM_SEMAPHORES) {
818         temp->refs = -1;
819         temp++;
820     }
821     if (pthread_attr_init(&attr) != 0) {
822         mvLog(MVLOG_ERROR,"pthread_attr_init error");
823         return -1;
824     }
825
826     XLinkWaitSem(&addSchedulerSem);
827     mvLog(MVLOG_DEBUG,"%s() starting a new thread - schedulerId %d \n", __func__, idx);
828     int sc = pthread_create(&schedulerState[idx].xLinkThreadId,
829                             &attr,
830                             eventSchedulerRun,
831                             (void*)&schedulerState[idx].schedulerId);
832     if (sc) {
833         mvLog(MVLOG_ERROR,"Thread creation failed with error: %d", sc);
834         if (pthread_attr_destroy(&attr) != 0) {
835             perror("Thread attr destroy failed\n");
836         }
837         return -1;
838     }
839
840     char schedulerThreadName[20];
841     snprintf(schedulerThreadName, sizeof(schedulerThreadName), "Scheduler%.2dThr", schedulerState[idx].schedulerId);
842     sc = pthread_setname_np(schedulerState[idx].xLinkThreadId, schedulerThreadName);
843     if (sc != 0) {
844         perror("Setting name for indexed scheduler thread failed");
845     }
846
847     pthread_detach(schedulerState[idx].xLinkThreadId);
848     numSchedulers++;
849
850     sc = pthread_attr_destroy(&attr);
851     if (sc) {
852         perror("Thread attr destroy failed");
853     }
854
855     sem_post(&addSchedulerSem);
856
857     return 0;
858 }
859
860 /**
861  * @brief Initialize dispatcher functions and reset all schedulers
862  */
863 int dispatcherInitialize(struct dispatcherControlFunctions* controlFunc) {
864     int i;
865     if (!controlFunc ||
866         !controlFunc->eventReceive ||
867         !controlFunc->eventSend ||
868         !controlFunc->localGetResponse ||
869         !controlFunc->remoteGetResponse)
870     {
871         return -1;
872     }
873
874     glControlFunc = controlFunc;
875     if (sem_init(&addSchedulerSem, 0, 1)) {
876         perror("Can't create semaphore\n");
877     }
878     numSchedulers = 0;
879     for (i = 0; i < MAX_SCHEDULERS; i++){
880         schedulerState[i].schedulerId = -1;
881     }
882     return 0;
883 }
884
885 int dispatcherClean(void* xLinkFD)
886 {
887     xLinkSchedulerState_t* curr = findCorrespondingScheduler(xLinkFD);
888     ASSERT_X_LINK(curr != NULL);
889
890     CHECK_MUTEX_SUCCESS_RC(pthread_mutex_lock(&reset_mutex), 1);
891     if(!isAvailableScheduler(curr)) {
892         CHECK_MUTEX_SUCCESS(pthread_mutex_unlock(&reset_mutex));
893         return 1;
894     }
895     mvLog(MVLOG_INFO, "Start Clean Dispatcher...");
896     closeDeviceFdAndResetScheduler(curr);
897     mvLog(MVLOG_INFO, "Clean Dispatcher Successfully...");
898     CHECK_MUTEX_SUCCESS(pthread_mutex_unlock(&reset_mutex));
899     return 0;
900 }
901
902
903
904 /* end of file */