1 // Copyright (C) 2018-2020 Intel Corporation
2 // SPDX-License-Identifier: Apache-2.0
8 #include "XLinkMacros.h"
9 #include "XLinkErrorUtils.h"
10 #include "XLinkPlatform.h"
11 #include "XLinkDispatcherImpl.h"
12 #include "XLinkPrivateFields.h"
14 #ifdef MVLOG_UNIT_NAME
15 #undef MVLOG_UNIT_NAME
16 #define MVLOG_UNIT_NAME xLink
19 #include "XLinkStringUtils.h"
21 // ------------------------------------
22 // Helpers declaration. Begin.
23 // ------------------------------------
// Forward declarations of the file-local helpers implemented in the
// "Helpers implementation" section at the end of this file.

// Compares the stream's remote fill level / packet count against its limits.
25 static int isStreamSpaceEnoughFor(streamDesc_t* stream, uint32_t size);
// Helpers operating on the stream's packets[] ring and its
// availablePackets / blockedPackets counters.
27 static streamPacketDesc_t* getPacketFromStream(streamDesc_t* stream);
28 static int releasePacketFromStream(streamDesc_t* stream, uint32_t* releasedSize);
29 static int addNewPacketToStream(streamDesc_t* stream, void* buffer, uint32_t size);
// Called by dispatcherEventReceive() once an event header has been read.
31 static int handleIncomingEvent(xLinkEvent_t* event);
33 // ------------------------------------
34 // Helpers declaration. End.
35 // ------------------------------------
39 // ------------------------------------
40 // XLinkDispatcherImpl.h implementation. Begin.
41 // ------------------------------------
43 //adds a new event with parameters and returns event id
// Sends an event to the device: the header is written first with
// XLinkPlatformWrite(); for XLINK_WRITE_REQ events the payload
// (event->data, event->header.size bytes) is written right after it.
// NOTE(review): the conditions guarding the two "Write failed" logs and the
// function's return statements are not visible in this excerpt, so the exact
// return contract (the comment above mentions an event id) needs confirming.
44 int dispatcherEventSend(xLinkEvent_t *event)
46 mvLog(MVLOG_DEBUG, "Send event: %s, size %d, streamId %ld.\n",
47 TypeToStr(event->header.type), event->header.size, event->header.streamId);
// First transfer: the fixed-size event header.
49 int rc = XLinkPlatformWrite(&event->deviceHandle,
50 &event->header, sizeof(event->header));
53 mvLog(MVLOG_ERROR,"Write failed (header) (err %d) | event %s\n", rc, TypeToStr(event->header.type));
// Second transfer: only write requests carry a data buffer after the header.
57 if (event->header.type == XLINK_WRITE_REQ) {
58 rc = XLinkPlatformWrite(&event->deviceHandle,
59 event->data, event->header.size);
61 mvLog(MVLOG_ERROR,"Write failed %d\n", rc);
// Receives one event: reads the event header with XLinkPlatformRead(),
// logs it next to the previously remembered event, checks for a duplicate
// and finally hands the event to handleIncomingEvent(), whose result is
// returned.
69 int dispatcherEventReceive(xLinkEvent_t* event){
// Function-local static: keeps its value between calls.
// NOTE(review): the statement that updates prevEvent is not visible in this
// excerpt; confirm where it is refreshed. Being static, it is also shared by
// every caller of this function.
70 static xLinkEvent_t prevEvent = {0};
71 int rc = XLinkPlatformRead(&event->deviceHandle,
72 &event->header, sizeof(event->header));
74 mvLog(MVLOG_DEBUG,"Incoming event %p: %s %d %p prevEvent: %s %d %p\n",
76 TypeToStr(event->header.type),
77 (int)event->header.id,
78 event->deviceHandle.xLinkFD,
79 TypeToStr(prevEvent.header.type),
80 (int)prevEvent.header.id,
81 prevEvent.deviceHandle.xLinkFD);
// NOTE(review): the condition guarding this "Read failed" log is not visible.
84 mvLog(MVLOG_DEBUG,"%s() Read failed %d\n", __func__, (int)rc);
// An event is treated as a duplicate when id, type and device handle all
// match the remembered previous event.
88 if (prevEvent.header.id == event->header.id &&
89 prevEvent.header.type == event->header.type &&
90 prevEvent.deviceHandle.xLinkFD == event->deviceHandle.xLinkFD) {
91 mvLog(MVLOG_FATAL,"Duplicate id detected. \n");
// Type validation and (for write requests) payload reception happen here.
95 return handleIncomingEvent(event);
98 //this function should be called only for local requests
// Prepares a locally issued event, dispatching on event->header.type.
// The visible branches set the ack/nack state through the
// XLINK_EVENT_* macros, set the `block` and `localServe` bits in
// event->header.flags.bitField, and update the stream accounting
// (remoteFillLevel, remoteFillPacketLevel, closeStreamInitiated).
// The event id is copied into response->header.id up front.
// NOTE(review): several case labels, break/return statements and the
// conditions around the "stream ... has been closed" logs are not visible in
// this excerpt; the return contract must be taken from the full source.
99 int dispatcherLocalEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response)
101 streamDesc_t* stream;
102 response->header.id = event->header.id;
103 mvLog(MVLOG_DEBUG, "%s\n",TypeToStr(event->header.type));
104 switch (event->header.type){
// --- Local write request -------------------------------------------------
105 case XLINK_WRITE_REQ:
108 //in case local tries to write after it issues close (writeSize is zero)
109 stream = getStreamById(event->deviceHandle.xLinkFD, event->header.streamId);
// Stream lookup failed path (guarding condition not visible): mark the event
// failed and served.
112 mvLog(MVLOG_DEBUG, "stream %d has been closed!\n", event->header.streamId);
113 XLINK_SET_EVENT_FAILED_AND_SERVE(event);
// writeSize == 0: the event is NACKed and the stream released.
117 if (stream->writeSize == 0)
119 XLINK_EVENT_NOT_ACKNOWLEDGE(event);
120 // return -1 to don't even send it to the remote
121 releaseStream(stream);
124 XLINK_EVENT_ACKNOWLEDGE(event);
125 event->header.flags.bitField.localServe = 0;
// Not enough room according to isStreamSpaceEnoughFor(): the event is marked
// blocked and locally served.
127 if(!isStreamSpaceEnoughFor(stream, event->header.size)){
128 mvLog(MVLOG_DEBUG,"local NACK RTS. stream '%s' is full (event %d)\n", stream->name, event->header.id);
129 event->header.flags.bitField.block = 1;
130 event->header.flags.bitField.localServe = 1;
131 // TODO: easy to implement non-blocking read here, just return nack
132 mvLog(MVLOG_WARN, "Blocked event would cause dispatching thread to wait on semaphore infinitely\n");
// Enough room: account for the bytes and the packet that will be in flight.
134 event->header.flags.bitField.block = 0;
135 stream->remoteFillLevel += event->header.size;
136 stream->remoteFillPacketLevel++;
137 mvLog(MVLOG_DEBUG,"S%d: Got local write of %ld , remote fill level %ld out of %ld %ld\n",
138 event->header.streamId, event->header.size, stream->remoteFillLevel, stream->writeSize, stream->readSize);
140 releaseStream(stream);
// --- Local read ----------------------------------------------------------
// NOTE(review): the case label of this branch is not visible in this
// excerpt; the body serves a read from the next available packet.
145 stream = getStreamById(event->deviceHandle.xLinkFD, event->header.streamId);
147 mvLog(MVLOG_DEBUG, "stream %d has been closed!\n", event->header.streamId);
148 XLINK_SET_EVENT_FAILED_AND_SERVE(event);
151 streamPacketDesc_t* packet = getPacketFromStream(stream);
153 //the read can be served with this packet
154 event->data = packet;
155 XLINK_EVENT_ACKNOWLEDGE(event);
156 event->header.flags.bitField.block = 0;
// No packet path (condition not visible): the event is marked as blocked.
159 event->header.flags.bitField.block = 1;
160 // TODO: easy to implement non-blocking read here, just return nack
// In this branch the event is flagged as locally served.
162 event->header.flags.bitField.localServe = 1;
163 releaseStream(stream);
// --- Local read-release request ------------------------------------------
166 case XLINK_READ_REL_REQ:
168 stream = getStreamById(event->deviceHandle.xLinkFD, event->header.streamId);
169 ASSERT_XLINK(stream);
170 XLINK_EVENT_ACKNOWLEDGE(event);
// The size reported by releasePacketFromStream() is stored in the event
// header; it stays 0 when that helper has nothing to release.
171 uint32_t releasedSize = 0;
172 releasePacketFromStream(stream, &releasedSize);
173 event->header.size = releasedSize;
174 releaseStream(stream);
// --- Local create-stream request: only acknowledged here -----------------
177 case XLINK_CREATE_STREAM_REQ:
179 XLINK_EVENT_ACKNOWLEDGE(event);
180 mvLog(MVLOG_DEBUG,"XLINK_CREATE_STREAM_REQ - do nothing\n");
// --- Local close-stream request ------------------------------------------
183 case XLINK_CLOSE_STREAM_REQ:
185 stream = getStreamById(event->deviceHandle.xLinkFD, event->header.streamId);
187 ASSERT_XLINK(stream);
188 XLINK_EVENT_ACKNOWLEDGE(event);
// While remoteFillLevel is non-zero the close is marked as initiated and the
// event is blocked and locally served; otherwise both flags are cleared.
189 if (stream->remoteFillLevel != 0){
190 stream->closeStreamInitiated = 1;
191 event->header.flags.bitField.block = 1;
192 event->header.flags.bitField.localServe = 1;
194 event->header.flags.bitField.block = 0;
195 event->header.flags.bitField.localServe = 0;
197 releaseStream(stream);
// --- Local reset request: only acknowledged here -------------------------
200 case XLINK_RESET_REQ:
202 XLINK_EVENT_ACKNOWLEDGE(event);
203 mvLog(MVLOG_DEBUG,"XLINK_RESET_REQ - do nothing\n");
// --- Ping request (case label not visible; see log text) -----------------
208 XLINK_EVENT_ACKNOWLEDGE(event);
209 mvLog(MVLOG_DEBUG,"XLINK_PING_REQ - do nothing\n");
// --- Response types: flagged as locally served ---------------------------
212 case XLINK_WRITE_RESP:
213 case XLINK_READ_RESP:
214 case XLINK_READ_REL_RESP:
215 case XLINK_CREATE_STREAM_RESP:
216 case XLINK_CLOSE_STREAM_RESP:
217 case XLINK_PING_RESP:
219 case XLINK_RESET_RESP:
221 event->header.flags.bitField.localServe = 1;
// Failure log for event types not handled above (the enclosing statement is
// not visible in this excerpt).
226 "Fail to get response for local event. type: %d, stream name: %s\n",
227 event->header.type, event->header.streamName);
234 //this function should be called only for remote requests
// Builds `response` for an event received from the remote peer,
// dispatching on event->header.type. The response id is copied from the
// event and the response flags are cleared before the switch; the visible
// request branches then fill in response->header.type, the device handle and
// the ack/nack state, and update the stream bookkeeping.
// NOTE(review): several braces, break/return statements and some guarding
// conditions are not visible in this excerpt; take the return contract from
// the full source.
235 int dispatcherRemoteEventGetResponse(xLinkEvent_t* event, xLinkEvent_t* response)
237 streamDesc_t* stream;
238 response->header.id = event->header.id;
239 response->header.flags.raw = 0;
240 mvLog(MVLOG_DEBUG, "%s\n",TypeToStr(event->header.type));
242 switch (event->header.type)
// --- Remote write request ------------------------------------------------
244 case XLINK_WRITE_REQ:
246 //let remote write immediately as we have a local buffer for the data
247 response->header.type = XLINK_WRITE_RESP;
248 response->header.size = event->header.size;
249 response->header.streamId = event->header.streamId;
250 response->deviceHandle = event->deviceHandle;
251 XLINK_EVENT_ACKNOWLEDGE(response);
254 // we got some data. We should unblock a blocked read
// NOTE(review): the event-type argument of this call is not visible here.
255 int xxx = DispatcherUnblockEvent(-1,
257 response->header.streamId,
258 event->deviceHandle.xLinkFD);
260 mvLog(MVLOG_DEBUG,"unblocked from stream %d %d\n",
261 (int)response->header.streamId, (int)xxx);
// --- Remote read-release request -----------------------------------------
266 case XLINK_READ_REL_REQ:
267 XLINK_EVENT_ACKNOWLEDGE(response);
268 response->header.type = XLINK_READ_REL_RESP;
269 response->deviceHandle = event->deviceHandle;
270 stream = getStreamById(event->deviceHandle.xLinkFD,
271 event->header.streamId);
272 ASSERT_XLINK(stream);
// The remote released a packet: lower the remote fill accounting.
273 stream->remoteFillLevel -= event->header.size;
274 stream->remoteFillPacketLevel--;
276 mvLog(MVLOG_DEBUG,"S%d: Got remote release of %ld, remote fill level %ld out of %ld %ld\n",
277 event->header.streamId, event->header.size, stream->remoteFillLevel, stream->writeSize, stream->readSize);
278 releaseStream(stream);
// Space was freed, so a blocked local write on this stream is unblocked.
280 DispatcherUnblockEvent(-1, XLINK_WRITE_REQ, event->header.streamId,
281 event->deviceHandle.xLinkFD);
282 //with every released packet check if the stream is already marked for close
// NOTE(review): `stream` is dereferenced here after releaseStream(stream)
// was called above - confirm this access is safe once the stream is released.
283 if (stream->closeStreamInitiated && stream->localFillLevel == 0)
285 mvLog(MVLOG_DEBUG,"%s() Unblock close STREAM\n", __func__);
286 int xxx = DispatcherUnblockEvent(-1,
287 XLINK_CLOSE_STREAM_REQ,
288 event->header.streamId,
289 event->deviceHandle.xLinkFD);
// --- Remote create-stream request ----------------------------------------
293 case XLINK_CREATE_STREAM_REQ:
294 XLINK_EVENT_ACKNOWLEDGE(response);
295 response->header.type = XLINK_CREATE_STREAM_RESP;
296 //write size from remote means read size for this peer
// NOTE(review): the last argument of this call is not visible here.
297 response->header.streamId = XLinkAddOrUpdateStream(event->deviceHandle.xLinkFD,
298 event->header.streamName,
299 0, event->header.size,
// Stream could not be added: clear the ack bit and flag sizeTooBig.
302 if (response->header.streamId == INVALID_STREAM_ID) {
303 response->header.flags.bitField.ack = 0;
304 response->header.flags.bitField.sizeTooBig = 1;
// Echo handle, stream name and size back in the response.
308 response->deviceHandle = event->deviceHandle;
309 mv_strncpy(response->header.streamName, MAX_STREAM_NAME_LENGTH,
310 event->header.streamName, MAX_STREAM_NAME_LENGTH - 1);
311 response->header.size = event->header.size;
312 mvLog(MVLOG_DEBUG,"creating stream %x\n", (int)response->header.streamId);
// --- Remote close-stream request -----------------------------------------
314 case XLINK_CLOSE_STREAM_REQ:
316 response->header.type = XLINK_CLOSE_STREAM_RESP;
317 response->header.streamId = event->header.streamId;
318 response->deviceHandle = event->deviceHandle;
// NOTE(review): this declaration shadows the function-level `stream`;
// presumably it sits in a nested block whose braces are not visible here.
320 streamDesc_t* stream = getStreamById(event->deviceHandle.xLinkFD,
321 event->header.streamId);
323 //if we have sent a NACK before, when the event gets unblocked
324 //the stream might already be unavailable
325 XLINK_EVENT_ACKNOWLEDGE(response);
326 mvLog(MVLOG_DEBUG,"%s() got a close stream on aready closed stream\n", __func__);
// Local FIFO empty: acknowledge and clear the read side of the stream.
328 if (stream->localFillLevel == 0)
330 XLINK_EVENT_ACKNOWLEDGE(response);
332 if (stream->readSize)
334 stream->readSize = 0;
335 stream->closeStreamInitiated = 0;
// No write side either: invalidate the stream slot and destroy its semaphore.
338 if (!stream->writeSize) {
339 stream->id = INVALID_STREAM_ID;
340 stream->name[0] = '\0';
343 if(XLink_sem_destroy(&stream->sem))
344 perror("Can't destroy semaphore");
// Local FIFO still holds data: NACK and remember that a close is pending.
349 mvLog(MVLOG_DEBUG,"%s():fifo is NOT empty returning NACK \n", __func__);
350 XLINK_EVENT_NOT_ACKNOWLEDGE(response);
351 stream->closeStreamInitiated = 1;
354 releaseStream(stream);
// --- Ping request (case label not visible in this excerpt) ---------------
359 response->header.type = XLINK_PING_RESP;
360 XLINK_EVENT_ACKNOWLEDGE(response);
361 response->deviceHandle = event->deviceHandle;
// --- Remote reset request ------------------------------------------------
364 case XLINK_RESET_REQ:
365 mvLog(MVLOG_DEBUG,"reset request - received! Sending ACK *****\n");
366 XLINK_EVENT_ACKNOWLEDGE(response);
367 response->header.type = XLINK_RESET_RESP;
368 response->deviceHandle = event->deviceHandle;
369 // need to send the response, serve the event and then reset
// --- Responses coming back from the remote -------------------------------
// No statements are visible for the next three response types.
371 case XLINK_WRITE_RESP:
373 case XLINK_READ_RESP:
375 case XLINK_READ_REL_RESP:
377 case XLINK_CREATE_STREAM_RESP:
379 // write_size from the response is the size of the buffer on the remote
380 response->header.streamId = XLinkAddOrUpdateStream(event->deviceHandle.xLinkFD,
381 event->header.streamName,
382 event->header.size, 0,
383 event->header.streamId);
384 XLINK_RET_IF(response->header.streamId
385 == INVALID_STREAM_ID);
386 response->deviceHandle = event->deviceHandle;
389 case XLINK_CLOSE_STREAM_RESP:
391 streamDesc_t* stream = getStreamById(event->deviceHandle.xLinkFD,
392 event->header.streamId);
// NOTE(review): the condition guarding this NACK is not visible here.
395 XLINK_EVENT_NOT_ACKNOWLEDGE(response);
// The write side is cleared; with no read side left either, the stream slot
// is invalidated.
398 stream->writeSize = 0;
399 if (!stream->readSize) {
400 XLINK_EVENT_NOT_ACKNOWLEDGE(response);
401 stream->id = INVALID_STREAM_ID;
402 stream->name[0] = '\0';
405 releaseStream(stream);
408 case XLINK_PING_RESP:
410 case XLINK_RESET_RESP:
// Failure log for event types not handled above (the enclosing statement is
// not visible in this excerpt).
415 "Fail to get response for remote event. type: %d, stream name: %s\n",
416 event->header.type, event->header.streamName);
// Tears down the link found by getLink(fd): updates the peer state, frees
// the packets held by each of the link's streams, resets the streams and
// destroys the link's dispatcherClosedSem semaphore.
// NOTE(review): the null check around the warning below and the condition
// that selects the field reset (presumably `fullClose`) are not visible in
// this excerpt - confirm against the full source.
423 void dispatcherCloseLink(void* fd, int fullClose)
425 xLinkDesc_t* link = getLink(fd);
428 mvLog(MVLOG_WARN, "Dispatcher link is null");
433 link->peerState = XLINK_DOWN;
// Return the link descriptor to its unused state.
437 link->id = INVALID_LINK_ID;
438 link->deviceHandle.xLinkFD = NULL;
439 link->peerState = XLINK_NOT_INIT;
440 link->nextUniqueStreamId = 0;
442 for (int index = 0; index < XLINK_MAX_STREAMS; index++) {
443 streamDesc_t* stream = &link->availableStreams[index];
// Drain the stream: getPacketFromStream() moves an available packet to the
// blocked set, releasePacketFromStream() then frees one blocked packet.
// NULL is passed because the released size is not needed here.
448 while (getPacketFromStream(stream) || stream->blockedPackets) {
449 releasePacketFromStream(stream, NULL);
452 XLinkStreamReset(stream);
// A failure to destroy the semaphore is only logged.
455 if(XLink_sem_destroy(&link->dispatcherClosedSem)) {
456 mvLog(MVLOG_DEBUG, "Cannot destroy dispatcherClosedSem\n");
// Closes the remote side of the given device handle by delegating to
// XLinkPlatformCloseRemote(); its result is not propagated (void function).
460 void dispatcherCloseDeviceFd(xLinkDeviceHandle_t* deviceHandle)
462 XLinkPlatformCloseRemote(deviceHandle);
465 // ------------------------------------
466 // XLinkDispatcherImpl.h implementation. End.
467 // ------------------------------------
471 // ------------------------------------
472 // Helpers implementation. Begin.
473 // ------------------------------------
// Checks whether `size` more bytes fit into the stream's remote side.
// The "not enough space" branch is taken when the remote packet count has
// reached XLINK_MAX_PACKETS_PER_STREAM or when remoteFillLevel + size would
// exceed writeSize.
// NOTE(review): the return statements are not visible in this excerpt; the
// caller treats a zero result as "no space" (see the `!isStreamSpaceEnoughFor`
// check in dispatcherLocalEventGetResponse).
475 int isStreamSpaceEnoughFor(streamDesc_t* stream, uint32_t size)
477 if(stream->remoteFillPacketLevel >= XLINK_MAX_PACKETS_PER_STREAM ||
478 stream->remoteFillLevel + size > stream->writeSize){
479 mvLog(MVLOG_DEBUG, "S%d: Not enough space in stream '%s' for %ld: PKT %ld, FILL %ld SIZE %ld\n",
480 stream->id, stream->name, size, stream->remoteFillPacketLevel, stream->remoteFillLevel, stream->writeSize);
// Takes the next available packet of the stream, if any.
// When availablePackets is non-zero, the packet at firstPacketUnused is
// selected, availablePackets is decremented, firstPacketUnused is advanced
// circularly and blockedPackets is incremented. `ret` stays NULL when no
// packet is available.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably `ret` is returned.
487 streamPacketDesc_t* getPacketFromStream(streamDesc_t* stream)
489 streamPacketDesc_t* ret = NULL;
490 if (stream->availablePackets)
492 ret = &stream->packets[stream->firstPacketUnused];
493 stream->availablePackets--;
494 CIRCULAR_INCREMENT(stream->firstPacketUnused,
495 XLINK_MAX_PACKETS_PER_STREAM);
496 stream->blockedPackets++;
// Releases the oldest blocked packet of the stream (packets[firstPacket]):
// subtracts its length from localFillLevel, deallocates its data buffer,
// advances firstPacket circularly and decrements blockedPackets. The packet
// length is reported through `releasedSize`.
// Returns 0 (after logging an error) when there is no blocked packet.
// NOTE(review): dispatcherCloseLink() passes NULL for `releasedSize`; the
// guard around the final store is not visible in this excerpt - confirm that
// the write is skipped for NULL. The final return statement is not visible
// either.
501 int releasePacketFromStream(streamDesc_t* stream, uint32_t* releasedSize)
// Only the address is computed here; the packet is not read before the
// blockedPackets check below.
503 streamPacketDesc_t* currPack = &stream->packets[stream->firstPacket];
504 if(stream->blockedPackets == 0){
505 mvLog(MVLOG_ERROR,"There is no packet to release\n");
506 return 0; // ignore this, although this is a big problem on application side
509 stream->localFillLevel -= currPack->length;
510 mvLog(MVLOG_DEBUG, "S%d: Got release of %ld , current local fill level is %ld out of %ld %ld\n",
511 stream->id, currPack->length, stream->localFillLevel, stream->readSize, stream->writeSize);
// NOTE(review): the size is aligned with ALIGN_UP_INT32 here while
// handleIncomingEvent() allocates with ALIGN_UP - confirm both yield the
// same size for every length.
513 XLinkPlatformDeallocateData(currPack->data,
514 ALIGN_UP_INT32((int32_t) currPack->length, __CACHE_LINE_SIZE), __CACHE_LINE_SIZE);
516 CIRCULAR_INCREMENT(stream->firstPacket, XLINK_MAX_PACKETS_PER_STREAM);
517 stream->blockedPackets--;
// currPack still points at the slot just released; only its length is read.
519 *releasedSize = currPack->length;
// Appends a received buffer to the stream's packet ring.
// A slot is used only while availablePackets + blockedPackets is below
// XLINK_MAX_PACKETS_PER_STREAM: the buffer pointer and size are stored at
// firstPacketFree, which is then advanced circularly, and availablePackets
// is incremented.
// NOTE(review): the return statements are not visible in this excerpt; the
// caller (handleIncomingEvent) treats a non-zero result as failure.
524 int addNewPacketToStream(streamDesc_t* stream, void* buffer, uint32_t size) {
525 if (stream->availablePackets + stream->blockedPackets < XLINK_MAX_PACKETS_PER_STREAM)
527 stream->packets[stream->firstPacketFree].data = buffer;
528 stream->packets[stream->firstPacketFree].length = size;
529 CIRCULAR_INCREMENT(stream->firstPacketFree, XLINK_MAX_PACKETS_PER_STREAM);
530 stream->availablePackets++;
// Processes an event whose header has already been read.
// The event type is asserted to be in the valid request/response range.
// For XLINK_WRITE_REQ the payload that follows the header is received: a
// cache-line aligned buffer is allocated, event->header.size bytes are read
// into it, and the buffer is queued on the stream with
// addNewPacketToStream().
// NOTE(review): the body of the non-write branch, the return statements and
// the cleanup label targeted by XLINK_OUT_WITH_LOG_IF are not visible in this
// excerpt.
536 int handleIncomingEvent(xLinkEvent_t* event) {
537 //this function will be dependent whether this is a client or a Remote
538 //specific actions to this peer
539 mvLog(MVLOG_DEBUG, "%s, size %u, streamId %u.\n", TypeToStr(event->header.type), event->header.size, event->header.streamId);
// Reject types outside the request/response ranges, and the
// XLINK_REQUEST_LAST marker itself.
541 ASSERT_XLINK(event->header.type >= XLINK_WRITE_REQ
542 && event->header.type != XLINK_REQUEST_LAST
543 && event->header.type < XLINK_RESP_LAST);
545 // Then read the data buffer, which is contained only in the XLINK_WRITE_REQ event
546 if(event->header.type != XLINK_WRITE_REQ) {
551 streamDesc_t* stream = getStreamById(event->deviceHandle.xLinkFD, event->header.streamId);
552 ASSERT_XLINK(stream);
// NOTE(review): localFillLevel is increased before allocation/read can fail;
// no matching decrement is visible on the failure path below - confirm.
554 stream->localFillLevel += event->header.size;
555 mvLog(MVLOG_DEBUG,"S%d: Got write of %ld, current local fill level is %ld out of %ld %ld\n",
556 event->header.streamId, event->header.size, stream->localFillLevel, stream->readSize, stream->writeSize);
558 void* buffer = XLinkPlatformAllocateData(ALIGN_UP(event->header.size, __CACHE_LINE_SIZE), __CACHE_LINE_SIZE);
// NOTE(review): header.size is printed with %u, %ld and %zu in different
// log calls of this function - confirm the field type and unify.
559 XLINK_OUT_WITH_LOG_IF(buffer == NULL,
560 mvLog(MVLOG_FATAL,"out of memory to receive data of size = %zu\n", event->header.size));
// Receive the payload straight into the freshly allocated buffer.
562 const int sc = XLinkPlatformRead(&event->deviceHandle, buffer, event->header.size);
563 XLINK_OUT_WITH_LOG_IF(sc < 0, mvLog(MVLOG_ERROR,"%s() Read failed %d\n", __func__, sc));
565 event->data = buffer;
566 XLINK_OUT_WITH_LOG_IF(addNewPacketToStream(stream, buffer, event->header.size),
567 mvLog(MVLOG_WARN,"No more place in stream. release packet\n"));
571 releaseStream(stream);
// Failure path (its label/guard is not visible): give the buffer back using
// the same aligned size as the allocation, and NACK the event.
575 XLinkPlatformDeallocateData(buffer,
576 ALIGN_UP(event->header.size, __CACHE_LINE_SIZE), __CACHE_LINE_SIZE);
578 XLINK_EVENT_NOT_ACKNOWLEDGE(event);
584 // ------------------------------------
585 // Helpers implementation. End.
586 // ------------------------------------