1 //******************************************************************
3 // Copyright 2015 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "telegesis_socket.h"
27 #include <sys/select.h>
30 #include "twsocketlist.h"
31 #include "oic_string.h"
32 #include "oic_malloc.h"
35 #define TAG "Telegesis_Socket"
38 * New thread's main() ftn.
40 void * readForever(/*PIPlugin */void * plugin);
42 * Just grabs the next char in the buffer. Called by readBufferLine() multiple times.
44 char readBufferChar(int fd, ssize_t * readDataBytes);
46 * Calls readBufferChar() until line is formed.
48 const char * readBufferLine(int fd);
50 * Calls readBufferLine() until a full TWEntry is formed.
52 TWEntry * readEntry(int fd);
54 * Posts the TWEntry to the queue.
56 TWResultCode TWEnqueueEntry(PIPlugin * plugin, TWEntry * entry);
59 * Defines the mapping relationships between Telegesis AT response/prompt tags and
60 * how many lines we should expect with each following the tag.
64 const char *resultTxt;
66 TWEntryType entryType;
69 static TWEntryTypePair TWEntryTypePairArray[] =
71 {"+N=", 1, TW_NETWORK_INFO},
72 {"JPAN:", 1, TW_JPAN},
77 {"MatchDesc:", 1, TW_MATCHDESC},
78 {"SimpleDesc:", 6, TW_SIMPLEDESC},
79 {"InCluster:", 1, TW_INCLUSTER},
80 {"WRITEATTR:", 1, TW_WRITEATTR},
81 {"RESPATTR:", 1, TW_RESPATTR},
82 {"TEMPERATURE:", 1, TW_RESPATTR},
83 {"DFTREP", 1, TW_DFTREP},
84 {"DRLOCRSP:", 1, TW_DRLOCRSP},
85 {"DRUNLOCKRSP:", 1, TW_DRUNLOCKRSP},
87 {"NACK:", 1, TW_NACK},
89 {"ZENROLLREQ:", 1, TW_ZENROLLREQ},
90 {"ENROLLED:", 1, TW_ENROLLED},
91 {"ZONESTATUS:", 1, TW_ZONESTATUS},
92 {"AddrResp:", 1, TW_ADDRESS_RESPONSE},
93 {"Unknown:", 0, TW_NONE},
94 {"Unknown:", 1, TW_MAX_ENTRY}
97 TWEntry * TWEntryList = NULL;
99 TWEntryTypePair getEntryTypePair(const char * bufferLine)
101 size_t bufferLength = strlen(bufferLine);
102 for(uint8_t i = 0; i < TW_MAX_ENTRY; i++)
104 size_t resultTxtLength = strlen(TWEntryTypePairArray[i].resultTxt);
105 if((bufferLength >= resultTxtLength) &&
106 strncmp(bufferLine, TWEntryTypePairArray[i].resultTxt, resultTxtLength) == 0)
108 return TWEntryTypePairArray[i];
111 return TWEntryTypePairArray[TW_MAX_ENTRY];
114 TWResultCode TWWait(pthread_cond_t * cond, pthread_mutex_t * mutex, uint8_t timeout)
117 // This is a blocking call which hold the calling thread until an entry
118 // has been enqueued or until the specified timeout has surpassed.
119 struct timespec abs_time;
120 clock_gettime(CLOCK_REALTIME , &abs_time);
121 abs_time.tv_sec += timeout;
122 ret = pthread_cond_timedwait(cond, mutex, &abs_time);
129 OC_LOG(INFO, TAG, "Timed out waiting for CV. Non-error. Please try again.");
132 OC_LOG(ERROR, TAG, "Cond or Mutex is invalid. OR timeout value for CV is invalid.");
135 OC_LOG(ERROR, TAG, "Cannot use CV because the current thread does not own the CV.");
139 return TW_RESULT_ERROR;
142 TWResultCode TWGrabMutex(pthread_mutex_t * mutex)
144 int ret = pthread_mutex_lock(mutex);
151 OC_LOG(ERROR, TAG, "Mutex was not initialized.");
154 OC_LOG(INFO, TAG, "Timed out waiting for mutex. Non-error. Please try again.");
157 OC_LOG(ERROR, TAG, "Maximum number of locks for mutex have been exceeded.");
160 OC_LOG(ERROR, TAG, "Deadlock OR this thread already owns the mutex.");
163 return TW_RESULT_ERROR;
166 TWResultCode TWReleaseMutex(pthread_mutex_t * mutex)
168 int ret = pthread_mutex_unlock(mutex);
175 OC_LOG(ERROR, TAG, "Mutex was not initialized.");
178 OC_LOG(ERROR, TAG, "Maximum number of locks for mutex have been exceeded.");
181 OC_LOG(ERROR, TAG, "Cannot unlock because the current thread does not own the mutex.");
184 return TW_RESULT_ERROR;
187 char readBufferChar(int fd, ssize_t * readDataBytes)
189 // Performs read operation on fd for one character at a time.
197 *readDataBytes = read(fd, &byte, sizeof(byte));
198 if(*readDataBytes < 0)
200 OC_LOG_V(ERROR, TAG, "\tCould not read from port. Errno is: %d\n", errno);
206 bool isLineIgnored(const char * line, size_t length)
210 if(line[0] == 'N' && line[1] == 'A' && line[2] == 'C' && line[3] == 'K')
217 if(line[0] == 'S' && line[1] == 'E' && line[2] == 'Q')
221 else if(line[0] == 'A' && line[1] == 'C' && line[2] == 'K')
228 if(line[0] == 'A' && line[1] == 'T')
230 // If the line starts with "AT", then this is an echo. We ignore echos.
233 else if(line[0] == 'O' && line[1] == 'K')
235 //If this line is "OK", we ignore success codes. But we do end TWEntry's on "OK".
236 // (FYI, error codes are handled in readEntry() which invokes this function.)
243 const char * readBufferLine(int fd)
245 char bufferChar = '\0';
246 size_t bufferLoc = 0;
247 ssize_t readDataBytes = 0;
248 char * bufferLineHold = NULL;
249 char * bufferLine = NULL;
250 bool endOfLine1 = false;
251 bool endOfLine2 = false;
255 if(!endOfLine1 || !endOfLine2)
257 bufferChar = readBufferChar(fd, &readDataBytes);
258 if(bufferChar == '\r')
263 if(bufferChar == '\n')
269 if(readDataBytes != 0 && (!endOfLine1 && !endOfLine2))
271 size_t bufferLineSize = sizeof(bufferChar)*(bufferLoc+2);
272 bufferLine = (char *) OICRealloc(bufferLineHold, bufferLineSize);
275 OC_LOG(ERROR, TAG, "Ran out of memory.");
278 bufferLine[bufferLoc] = '\0';
279 OICStrcat(bufferLine, bufferLineSize+2, &bufferChar);
281 bufferLineHold = bufferLine;
283 OC_LOG_V(DEBUG, TAG, "Incoming: %s", bufferLine);
291 size_t bufferLineSize = sizeof(bufferChar)*(bufferLoc+2);
292 bufferLine = (char *) OICRealloc(bufferLineHold, bufferLineSize);
295 OC_LOG(ERROR, TAG, "Ran out of memory.");
298 bufferLine[bufferLoc] = '\0';
300 bufferLineHold = bufferLine;
301 ignore = isLineIgnored(bufferLine, strlen(bufferLine));
305 return readBufferLine(fd);
307 if(endOfLine1 && endOfLine2)
319 TWResultCode TWAddLineToEntry(const char * line, TWEntry * entry)
323 OC_LOG(ERROR, TAG, "Invalid/NULL parameter(s) received.");
324 return TW_RESULT_ERROR_INVALID_PARAMS;
326 TWLine * twLine = (TWLine *) OICCalloc(1, sizeof(TWLine));
329 OC_LOG(ERROR, TAG, "Ran out of memory.");
330 return TW_RESULT_ERROR_NO_MEMORY;
332 size_t lineLength = strlen(line);
334 twLine->length = lineLength;
335 size_t errorLength = strlen(TW_ENDCONTROL_ERROR_STRING);
336 size_t maxLength = (lineLength > errorLength) ? errorLength : lineLength;
338 if((errorLength == lineLength) &&
339 strncmp(line, TW_ENDCONTROL_ERROR_STRING, maxLength) == 0)
341 entry->atErrorCode[0] = line[errorLength];
342 entry->atErrorCode[1] = line[errorLength + 1];
346 entry->atErrorCode[0] = '0';
347 entry->atErrorCode[1] = '0';
350 // Null terminate the string.
351 entry->atErrorCode[2] = '\0';
355 entry->lines = twLine;
359 entry->lines[entry->count] = *twLine;
366 TWEntry * readEntry(int fd)
368 // Calls readBufferLine().
369 // Forms TWEntry from 1-n lines based on the response type.
371 TWEntry * entry = (TWEntry *) OICCalloc(1, sizeof(TWEntry));
374 OC_LOG(ERROR, TAG, "Ran out of memory.");
379 const char * bufferLine = NULL;
380 TWEntryTypePair entryTypePair = { .resultTxt = NULL,
382 .entryType = TW_NONE };
384 while(numLines == 0 || bufferLine)
388 bufferLine = readBufferLine(fd);
393 entryTypePair = getEntryTypePair(bufferLine);
397 bufferLine = readBufferLine(fd);
399 if(bufferLine != NULL)
401 entry->type = entryTypePair.entryType;
402 TWAddLineToEntry(bufferLine, entry);
404 entry->count = numLines;
407 if(entryTypePair.numLines != numLines)
409 entry->resultCode = TW_RESULT_ERROR_LINE_COUNT;
413 entry->resultCode = TW_RESULT_OK;
422 TWResultCode TWRetrieveEUI(PIPlugin * plugin, TWSock * twSock)
424 if(twSock->isActive == false)
426 return TW_RESULT_ERROR;
429 TWEntry * entry = NULL;
430 TWResultCode deleteResult = TW_RESULT_OK;
431 TWResultCode result = TWIssueATCommand(plugin, AT_CMD_GET_LOCAL_EUI);
432 if(result != TW_RESULT_OK)
436 result = TWGrabMutex(&twSock->mutex);
437 if(result != TW_RESULT_OK)
441 entry = readEntry(twSock->fd);
444 result = TWReleaseMutex(&twSock->mutex);
445 if(result != TW_RESULT_OK)
450 twSock->eui = (char *) OICMalloc(strlen(entry->lines[0].line)+1);
453 result = TW_RESULT_ERROR_NO_MEMORY;
457 if(SIZE_EUI != (strlen(entry->lines[0].line)+1))
459 OICFree(twSock->eui);
460 result = TW_RESULT_ERROR;
464 OICStrcpy(twSock->eui, SIZE_EUI, entry->lines[0].line);
466 result = TWReleaseMutex(&twSock->mutex);
467 if(result != TW_RESULT_OK)
469 OICFree(twSock->eui);
473 deleteResult = TWDeleteEntry(plugin, entry);
474 if(deleteResult != TW_RESULT_OK)
481 TWResultCode TWStartSock(PIPlugin * plugin, const char * fileLoc)
483 TWSock * sock = TWGetSock((PIPlugin *)plugin);
484 if(sock && sock->isActive == true)
486 // Ignore requests to start up the same socket.
491 sock = (TWSock *) OICCalloc(1, sizeof(TWSock));
494 return TW_RESULT_ERROR_NO_MEMORY;
497 TWResultCode ret = TWAddTWSock(sock, plugin, fileLoc);
503 ret = TWRetrieveEUI((PIPlugin *)plugin, sock);
504 if(ret != TW_RESULT_OK)
506 OC_LOG(ERROR, TAG, "Unable to retrieve Zigbee Radio's EUI.");
510 int result = pthread_create(&(sock->threadHandle),
516 OC_LOG_V(ERROR, TAG, "Thread start failed with error %d", result);
517 ret = TW_RESULT_ERROR;
524 TWDeleteTWSock(sock);
528 static void sigHandler(int signal)
530 pthread_t tid = pthread_self();
533 OC_LOG_V(INFO, TAG, "Received signal on thread: %lu\n", tid);
534 OC_LOG_V(INFO, TAG, "Signal is: %d", signal);
537 void * readForever(/*PIPlugin*/ void * plugin)
539 TWResultCode result = TW_RESULT_OK;
540 TWEntry * entry = NULL;
541 TWSock * twSock = TWGetSock((PIPlugin *)plugin);
544 OC_LOG(ERROR, TAG, "Unable to retrieve associated socket.");
548 pthread_t tid = pthread_self();
550 OC_LOG_V(INFO, TAG, "ReadForever Telegesis ThreadId: %lu", tid);
552 struct sigaction action = { .sa_handler = 0 };
556 action.sa_handler = sigHandler;
558 sigemptyset(&action.sa_mask);
559 sigaction(EINTR, &action, NULL);
563 FD_SET(twSock->fd, &readFDS);
564 bool haveMutex = false;
568 // 'sigmask' is needed to catch intterupts.
569 // This interrupt happens after call to pthread_exit(..., EINTR).
570 // Once a signal handler is registered, pselect will handle interrupts by returning
571 // with '-1' and setting errno appropriately.
572 int ret = pselect(twSock->fd+1, &readFDS, NULL, NULL, NULL, &sigmask);
580 // This EINTR signal is not for us. Do not handle it.
582 OC_LOG(DEBUG, TAG, "Thread has been joined. Exiting thread.");
583 pthread_exit(PTHREAD_CANCELED);
588 OC_LOG_V(ERROR, TAG, "Unaccounted error occurred. Exiting thread."
589 "Errno is: %d", errno);
595 ret = FD_ISSET(twSock->fd, &readFDS);
598 // Valid data on valid socket.
599 // Grab & parse, then pass up to upper layers.
600 if(haveMutex != true)
602 result = TWGrabMutex(&twSock->mutex);
603 if(result != TW_RESULT_OK)
605 OC_LOG(ERROR, TAG, "Unable to grab mutex.");
610 entry = readEntry(twSock->fd);
613 result = TWReleaseMutex(&twSock->mutex);
614 if(result != TW_RESULT_OK)
616 OC_LOG(ERROR, TAG, "Unable to release mutex.");
620 // This is most likely a parsing error of the received
621 // response. Not necessarily fatal.
624 result = TWEnqueueEntry((PIPlugin *)plugin, entry);
625 if(result != TW_RESULT_OK)
627 OC_LOG_V(ERROR, TAG, "Could not add TWEntry to queue for"
628 "consumption by the application"
629 "layer. Error is: %d", result);
630 TWDeleteEntry((PIPlugin *)plugin, entry);
631 // This is most likely a FATAL error, such as out of memory.
635 // Notify other threads waiting for a response that an entry has been enqueued.
636 pthread_cond_signal(&twSock->queueCV);
638 result = TWReleaseMutex(&twSock->mutex);
643 // Unrelated data waiting elsewhere. Continue the loop.
652 TWResultCode TWIssueATCommand(PIPlugin * plugin, const char * command)
654 if(!plugin || !command)
656 return TW_RESULT_ERROR_INVALID_PARAMS;
658 OC_LOG_V(INFO, TAG, "Attempt to write %s.", command);
659 TWSock * twSock = TWGetSock(plugin);
662 return TW_RESULT_ERROR;
665 if(twSock->isActive == false)
667 return TW_RESULT_ERROR;
670 TWResultCode result = TW_RESULT_OK;
671 TWResultCode mutexResult = TW_RESULT_OK;
672 mutexResult = TWGrabMutex(&twSock->mutex);
673 if(mutexResult != TW_RESULT_OK)
677 size_t sendCommandSize = (strlen(command) + strlen("\r") + 1) * sizeof(char);
678 char * sendCommand = (char *) OICCalloc(1, sendCommandSize);
681 return TW_RESULT_ERROR_NO_MEMORY;
683 char * temp = OICStrcpy(sendCommand, sendCommandSize, command);
684 if(temp != sendCommand)
686 result = TW_RESULT_ERROR;
689 temp = OICStrcat(sendCommand, sendCommandSize, "\r");
690 if(temp != sendCommand)
692 result = TW_RESULT_ERROR;
695 size_t expectedWrittenBytes = strlen(sendCommand);
697 size_t actualWrittenBytes = write(twSock->fd, sendCommand, expectedWrittenBytes);
698 if(actualWrittenBytes <= 0)
700 OC_LOG_V(ERROR, TAG, "Could not write to port. Errno is: %d\n", errno);
701 result = TW_RESULT_ERROR;
704 if(actualWrittenBytes != expectedWrittenBytes)
706 OC_LOG(ERROR, TAG, "Telegesis Adapter did not receive expected command. Unknown state.");
707 result = TW_RESULT_ERROR;
711 mutexResult = TWReleaseMutex(&twSock->mutex);
712 if(mutexResult != TW_RESULT_OK)
716 OICFree(sendCommand);
720 TWResultCode TWEnqueueEntry(PIPlugin * plugin, TWEntry * entry)
722 if(!plugin || !entry)
724 return TW_RESULT_ERROR_INVALID_PARAMS;
726 TWSock * twSock = TWGetSock(plugin);
729 return TW_RESULT_ERROR;
732 if(twSock->isActive == false)
734 return TW_RESULT_ERROR;
736 LL_APPEND(twSock->queue, entry);
740 TWResultCode TWDequeueEntry(PIPlugin * plugin, TWEntry ** entry, TWEntryType type)
742 if(!plugin || !entry)
744 return TW_RESULT_ERROR_INVALID_PARAMS;
746 TWSock * twSock = TWGetSock(plugin);
749 return TW_RESULT_ERROR;
752 if(twSock->isActive == false)
754 return TW_RESULT_ERROR;
757 TWResultCode ret = TW_RESULT_OK;
759 // If no entry is found, then this code path returns immediately.
760 ret = TWGrabMutex(&twSock->mutex);
761 if(ret != TW_RESULT_OK)
768 // Wait for up to 10 seconds for the entry to put into the queue.
769 ret = TWWait(&twSock->queueCV, &twSock->mutex, TIME_OUT_10_SECONDS);
770 if(ret != TW_RESULT_OK)
776 *entry = twSock->queue;
779 LL_DELETE(twSock->queue, *entry);
781 ret = TWReleaseMutex(&twSock->mutex);
782 if(ret != TW_RESULT_OK)
789 TWResultCode TWFreeQueue(PIPlugin * plugin)
793 return TW_RESULT_ERROR_INVALID_PARAMS;
795 TWResultCode ret = TW_RESULT_OK;
796 TWEntry * entry = NULL;
799 ret = TWDequeueEntry(plugin, &entry, TW_NONE);
800 if(ret != TW_RESULT_OK)
808 ret = TWDeleteEntry(plugin, entry);
809 if(ret != TW_RESULT_OK)
817 TWResultCode TWDeleteEntry(PIPlugin * plugin, TWEntry * entry)
819 if(!plugin || !entry)
821 return TW_RESULT_ERROR_INVALID_PARAMS;
824 TWSock * twSock = TWGetSock(plugin);
827 return TW_RESULT_ERROR;
830 if(twSock->isActive == false)
832 return TW_RESULT_ERROR;
835 TWResultCode ret = TWGrabMutex(&twSock->mutex);
836 if(ret != TW_RESULT_OK)
840 TWEntry * out = NULL;
841 TWEntry * tmp = NULL;
842 LL_FOREACH_SAFE(twSock->queue, out, tmp)
846 OC_LOG(ERROR, TAG, "Tried to delete an entry that is still in the queue. \
847 Please dequeue the entry first.");
848 return TW_RESULT_ERROR;
851 ret = TWReleaseMutex(&twSock->mutex);
858 TWResultCode TWGetEUI(PIPlugin * plugin, char ** eui)
862 return TW_RESULT_ERROR_INVALID_PARAMS;
864 TWSock * twSock = TWGetSock(plugin);
867 return TW_RESULT_ERROR;
870 if(twSock->isActive == false)
872 return TW_RESULT_ERROR;
875 *eui = OICStrdup(twSock->eui);
881 TWResultCode TWStopSock(PIPlugin * plugin)
885 return TW_RESULT_ERROR_INVALID_PARAMS;
887 TWSock * twSock = TWGetSock(plugin);
890 return TW_RESULT_ERROR;
893 if(twSock->isActive == false)
895 return TW_RESULT_ERROR;
898 TWResultCode ret = TWFreeQueue(plugin);
899 if(ret != TW_RESULT_OK)
904 twSock->isActive = false;
906 void * retVal = NULL;
907 int pthreadRet = pthread_kill(twSock->threadHandle, EINTR);
910 return TW_RESULT_ERROR;
913 pthreadRet = pthread_join(twSock->threadHandle, &retVal);
919 OC_LOG(ERROR, TAG, "A deadlock has occurred.");
922 OC_LOG(ERROR, TAG, "Thread is not joinable or another thread has already joined.");
925 OC_LOG(ERROR, TAG, "No thread with the ID could be found.");
928 OC_LOG_V(ERROR, TAG, "Unknown error occurred when joining thread: %d", pthreadRet);
930 return TW_RESULT_ERROR;
932 if(retVal != PTHREAD_CANCELED)
934 return TW_RESULT_ERROR;
936 ret = TWDeleteTWSock(twSock);
937 if(ret != TW_RESULT_OK)