1 //******************************************************************
3 // Copyright 2015 Intel Mobile Communications GmbH All Rights Reserved.
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
21 #include "telegesis_socket.h"
27 #include <sys/select.h>
30 #include "twsocketlist.h"
31 #include "oic_string.h"
32 #include "oic_malloc.h"
35 #define TAG "Telegesis_Socket"
38 * New thread's main() function.
40 void * readForever(/*PIPlugin */void * plugin);
42 * Just grabs the next char in the buffer. Called by readBufferLine() multiple times.
44 char readBufferChar(int fd, ssize_t * readDataBytes);
46 * Calls readBufferChar() until line is formed.
48 const char * readBufferLine(int fd);
50 * Calls readBufferLine() until a full TWEntry is formed.
52 TWEntry * readEntry(int fd);
54 * Posts the TWEntry to the queue.
56 TWResultCode TWEnqueueEntry(PIPlugin * plugin, TWEntry * entry);
59 * Defines the mapping relationships between Telegesis AT response/prompt tags and
60 * how many lines we should expect with each following the tag.
// Response/prompt tag text; getEntryTypePair() compares it against the start of a line.
64 const char *resultTxt;
// Entry type that readEntry() copies into TWEntry.type for a line matching the tag.
66 TWEntryType entryType;
// Lookup table searched by getEntryTypePair(): each row gives a response/prompt tag, a
// numeric value, and the entry type, in that positional order.
// NOTE(review): the numeric value is presumably the expected line count (the numLines
// field readEntry() compares against) -- the field declaration is not visible here.
// NOTE(review): getEntryTypePair() falls back to element [TW_MAX_ENTRY], so this table
// needs at least TW_MAX_ENTRY + 1 rows -- verify against the TWEntryType enum.
69 static TWEntryTypePair TWEntryTypePairArray[] =
71 {"+N=", 1, TW_NETWORK_INFO},
72 {"JPAN:", 1, TW_JPAN},
77 {"MatchDesc:", 1, TW_MATCHDESC},
78 {"SimpleDesc:", 6, TW_SIMPLEDESC},
79 {"InCluster:", 1, TW_INCLUSTER},
80 {"WRITEATTR:", 1, TW_WRITEATTR},
81 {"RESPATTR:", 1, TW_RESPATTR},
// "TEMPERATURE:" is reported with the same entry type as "RESPATTR:".
82 {"TEMPERATURE:", 1, TW_RESPATTR},
83 {"DFTREP", 1, TW_DFTREP},
84 {"DRLOCRSP:", 1, TW_DRLOCRSP},
85 {"DRUNLOCKRSP:", 1, TW_DRUNLOCKRSP},
87 {"NACK:", 1, TW_NACK},
89 {"ZENROLLREQ:", 1, TW_ZENROLLREQ},
90 {"ENROLLED:", 1, TW_ENROLLED},
// Two rows carry the "Unknown:" tag; the prefix search returns the first matching row.
91 {"Unknown:", 0, TW_NONE},
92 {"Unknown:", 1, TW_MAX_ENTRY}
95 TWEntry * TWEntryList = NULL;
97 TWEntryTypePair getEntryTypePair(const char * bufferLine)
99 size_t bufferLength = strlen(bufferLine);
100 for(uint8_t i = 0; i < TW_MAX_ENTRY; i++)
102 size_t resultTxtLength = strlen(TWEntryTypePairArray[i].resultTxt);
103 if((bufferLength >= resultTxtLength) &&
104 strncmp(bufferLine, TWEntryTypePairArray[i].resultTxt, resultTxtLength) == 0)
106 return TWEntryTypePairArray[i];
109 return TWEntryTypePairArray[TW_MAX_ENTRY];
// Blocks on `cond` with pthread_cond_timedwait() until it is signaled or the deadline passes.
//
// cond    - condition variable to wait on.
// mutex   - mutex handed to pthread_cond_timedwait(); TWDequeueEntry() grabs it with
//           TWGrabMutex() before calling this function.
// timeout - number of seconds added to the current CLOCK_REALTIME time to form the
//           absolute deadline.
//
// NOTE(review): the dispatch on `ret` is not visible in this excerpt; the log texts below
// suggest timeout, invalid-argument and ownership failures are told apart, and the visible
// trailing return yields TW_RESULT_ERROR.
112 TWResultCode TWWait(pthread_cond_t * cond, pthread_mutex_t * mutex, uint8_t timeout)
115 // This is a blocking call which holds the calling thread until an entry
116 // has been enqueued or until the specified timeout has elapsed.
117 struct timespec abs_time;
118 clock_gettime(CLOCK_REALTIME , &abs_time);
// pthread_cond_timedwait() takes an absolute time, so the timeout is added to "now".
119 abs_time.tv_sec += timeout;
120 ret = pthread_cond_timedwait(cond, mutex, &abs_time);
127 OC_LOG(INFO, TAG, "Timed out waiting for CV. Non-error. Please try again.");
130 OC_LOG(ERROR, TAG, "Cond or Mutex is invalid. OR timeout value for CV is invalid.");
133 OC_LOG(ERROR, TAG, "Cannot use CV because the current thread does not own the CV.");
137 return TW_RESULT_ERROR;
// Locks `mutex` with pthread_mutex_lock() and reports the outcome as a TWResultCode.
//
// mutex - mutex to lock.
//
// Failure causes are logged; the visible trailing return yields TW_RESULT_ERROR.
// NOTE(review): the branch structure on `ret` (including the success return) is not
// visible in this excerpt.
140 TWResultCode TWGrabMutex(pthread_mutex_t * mutex)
142 int ret = pthread_mutex_lock(mutex);
149 OC_LOG(ERROR, TAG, "Mutex was not initialized.");
152 OC_LOG(INFO, TAG, "Timed out waiting for mutex. Non-error. Please try again.");
155 OC_LOG(ERROR, TAG, "Maximum number of locks for mutex have been exceeded.");
158 OC_LOG(ERROR, TAG, "Deadlock OR this thread already owns the mutex.");
161 return TW_RESULT_ERROR;
// Unlocks `mutex` with pthread_mutex_unlock() and reports the outcome as a TWResultCode.
//
// mutex - mutex to unlock.
//
// Failure causes are logged; the visible trailing return yields TW_RESULT_ERROR.
// NOTE(review): the branch structure on `ret` (including the success return) is not
// visible in this excerpt.
164 TWResultCode TWReleaseMutex(pthread_mutex_t * mutex)
166 int ret = pthread_mutex_unlock(mutex);
173 OC_LOG(ERROR, TAG, "Mutex was not initialized.");
176 OC_LOG(ERROR, TAG, "Maximum number of locks for mutex have been exceeded.");
179 OC_LOG(ERROR, TAG, "Cannot unlock because the current thread does not own the mutex.");
182 return TW_RESULT_ERROR;
// Reads a single byte from `fd` with read().
//
// fd            - file descriptor to read from.
// readDataBytes - out-parameter that receives read()'s return value; a negative value
//                 means the read failed, in which case errno is logged.
//
// NOTE(review): the declaration of `byte` and the return statements are not visible in
// this excerpt.
185 char readBufferChar(int fd, ssize_t * readDataBytes)
187 // Performs read operation on fd for one character at a time.
195 *readDataBytes = read(fd, &byte, sizeof(byte));
196 if(*readDataBytes < 0)
198 OC_LOG_V(ERROR, TAG, "\tCould not read from port. Errno is: %d\n", errno);
// Tests whether `line` begins with one of the prefixes "NACK", "SEQ", "ACK", "AT" or "OK".
// readBufferLine() stores the result in its `ignore` flag.
//
// line   - text to inspect; characters at indexes 0 through 3 are read.
// length - length of `line`. NOTE(review): presumably guards the index accesses below --
//          the guarding conditions and the return statements are not visible here.
204 bool isLineIgnored(const char * line, size_t length)
208 if(line[0] == 'N' && line[1] == 'A' && line[2] == 'C' && line[3] == 'K')
215 if(line[0] == 'S' && line[1] == 'E' && line[2] == 'Q')
219 else if(line[0] == 'A' && line[1] == 'C' && line[2] == 'K')
226 if(line[0] == 'A' && line[1] == 'T')
228 // If the line starts with "AT", then this is an echo. We ignore echoes.
231 else if(line[0] == 'O' && line[1] == 'K')
233 // If this line is "OK", we ignore success codes. But we do end TWEntry's on "OK".
234 // (FYI, error codes are handled in readEntry() which invokes this function.)
// Builds one text line from `fd`, one character at a time, using readBufferChar().
//
// fd - file descriptor forwarded to readBufferChar().
//
// The line buffer is resized with OICRealloc() to (bufferLoc + 2) bytes, and the finished
// line is checked with isLineIgnored(). The function also calls itself again (see the
// recursive return below).
//
// NOTE(review): the loop construct, the end-of-line flag updates, the bufferLoc increments
// and most return statements are not visible in this excerpt, so the exact return contract
// and the ownership of the returned buffer cannot be confirmed from here.
241 const char * readBufferLine(int fd)
243 char bufferChar = '\0';
244 size_t bufferLoc = 0;
245 ssize_t readDataBytes = 0;
246 char * bufferLineHold = NULL;
247 char * bufferLine = NULL;
// One flag per terminator character: '\r' and '\n' are tested separately below.
248 bool endOfLine1 = false;
249 bool endOfLine2 = false;
253 if(!endOfLine1 || !endOfLine2)
255 bufferChar = readBufferChar(fd, &readDataBytes);
256 if(bufferChar == '\r')
261 if(bufferChar == '\n')
// Data was read and no terminator character has been seen yet.
267 if(readDataBytes != 0 && (!endOfLine1 && !endOfLine2))
269 size_t bufferLineSize = sizeof(bufferChar)*(bufferLoc+2);
270 bufferLine = (char *) OICRealloc(bufferLineHold, bufferLineSize);
273 OC_LOG(ERROR, TAG, "Ran out of memory.");
276 bufferLine[bufferLoc] = '\0';
// NOTE(review): &bufferChar points at a single char (not a NUL-terminated string) yet is
// passed as OICStrcat's source, and the size argument (bufferLineSize+2) is larger than
// the bufferLineSize bytes just allocated -- verify against OICStrcat's contract.
277 OICStrcat(bufferLine, bufferLineSize+2, &bufferChar);
279 bufferLineHold = bufferLine;
281 OC_LOG_V(DEBUG, TAG, "Incoming: %s", bufferLine);
289 size_t bufferLineSize = sizeof(bufferChar)*(bufferLoc+2);
290 bufferLine = (char *) OICRealloc(bufferLineHold, bufferLineSize);
293 OC_LOG(ERROR, TAG, "Ran out of memory.");
296 bufferLine[bufferLoc] = '\0';
298 bufferLineHold = bufferLine;
299 ignore = isLineIgnored(bufferLine, strlen(bufferLine));
// Recursive call that reads another line. NOTE(review): presumably taken when `ignore`
// is set -- the guarding condition is not visible here.
303 return readBufferLine(fd);
305 if(endOfLine1 && endOfLine2)
// Records `line` in `entry` and refreshes entry->atErrorCode.
//
// line  - NUL-terminated text (strlen() is applied to it).
// entry - entry being built; its atErrorCode and lines members are written.
//
// Returns TW_RESULT_ERROR_INVALID_PARAMS or TW_RESULT_ERROR_NO_MEMORY on the failure paths
// logged below. NOTE(review): the conditions guarding those returns and the success return
// are not visible in this excerpt.
317 TWResultCode TWAddLineToEntry(const char * line, TWEntry * entry)
321 OC_LOG(ERROR, TAG, "Invalid/NULL parameter(s) received.");
322 return TW_RESULT_ERROR_INVALID_PARAMS;
324 TWLine * twLine = (TWLine *) OICCalloc(1, sizeof(TWLine));
327 OC_LOG(ERROR, TAG, "Ran out of memory.");
328 return TW_RESULT_ERROR_NO_MEMORY;
330 size_t lineLength = strlen(line);
332 twLine->length = lineLength;
333 size_t errorLength = strlen(TW_ENDCONTROL_ERROR_STRING);
// Compare at most the shorter of the two lengths.
334 size_t maxLength = (lineLength > errorLength) ? errorLength : lineLength;
// NOTE(review): this branch requires lineLength == errorLength, so line[errorLength] is the
// terminating NUL and line[errorLength + 1] lies past the end of the string -- confirm
// whether the intent was "line starts with the error tag and carries two more characters".
336 if((errorLength == lineLength) &&
337 strncmp(line, TW_ENDCONTROL_ERROR_STRING, maxLength) == 0)
339 entry->atErrorCode[0] = line[errorLength];
340 entry->atErrorCode[1] = line[errorLength + 1];
// Sets the error code to "00".
344 entry->atErrorCode[0] = '0';
345 entry->atErrorCode[1] = '0';
348 // Null terminate the string.
349 entry->atErrorCode[2] = '\0';
353 entry->lines = twLine;
// NOTE(review): copies *twLine into slot entry->count of entry->lines; the capacity of
// entry->lines is not visible here -- verify that the slot exists and who releases
// twLine after the copy.
357 entry->lines[entry->count] = *twLine;
// Reads lines from `fd` with readBufferLine() and collects them into a TWEntry allocated
// with OICCalloc().
//
// fd - file descriptor forwarded to readBufferLine().
//
// The line read before getEntryTypePair() is called determines the entry's type pair;
// entry->type, entry->count and entry->resultCode are filled in from it and from numLines.
// NOTE(review): the declaration of numLines, the branch conditions inside the loop and the
// return statements are not visible in this excerpt.
364 TWEntry * readEntry(int fd)
366 // Calls readBufferLine().
367 // Forms TWEntry from 1-n lines based on the response type.
369 TWEntry * entry = (TWEntry *) OICCalloc(1, sizeof(TWEntry));
372 OC_LOG(ERROR, TAG, "Ran out of memory.");
377 const char * bufferLine = NULL;
378 TWEntryTypePair entryTypePair = { .resultTxt = NULL,
380 .entryType = TW_NONE };
// Keep going while nothing has been read yet or the last read produced a line.
382 while(numLines == 0 || bufferLine)
386 bufferLine = readBufferLine(fd);
391 entryTypePair = getEntryTypePair(bufferLine);
395 bufferLine = readBufferLine(fd);
397 if(bufferLine != NULL)
399 entry->type = entryTypePair.entryType;
// NOTE(review): the TWResultCode returned by TWAddLineToEntry() is discarded here.
400 TWAddLineToEntry(bufferLine, entry);
402 entry->count = numLines;
// The number of lines collected is checked against the count expected for this tag.
405 if(entryTypePair.numLines != numLines)
407 entry->resultCode = TW_RESULT_ERROR_LINE_COUNT;
411 entry->resultCode = TW_RESULT_OK;
// Issues AT_CMD_GET_LOCAL_EUI and stores the first line of the response in twSock->eui.
//
// plugin - plugin forwarded to TWIssueATCommand() and TWDeleteEntry().
// twSock - socket to read from; TW_RESULT_ERROR is returned when it is not active.
//
// Visible sequence: issue the command, grab the socket mutex, read one entry with
// readEntry(), release the mutex, allocate twSock->eui, compare the line length against
// SIZE_EUI, copy the line, then delete the entry.
// NOTE(review): most branch bodies and the return statements are not visible in this
// excerpt, so the exact error paths cannot be confirmed from here.
420 TWResultCode TWRetrieveEUI(PIPlugin * plugin, TWSock * twSock)
422 if(twSock->isActive == false)
424 return TW_RESULT_ERROR;
427 TWEntry * entry = NULL;
428 TWResultCode deleteResult = TW_RESULT_OK;
429 TWResultCode result = TWIssueATCommand(plugin, AT_CMD_GET_LOCAL_EUI);
430 if(result != TW_RESULT_OK)
434 result = TWGrabMutex(&twSock->mutex);
435 if(result != TW_RESULT_OK)
439 entry = readEntry(twSock->fd);
442 result = TWReleaseMutex(&twSock->mutex);
443 if(result != TW_RESULT_OK)
// NOTE(review): entry->lines[0].line is dereferenced here; a preceding NULL check on
// `entry` (or on entry->lines) is not visible -- confirm one exists.
448 twSock->eui = (char *) OICMalloc(strlen(entry->lines[0].line)+1);
451 result = TW_RESULT_ERROR_NO_MEMORY;
// NOTE(review): the length check runs after the buffer has already been allocated.
455 if(SIZE_EUI != (strlen(entry->lines[0].line)+1))
// NOTE(review): twSock->eui is freed here (and again further down) without a visible
// reset to NULL -- confirm no later reader can see the stale pointer.
457 OICFree(twSock->eui);
458 result = TW_RESULT_ERROR;
462 OICStrcpy(twSock->eui, SIZE_EUI, entry->lines[0].line);
// NOTE(review): second TWReleaseMutex() call in this function while only one
// TWGrabMutex() call is visible -- confirm the two are on mutually exclusive paths.
464 result = TWReleaseMutex(&twSock->mutex);
465 if(result != TW_RESULT_OK)
467 OICFree(twSock->eui);
471 deleteResult = TWDeleteEntry(plugin, entry);
472 if(deleteResult != TW_RESULT_OK)
// Sets up the socket for `plugin`: allocates a TWSock, registers it with TWAddTWSock(),
// fetches the radio's EUI with TWRetrieveEUI() and creates a thread with pthread_create().
//
// plugin  - plugin the socket is associated with.
// fileLoc - forwarded to TWAddTWSock(). NOTE(review): presumably the device path -- confirm.
//
// Returns TW_RESULT_ERROR_NO_MEMORY on the visible allocation-failure path.
// NOTE(review): the pthread_create() arguments, the branch conditions and the remaining
// return statements are not visible in this excerpt; TWDeleteTWSock(sock) appears to be
// the cleanup step for a failed start.
479 TWResultCode TWStartSock(PIPlugin * plugin, const char * fileLoc)
481 TWSock * sock = TWGetSock((PIPlugin *)plugin);
482 if(sock && sock->isActive == true)
484 // Ignore requests to start up the same socket.
489 sock = (TWSock *) OICCalloc(1, sizeof(TWSock));
492 return TW_RESULT_ERROR_NO_MEMORY;
495 TWResultCode ret = TWAddTWSock(sock, plugin, fileLoc);
501 ret = TWRetrieveEUI((PIPlugin *)plugin, sock);
502 if(ret != TW_RESULT_OK)
504 OC_LOG(ERROR, TAG, "Unable to retrieve Zigbee Radio's EUI.");
508 int result = pthread_create(&(sock->threadHandle),
514 OC_LOG_V(ERROR, TAG, "Thread start failed with error %d", result);
515 ret = TW_RESULT_ERROR;
522 TWDeleteTWSock(sock);
// Signal handler installed by readForever(); logs the current thread id and the signal number.
//
// signal - number of the delivered signal.
//
// NOTE(review): "%lu" assumes pthread_t is an unsigned long, which POSIX does not guarantee.
// NOTE(review): logging from a signal handler is only safe if OC_LOG_V is
// async-signal-safe -- confirm.
526 static void sigHandler(int signal)
528 pthread_t tid = pthread_self();
531 OC_LOG_V(INFO, TAG, "Received signal on thread: %lu\n", tid);
532 OC_LOG_V(INFO, TAG, "Signal is: %d", signal);
// Thread routine: waits on the socket's file descriptor with pselect(), parses incoming
// data into a TWEntry with readEntry() and queues it with TWEnqueueEntry().
//
// plugin - PIPlugin pointer passed as void *; used to look up the TWSock via TWGetSock().
//
// NOTE(review): the loop construct, the branch conditions around pselect()'s result and
// the return statements are not visible in this excerpt.
535 void * readForever(/*PIPlugin*/ void * plugin)
537 TWResultCode result = TW_RESULT_OK;
538 TWEntry * entry = NULL;
539 TWSock * twSock = TWGetSock((PIPlugin *)plugin);
542 OC_LOG(ERROR, TAG, "Unable to retrieve associated socket.");
546 pthread_t tid = pthread_self();
// NOTE(review): "%lu" assumes pthread_t is an unsigned long, which POSIX does not guarantee.
548 OC_LOG_V(INFO, TAG, "ReadForever Telegesis ThreadId: %lu", tid);
550 struct sigaction action = { .sa_handler = 0 };
554 action.sa_handler = sigHandler;
556 sigemptyset(&action.sa_mask);
// NOTE(review): EINTR is an errno constant, yet it is used as the signal number here
// (TWStopSock() passes the same value to pthread_kill()) -- confirm this is intended.
557 sigaction(EINTR, &action, NULL);
// NOTE(review): only this one FD_SET() is visible; pselect() overwrites the set it is
// given, so confirm the set is re-armed before each wait.
561 FD_SET(twSock->fd, &readFDS);
562 bool haveMutex = false;
566 // 'sigmask' is needed to catch interrupts.
567 // This interrupt happens after call to pthread_exit(..., EINTR).
568 // Once a signal handler is registered, pselect will handle interrupts by returning
569 // with '-1' and setting errno appropriately.
570 int ret = pselect(twSock->fd+1, &readFDS, NULL, NULL, NULL, &sigmask);
578 // This EINTR signal is not for us. Do not handle it.
580 OC_LOG(DEBUG, TAG, "Thread has been joined. Exiting thread.");
581 pthread_exit(PTHREAD_CANCELED);
// NOTE(review): the two adjacent string literals below concatenate without a separating
// space ("...Exiting thread.Errno is: ...").
586 OC_LOG_V(ERROR, TAG, "Unaccounted error occurred. Exiting thread."
587 "Errno is: %d", errno);
593 ret = FD_ISSET(twSock->fd, &readFDS);
596 // Valid data on valid socket.
597 // Grab & parse, then pass up to upper layers.
598 if(haveMutex != true)
600 result = TWGrabMutex(&twSock->mutex);
601 if(result != TW_RESULT_OK)
603 OC_LOG(ERROR, TAG, "Unable to grab mutex.");
608 entry = readEntry(twSock->fd);
611 result = TWReleaseMutex(&twSock->mutex);
612 if(result != TW_RESULT_OK)
614 OC_LOG(ERROR, TAG, "Unable to release mutex.");
618 // This is most likely a parsing error of the received
619 // response. Not necessarily fatal.
622 result = TWEnqueueEntry((PIPlugin *)plugin, entry);
623 if(result != TW_RESULT_OK)
// NOTE(review): the adjacent string literals below concatenate without separating spaces
// ("...queue forconsumption by the applicationlayer. ...").
625 OC_LOG_V(ERROR, TAG, "Could not add TWEntry to queue for"
626 "consumption by the application"
627 "layer. Error is: %d", result);
628 TWDeleteEntry((PIPlugin *)plugin, entry);
629 // This is most likely a FATAL error, such as out of memory.
633 // Notify other threads waiting for a response that an entry has been enqueued.
634 pthread_cond_signal(&twSock->queueCV);
636 result = TWReleaseMutex(&twSock->mutex);
641 // Unrelated data waiting elsewhere. Continue the loop.
650 TWResultCode TWIssueATCommand(PIPlugin * plugin, const char * command)
652 if(!plugin || !command)
654 return TW_RESULT_ERROR_INVALID_PARAMS;
656 OC_LOG_V(INFO, TAG, "Attempt to write %s.", command);
657 TWSock * twSock = TWGetSock(plugin);
660 return TW_RESULT_ERROR;
663 if(twSock->isActive == false)
665 return TW_RESULT_ERROR;
668 TWResultCode result = TW_RESULT_OK;
669 TWResultCode mutexResult = TW_RESULT_OK;
670 mutexResult = TWGrabMutex(&twSock->mutex);
671 if(mutexResult != TW_RESULT_OK)
675 size_t sendCommandSize = (strlen(command) + strlen("\r") + 1) * sizeof(char);
676 char * sendCommand = (char *) OICCalloc(1, sendCommandSize);
679 return TW_RESULT_ERROR_NO_MEMORY;
681 char * temp = OICStrcpy(sendCommand, sendCommandSize, command);
682 if(temp != sendCommand)
684 result = TW_RESULT_ERROR;
687 temp = OICStrcat(sendCommand, sendCommandSize, "\r");
688 if(temp != sendCommand)
690 result = TW_RESULT_ERROR;
693 size_t expectedWrittenBytes = strlen(sendCommand);
695 size_t actualWrittenBytes = write(twSock->fd, sendCommand, expectedWrittenBytes);
696 if(actualWrittenBytes <= 0)
698 OC_LOG_V(ERROR, TAG, "Could not write to port. Errno is: %d\n", errno);
699 result = TW_RESULT_ERROR;
702 if(actualWrittenBytes != expectedWrittenBytes)
704 OC_LOG(ERROR, TAG, "Telegesis Adapter did not receive expected command. Unknown state.");
705 result = TW_RESULT_ERROR;
709 mutexResult = TWReleaseMutex(&twSock->mutex);
710 if(mutexResult != TW_RESULT_OK)
714 OICFree(sendCommand);
718 TWResultCode TWEnqueueEntry(PIPlugin * plugin, TWEntry * entry)
720 if(!plugin || !entry)
722 return TW_RESULT_ERROR_INVALID_PARAMS;
724 TWSock * twSock = TWGetSock(plugin);
727 return TW_RESULT_ERROR;
730 if(twSock->isActive == false)
732 return TW_RESULT_ERROR;
734 LL_APPEND(twSock->queue, entry);
// Takes the entry at the head of the plugin socket's queue and hands it back via *entry.
//
// plugin - plugin whose socket is looked up with TWGetSock().
// entry  - out-parameter; receives twSock->queue (the list head), which is then unlinked
//          with LL_DELETE().
// type   - NOTE(review): not referenced in any line visible in this excerpt.
//
// Returns TW_RESULT_ERROR_INVALID_PARAMS or TW_RESULT_ERROR on the visible guard paths.
// The socket mutex is grabbed first; TWWait() on queueCV is used with TIME_OUT_10_SECONDS.
// NOTE(review): the conditions that decide whether to wait, the handling of a failed wait
// and the remaining return statements are not visible in this excerpt.
738 TWResultCode TWDequeueEntry(PIPlugin * plugin, TWEntry ** entry, TWEntryType type)
740 if(!plugin || !entry)
742 return TW_RESULT_ERROR_INVALID_PARAMS;
744 TWSock * twSock = TWGetSock(plugin);
747 return TW_RESULT_ERROR;
750 if(twSock->isActive == false)
752 return TW_RESULT_ERROR;
755 TWResultCode ret = TW_RESULT_OK;
757 // If no entry is found, then this code path returns immediately.
758 ret = TWGrabMutex(&twSock->mutex);
759 if(ret != TW_RESULT_OK)
766 // Wait for up to 10 seconds for the entry to be put into the queue.
767 ret = TWWait(&twSock->queueCV, &twSock->mutex, TIME_OUT_10_SECONDS);
768 if(ret != TW_RESULT_OK)
774 *entry = twSock->queue;
777 LL_DELETE(twSock->queue, *entry);
779 ret = TWReleaseMutex(&twSock->mutex);
780 if(ret != TW_RESULT_OK)
// Drains the plugin socket's queue: entries are taken with TWDequeueEntry() (type TW_NONE)
// and released with TWDeleteEntry().
//
// plugin - plugin forwarded to both helpers.
//
// Returns TW_RESULT_ERROR_INVALID_PARAMS on the visible guard path.
// NOTE(review): the guard condition, the loop construct, its termination condition and
// the remaining return statements are not visible in this excerpt.
787 TWResultCode TWFreeQueue(PIPlugin * plugin)
791 return TW_RESULT_ERROR_INVALID_PARAMS;
793 TWResultCode ret = TW_RESULT_OK;
794 TWEntry * entry = NULL;
797 ret = TWDequeueEntry(plugin, &entry, TW_NONE);
798 if(ret != TW_RESULT_OK)
806 ret = TWDeleteEntry(plugin, entry);
807 if(ret != TW_RESULT_OK)
// Deletes an entry that is no longer linked into the plugin socket's queue.
//
// plugin - plugin whose socket is looked up with TWGetSock().
// entry  - entry to delete; it must already have been dequeued.
//
// Returns TW_RESULT_ERROR_INVALID_PARAMS when an argument is NULL and TW_RESULT_ERROR on
// the other visible failure paths. The socket mutex is grabbed while the queue is walked.
// NOTE(review): the statements that actually release the entry's memory and the final
// return are not visible in this excerpt.
815 TWResultCode TWDeleteEntry(PIPlugin * plugin, TWEntry * entry)
817 if(!plugin || !entry)
819 return TW_RESULT_ERROR_INVALID_PARAMS;
822 TWSock * twSock = TWGetSock(plugin);
825 return TW_RESULT_ERROR;
828 if(twSock->isActive == false)
830 return TW_RESULT_ERROR;
833 TWResultCode ret = TWGrabMutex(&twSock->mutex);
834 if(ret != TW_RESULT_OK)
838 TWEntry * out = NULL;
839 TWEntry * tmp = NULL;
// Walk the queue; the log message below covers the case of an entry that is still queued.
840 LL_FOREACH_SAFE(twSock->queue, out, tmp)
844 OC_LOG(ERROR, TAG, "Tried to delete an entry that is still in the queue. \
845 Please dequeue the entry first.");
// NOTE(review): this return sits between TWGrabMutex() above and TWReleaseMutex() below,
// so the mutex appears to stay locked on this path -- confirm and release before returning.
846 return TW_RESULT_ERROR;
849 ret = TWReleaseMutex(&twSock->mutex);
856 TWResultCode TWGetEUI(PIPlugin * plugin, char ** eui)
860 return TW_RESULT_ERROR_INVALID_PARAMS;
862 TWSock * twSock = TWGetSock(plugin);
865 return TW_RESULT_ERROR;
868 if(twSock->isActive == false)
870 return TW_RESULT_ERROR;
873 *eui = OICStrdup(twSock->eui);
879 TWResultCode TWStopSock(PIPlugin * plugin)
883 return TW_RESULT_ERROR_INVALID_PARAMS;
885 TWSock * twSock = TWGetSock(plugin);
888 return TW_RESULT_ERROR;
891 if(twSock->isActive == false)
893 return TW_RESULT_ERROR;
896 TWResultCode ret = TWFreeQueue(plugin);
897 if(ret != TW_RESULT_OK)
902 twSock->isActive = false;
904 void * retVal = NULL;
905 int pthreadRet = pthread_kill(twSock->threadHandle, EINTR);
908 return TW_RESULT_ERROR;
911 pthreadRet = pthread_join(twSock->threadHandle, &retVal);
917 OC_LOG(ERROR, TAG, "A deadlock has occurred.");
920 OC_LOG(ERROR, TAG, "Thread is not joinable or another thread has already joined.");
923 OC_LOG(ERROR, TAG, "No thread with the ID could be found.");
926 OC_LOG_V(ERROR, TAG, "Unknown error occurred when joining thread: %d", pthreadRet);
928 return TW_RESULT_ERROR;
930 if(retVal != PTHREAD_CANCELED)
932 return TW_RESULT_ERROR;
934 ret = TWDeleteTWSock(twSock);
935 if(ret != TW_RESULT_OK)