3c2d7910889ad71289372cc269c7ef1c1e29bec5
[platform/upstream/dldt.git] / inference-engine / thirdparty / movidius / XLink / shared / src / XLinkDispatcherImpl.c
1 // Copyright (C) 2018-2020 Intel Corporation
2 // SPDX-License-Identifier: Apache-2.0
3 //
4
5 #include <string.h>
6 #include "stdlib.h"
7
8 #include "XLinkMacros.h"
9 #include "XLinkErrorUtils.h"
10 #include "XLinkPlatform.h"
11 #include "XLinkDispatcherImpl.h"
12 #include "XLinkPrivateFields.h"
13
14 #ifdef MVLOG_UNIT_NAME
15 #undef MVLOG_UNIT_NAME
16 #define MVLOG_UNIT_NAME xLink
17 #endif
18 #include "XLinkLog.h"
19 #include "XLinkStringUtils.h"
20
21 // ------------------------------------
22 // Helpers declaration. Begin.
23 // ------------------------------------
24
25 static int isStreamSpaceEnoughFor(streamDesc_t* stream, uint32_t size);
26
27 static streamPacketDesc_t* getPacketFromStream(streamDesc_t* stream);
28 static int releasePacketFromStream(streamDesc_t* stream, uint32_t* releasedSize);
29 static int addNewPacketToStream(streamDesc_t* stream, void* buffer, uint32_t size);
30
31 static int handleIncomingEvent(xLinkEvent_t* event);
32
33 // ------------------------------------
34 // Helpers declaration. End.
35 // ------------------------------------
36
37
38
39 // ------------------------------------
40 // XLinkDispatcherImpl.h implementation. Begin.
41 // ------------------------------------
42
43 //adds a new event with parameters and returns event id
44 int dispatcherEventSend(xLinkEvent_t *event)
45 {
46     mvLog(MVLOG_DEBUG, "Send event: %s, size %d, streamId %ld.\n",
47         TypeToStr(event->header.type), event->header.size, event->header.streamId);
48
49     int rc = XLinkPlatformWrite(&event->deviceHandle,
50         &event->header, sizeof(event->header));
51
52     if(rc < 0) {
53         mvLog(MVLOG_ERROR,"Write failed (header) (err %d) | event %s\n", rc, TypeToStr(event->header.type));
54         return rc;
55     }
56
57     if (event->header.type == XLINK_WRITE_REQ) {
58         rc = XLinkPlatformWrite(&event->deviceHandle,
59             event->data, event->header.size);
60         if(rc < 0) {
61             mvLog(MVLOG_ERROR,"Write failed %d\n", rc);
62             return rc;
63         }
64     }
65
66     return 0;
67 }
68
69 int dispatcherEventReceive(xLinkEvent_t* event){
70     static xLinkEvent_t prevEvent = {0};
71     int rc = XLinkPlatformRead(&event->deviceHandle,
72         &event->header, sizeof(event->header));
73
74     mvLog(MVLOG_DEBUG,"Incoming event %p: %s %d %p prevEvent: %s %d %p\n",
75           event,
76           TypeToStr(event->header.type),
77           (int)event->header.id,
78           event->deviceHandle.xLinkFD,
79           TypeToStr(prevEvent.header.type),
80           (int)prevEvent.header.id,
81           prevEvent.deviceHandle.xLinkFD);
82
83     if(rc < 0) {
84         mvLog(MVLOG_DEBUG,"%s() Read failed %d\n", __func__, (int)rc);
85         return rc;
86     }
87
88     if (prevEvent.header.id == event->header.id &&
89         prevEvent.header.type == event->header.type &&
90         prevEvent.deviceHandle.xLinkFD == event->deviceHandle.xLinkFD) {
91         mvLog(MVLOG_FATAL,"Duplicate id detected. \n");
92     }
93
94     prevEvent = *event;
95     return handleIncomingEvent(event);
96 }
97
98 //this function should be called only for remote requests
99 int dispatcherLocalEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response)
100 {
101     streamDesc_t* stream;
102     response->header.id = event->header.id;
103     mvLog(MVLOG_DEBUG, "%s\n",TypeToStr(event->header.type));
104     switch (event->header.type){
105         case XLINK_WRITE_REQ:
106         {
107
108             //in case local tries to write after it issues close (writeSize is zero)
109             stream = getStreamById(event->deviceHandle.xLinkFD, event->header.streamId);
110
111             if(!stream) {
112                 mvLog(MVLOG_DEBUG, "stream %d has been closed!\n", event->header.streamId);
113                 XLINK_SET_EVENT_FAILED_AND_SERVE(event);
114                 break;
115             }
116
117             if (stream->writeSize == 0)
118             {
119                 XLINK_EVENT_NOT_ACKNOWLEDGE(event);
120                 // return -1 to don't even send it to the remote
121                 releaseStream(stream);
122                 return -1;
123             }
124             XLINK_EVENT_ACKNOWLEDGE(event);
125             event->header.flags.bitField.localServe = 0;
126
127             if(!isStreamSpaceEnoughFor(stream, event->header.size)){
128                 mvLog(MVLOG_DEBUG,"local NACK RTS. stream '%s' is full (event %d)\n", stream->name, event->header.id);
129                 event->header.flags.bitField.block = 1;
130                 event->header.flags.bitField.localServe = 1;
131                 // TODO: easy to implement non-blocking read here, just return nack
132                 mvLog(MVLOG_WARN, "Blocked event would cause dispatching thread to wait on semaphore infinitely\n");
133             }else{
134                 event->header.flags.bitField.block = 0;
135                 stream->remoteFillLevel += event->header.size;
136                 stream->remoteFillPacketLevel++;
137                 mvLog(MVLOG_DEBUG,"S%d: Got local write of %ld , remote fill level %ld out of %ld %ld\n",
138                       event->header.streamId, event->header.size, stream->remoteFillLevel, stream->writeSize, stream->readSize);
139             }
140             releaseStream(stream);
141             break;
142         }
143         case XLINK_READ_REQ:
144         {
145             stream = getStreamById(event->deviceHandle.xLinkFD, event->header.streamId);
146             if(!stream) {
147                 mvLog(MVLOG_DEBUG, "stream %d has been closed!\n", event->header.streamId);
148                 XLINK_SET_EVENT_FAILED_AND_SERVE(event);
149                 break;
150             }
151             streamPacketDesc_t* packet = getPacketFromStream(stream);
152             if (packet){
153                 //the read can be served with this packet
154                 event->data = packet;
155                 XLINK_EVENT_ACKNOWLEDGE(event);
156                 event->header.flags.bitField.block = 0;
157             }
158             else{
159                 event->header.flags.bitField.block = 1;
160                 // TODO: easy to implement non-blocking read here, just return nack
161             }
162             event->header.flags.bitField.localServe = 1;
163             releaseStream(stream);
164             break;
165         }
166         case XLINK_READ_REL_REQ:
167         {
168             stream = getStreamById(event->deviceHandle.xLinkFD, event->header.streamId);
169             ASSERT_XLINK(stream);
170             XLINK_EVENT_ACKNOWLEDGE(event);
171             uint32_t releasedSize = 0;
172             releasePacketFromStream(stream, &releasedSize);
173             event->header.size = releasedSize;
174             releaseStream(stream);
175             break;
176         }
177         case XLINK_CREATE_STREAM_REQ:
178         {
179             XLINK_EVENT_ACKNOWLEDGE(event);
180             mvLog(MVLOG_DEBUG,"XLINK_CREATE_STREAM_REQ - do nothing\n");
181             break;
182         }
183         case XLINK_CLOSE_STREAM_REQ:
184         {
185             stream = getStreamById(event->deviceHandle.xLinkFD, event->header.streamId);
186
187             ASSERT_XLINK(stream);
188             XLINK_EVENT_ACKNOWLEDGE(event);
189             if (stream->remoteFillLevel != 0){
190                 stream->closeStreamInitiated = 1;
191                 event->header.flags.bitField.block = 1;
192                 event->header.flags.bitField.localServe = 1;
193             }else{
194                 event->header.flags.bitField.block = 0;
195                 event->header.flags.bitField.localServe = 0;
196             }
197             releaseStream(stream);
198             break;
199         }
200         case XLINK_RESET_REQ:
201         {
202             XLINK_EVENT_ACKNOWLEDGE(event);
203             mvLog(MVLOG_DEBUG,"XLINK_RESET_REQ - do nothing\n");
204             break;
205         }
206         case XLINK_PING_REQ:
207         {
208             XLINK_EVENT_ACKNOWLEDGE(event);
209             mvLog(MVLOG_DEBUG,"XLINK_PING_REQ - do nothing\n");
210             break;
211         }
212         case XLINK_WRITE_RESP:
213         case XLINK_READ_RESP:
214         case XLINK_READ_REL_RESP:
215         case XLINK_CREATE_STREAM_RESP:
216         case XLINK_CLOSE_STREAM_RESP:
217         case XLINK_PING_RESP:
218             break;
219         case XLINK_RESET_RESP:
220             //should not happen
221             event->header.flags.bitField.localServe = 1;
222             break;
223         default:
224         {
225             mvLog(MVLOG_ERROR,
226                   "Fail to get response for local event. type: %d, stream name: %s\n",
227                   event->header.type, event->header.streamName);
228             ASSERT_XLINK(0);
229         }
230     }
231     return 0;
232 }
233
234 //this function should be called only for remote requests
235 int dispatcherRemoteEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response)
236 {
237     streamDesc_t* stream;
238     response->header.id = event->header.id;
239     response->header.flags.raw = 0;
240     mvLog(MVLOG_DEBUG, "%s\n",TypeToStr(event->header.type));
241
242     switch (event->header.type)
243     {
244         case XLINK_WRITE_REQ:
245             {
246                 //let remote write immediately as we have a local buffer for the data
247                 response->header.type = XLINK_WRITE_RESP;
248                 response->header.size = event->header.size;
249                 response->header.streamId = event->header.streamId;
250                 response->deviceHandle = event->deviceHandle;
251                 XLINK_EVENT_ACKNOWLEDGE(response);
252
253
254                 // we got some data. We should unblock a blocked read
255                 int xxx = DispatcherUnblockEvent(-1,
256                                                  XLINK_READ_REQ,
257                                                  response->header.streamId,
258                                                  event->deviceHandle.xLinkFD);
259                 (void) xxx;
260                 mvLog(MVLOG_DEBUG,"unblocked from stream %d %d\n",
261                       (int)response->header.streamId, (int)xxx);
262             }
263             break;
264         case XLINK_READ_REQ:
265             break;
266         case XLINK_READ_REL_REQ:
267             XLINK_EVENT_ACKNOWLEDGE(response);
268             response->header.type = XLINK_READ_REL_RESP;
269             response->deviceHandle = event->deviceHandle;
270             stream = getStreamById(event->deviceHandle.xLinkFD,
271                                    event->header.streamId);
272             ASSERT_XLINK(stream);
273             stream->remoteFillLevel -= event->header.size;
274             stream->remoteFillPacketLevel--;
275
276             mvLog(MVLOG_DEBUG,"S%d: Got remote release of %ld, remote fill level %ld out of %ld %ld\n",
277                   event->header.streamId, event->header.size, stream->remoteFillLevel, stream->writeSize, stream->readSize);
278             releaseStream(stream);
279
280             DispatcherUnblockEvent(-1, XLINK_WRITE_REQ, event->header.streamId,
281                                    event->deviceHandle.xLinkFD);
282             //with every released packet check if the stream is already marked for close
283             if (stream->closeStreamInitiated && stream->localFillLevel == 0)
284             {
285                 mvLog(MVLOG_DEBUG,"%s() Unblock close STREAM\n", __func__);
286                 int xxx = DispatcherUnblockEvent(-1,
287                                                  XLINK_CLOSE_STREAM_REQ,
288                                                  event->header.streamId,
289                                                  event->deviceHandle.xLinkFD);
290                 (void) xxx;
291             }
292             break;
293         case XLINK_CREATE_STREAM_REQ:
294             XLINK_EVENT_ACKNOWLEDGE(response);
295             response->header.type = XLINK_CREATE_STREAM_RESP;
296             //write size from remote means read size for this peer
297             response->header.streamId = XLinkAddOrUpdateStream(event->deviceHandle.xLinkFD,
298                                                                event->header.streamName,
299                                                                0, event->header.size,
300                                                                INVALID_STREAM_ID);
301
302             if (response->header.streamId == INVALID_STREAM_ID) {
303                 response->header.flags.bitField.ack = 0;
304                 response->header.flags.bitField.sizeTooBig = 1;
305                 break;
306             }
307
308             response->deviceHandle = event->deviceHandle;
309             mv_strncpy(response->header.streamName, MAX_STREAM_NAME_LENGTH,
310                        event->header.streamName, MAX_STREAM_NAME_LENGTH - 1);
311             response->header.size = event->header.size;
312             mvLog(MVLOG_DEBUG,"creating stream %x\n", (int)response->header.streamId);
313             break;
314         case XLINK_CLOSE_STREAM_REQ:
315         {
316             response->header.type = XLINK_CLOSE_STREAM_RESP;
317             response->header.streamId = event->header.streamId;
318             response->deviceHandle = event->deviceHandle;
319
320             streamDesc_t* stream = getStreamById(event->deviceHandle.xLinkFD,
321                                                  event->header.streamId);
322             if (!stream) {
323                 //if we have sent a NACK before, when the event gets unblocked
324                 //the stream might already be unavailable
325                 XLINK_EVENT_ACKNOWLEDGE(response);
326                 mvLog(MVLOG_DEBUG,"%s() got a close stream on aready closed stream\n", __func__);
327             } else {
328                 if (stream->localFillLevel == 0)
329                 {
330                     XLINK_EVENT_ACKNOWLEDGE(response);
331
332                     if (stream->readSize)
333                     {
334                         stream->readSize = 0;
335                         stream->closeStreamInitiated = 0;
336                     }
337
338                     if (!stream->writeSize) {
339                         stream->id = INVALID_STREAM_ID;
340                         stream->name[0] = '\0';
341                     }
342 #ifndef __PC__
343                     if(sem_destroy(&stream->sem))
344                         perror("Can't destroy semaphore");
345 #endif
346                 }
347                 else
348                 {
349                     mvLog(MVLOG_DEBUG,"%s():fifo is NOT empty returning NACK \n", __func__);
350                     XLINK_EVENT_NOT_ACKNOWLEDGE(response);
351                     stream->closeStreamInitiated = 1;
352                 }
353
354                 releaseStream(stream);
355             }
356             break;
357         }
358         case XLINK_PING_REQ:
359             response->header.type = XLINK_PING_RESP;
360             XLINK_EVENT_ACKNOWLEDGE(response);
361             response->deviceHandle = event->deviceHandle;
362             sem_post(&pingSem);
363             break;
364         case XLINK_RESET_REQ:
365             mvLog(MVLOG_DEBUG,"reset request - received! Sending ACK *****\n");
366             XLINK_EVENT_ACKNOWLEDGE(response);
367             response->header.type = XLINK_RESET_RESP;
368             response->deviceHandle = event->deviceHandle;
369             // need to send the response, serve the event and then reset
370             break;
371         case XLINK_WRITE_RESP:
372             break;
373         case XLINK_READ_RESP:
374             break;
375         case XLINK_READ_REL_RESP:
376             break;
377         case XLINK_CREATE_STREAM_RESP:
378         {
379             // write_size from the response the size of the buffer from the remote
380             response->header.streamId = XLinkAddOrUpdateStream(event->deviceHandle.xLinkFD,
381                                                                event->header.streamName,
382                                                                event->header.size, 0,
383                                                                event->header.streamId);
384             XLINK_RET_IF(response->header.streamId
385                 == INVALID_STREAM_ID);
386             response->deviceHandle = event->deviceHandle;
387             break;
388         }
389         case XLINK_CLOSE_STREAM_RESP:
390         {
391             streamDesc_t* stream = getStreamById(event->deviceHandle.xLinkFD,
392                                                  event->header.streamId);
393
394             if (!stream){
395                 XLINK_EVENT_NOT_ACKNOWLEDGE(response);
396                 break;
397             }
398             stream->writeSize = 0;
399             if (!stream->readSize) {
400                 XLINK_EVENT_NOT_ACKNOWLEDGE(response);
401                 stream->id = INVALID_STREAM_ID;
402                 stream->name[0] = '\0';
403                 break;
404             }
405             releaseStream(stream);
406             break;
407         }
408         case XLINK_PING_RESP:
409             break;
410         case XLINK_RESET_RESP:
411             break;
412         default:
413         {
414             mvLog(MVLOG_ERROR,
415                 "Fail to get response for remote event. type: %d, stream name: %s\n",
416                 event->header.type, event->header.streamName);
417             ASSERT_XLINK(0);
418         }
419     }
420     return 0;
421 }
422
423 void dispatcherCloseLink(void* fd, int fullClose)
424 {
425     xLinkDesc_t* link = getLink(fd);
426
427     if (!link) {
428         mvLog(MVLOG_WARN, "Dispatcher link is null");
429         return;
430     }
431
432     if (!fullClose) {
433         link->peerState = XLINK_DOWN;
434         return;
435     }
436
437     link->id = INVALID_LINK_ID;
438     link->deviceHandle.xLinkFD = NULL;
439     link->peerState = XLINK_NOT_INIT;
440     link->nextUniqueStreamId = 0;
441
442     for (int index = 0; index < XLINK_MAX_STREAMS; index++) {
443         streamDesc_t* stream = &link->availableStreams[index];
444         if (!stream) {
445             continue;
446         }
447
448         while (getPacketFromStream(stream) || stream->blockedPackets) {
449             releasePacketFromStream(stream, NULL);
450         }
451
452         XLinkStreamReset(stream);
453     }
454
455     if(sem_destroy(&link->dispatcherClosedSem)) {
456         mvLog(MVLOG_DEBUG, "Cannot destroy dispatcherClosedSem\n");
457     }
458 }
459
460 void dispatcherCloseDeviceFd(xLinkDeviceHandle_t* deviceHandle)
461 {
462     XLinkPlatformCloseRemote(deviceHandle);
463 }
464
465 // ------------------------------------
466 // XLinkDispatcherImpl.h implementation. End.
467 // ------------------------------------
468
469
470
471 // ------------------------------------
472 // Helpers implementation. Begin.
473 // ------------------------------------
474
475 int isStreamSpaceEnoughFor(streamDesc_t* stream, uint32_t size)
476 {
477     if(stream->remoteFillPacketLevel >= XLINK_MAX_PACKETS_PER_STREAM ||
478        stream->remoteFillLevel + size > stream->writeSize){
479         mvLog(MVLOG_DEBUG, "S%d: Not enough space in stream '%s' for %ld: PKT %ld, FILL %ld SIZE %ld\n",
480               stream->id, stream->name, size, stream->remoteFillPacketLevel, stream->remoteFillLevel, stream->writeSize);
481         return 0;
482     }
483
484     return 1;
485 }
486
487 streamPacketDesc_t* getPacketFromStream(streamDesc_t* stream)
488 {
489     streamPacketDesc_t* ret = NULL;
490     if (stream->availablePackets)
491     {
492         ret = &stream->packets[stream->firstPacketUnused];
493         stream->availablePackets--;
494         CIRCULAR_INCREMENT(stream->firstPacketUnused,
495                            XLINK_MAX_PACKETS_PER_STREAM);
496         stream->blockedPackets++;
497     }
498     return ret;
499 }
500
501 int releasePacketFromStream(streamDesc_t* stream, uint32_t* releasedSize)
502 {
503     streamPacketDesc_t* currPack = &stream->packets[stream->firstPacket];
504     if(stream->blockedPackets == 0){
505         mvLog(MVLOG_ERROR,"There is no packet to release\n");
506         return 0; // ignore this, although this is a big problem on application side
507     }
508
509     stream->localFillLevel -= currPack->length;
510     mvLog(MVLOG_DEBUG, "S%d: Got release of %ld , current local fill level is %ld out of %ld %ld\n",
511           stream->id, currPack->length, stream->localFillLevel, stream->readSize, stream->writeSize);
512
513     XLinkPlatformDeallocateData(currPack->data,
514                                 ALIGN_UP_INT32((int32_t) currPack->length, __CACHE_LINE_SIZE), __CACHE_LINE_SIZE);
515
516     CIRCULAR_INCREMENT(stream->firstPacket, XLINK_MAX_PACKETS_PER_STREAM);
517     stream->blockedPackets--;
518     if (releasedSize) {
519         *releasedSize = currPack->length;
520     }
521     return 0;
522 }
523
524 int addNewPacketToStream(streamDesc_t* stream, void* buffer, uint32_t size) {
525     if (stream->availablePackets + stream->blockedPackets < XLINK_MAX_PACKETS_PER_STREAM)
526     {
527         stream->packets[stream->firstPacketFree].data = buffer;
528         stream->packets[stream->firstPacketFree].length = size;
529         CIRCULAR_INCREMENT(stream->firstPacketFree, XLINK_MAX_PACKETS_PER_STREAM);
530         stream->availablePackets++;
531         return 0;
532     }
533     return -1;
534 }
535
536 int handleIncomingEvent(xLinkEvent_t* event) {
537     //this function will be dependent whether this is a client or a Remote
538     //specific actions to this peer
539     mvLog(MVLOG_DEBUG, "%s, size %u, streamId %u.\n", TypeToStr(event->header.type), event->header.size, event->header.streamId);
540
541     ASSERT_XLINK(event->header.type >= XLINK_WRITE_REQ
542                && event->header.type != XLINK_REQUEST_LAST
543                && event->header.type < XLINK_RESP_LAST);
544
545     // Then read the data buffer, which is contained only in the XLINK_WRITE_REQ event
546     if(event->header.type != XLINK_WRITE_REQ) {
547         return 0;
548     }
549
550     int rc = -1;
551     streamDesc_t* stream = getStreamById(event->deviceHandle.xLinkFD, event->header.streamId);
552     ASSERT_XLINK(stream);
553
554     stream->localFillLevel += event->header.size;
555     mvLog(MVLOG_DEBUG,"S%d: Got write of %ld, current local fill level is %ld out of %ld %ld\n",
556           event->header.streamId, event->header.size, stream->localFillLevel, stream->readSize, stream->writeSize);
557
558     void* buffer = XLinkPlatformAllocateData(ALIGN_UP(event->header.size, __CACHE_LINE_SIZE), __CACHE_LINE_SIZE);
559     XLINK_OUT_WITH_LOG_IF(buffer == NULL,
560         mvLog(MVLOG_FATAL,"out of memory to receive data of size = %zu\n", event->header.size));
561
562     const int sc = XLinkPlatformRead(&event->deviceHandle, buffer, event->header.size);
563     XLINK_OUT_WITH_LOG_IF(sc < 0, mvLog(MVLOG_ERROR,"%s() Read failed %d\n", __func__, sc));
564
565     event->data = buffer;
566     XLINK_OUT_WITH_LOG_IF(addNewPacketToStream(stream, buffer, event->header.size),
567         mvLog(MVLOG_WARN,"No more place in stream. release packet\n"));
568     rc = 0;
569
570 XLINK_OUT:
571     releaseStream(stream);
572
573     if(rc != 0) {
574         if(buffer != NULL) {
575             XLinkPlatformDeallocateData(buffer,
576                 ALIGN_UP(event->header.size, __CACHE_LINE_SIZE), __CACHE_LINE_SIZE);
577         }
578         XLINK_EVENT_NOT_ACKNOWLEDGE(event);
579     }
580
581     return rc;
582 }
583
584 // ------------------------------------
585 // Helpers implementation. Begin.
586 // ------------------------------------