Initial prototype for optimized Telegesis read/write layer.
[contrib/iotivity.git] / plugins / zigbee_wrapper / telegesis_wrapper / src / telegesis_socket.c
1 //******************************************************************
2 //
3 // Copyright 2015 Intel Mobile Communications GmbH All Rights Reserved.
4 //
5 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //      http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 //
19 //-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
20
21 #include "telegesis_socket.h"
22
23 #include <unistd.h>
24 #include <string.h>
25 #include <errno.h>
26 #include <signal.h>
27 #include <sys/select.h>
28 #include <time.h>
29
30 #include "twsocketlist.h"
31 #include "oic_string.h"
32 #include "oic_malloc.h"
33 #include "logger.h"
34
35 #define TAG "Telegesis_Socket"
36
37 /**
38  * New thread's main() ftn.
39  */
40 void * readForever(/*PIPlugin */void * plugin);
41 /**
42  * Just grabs the next char in the buffer. Called by readBufferLine() multiple times.
43  */
44 char readBufferChar(int fd, ssize_t * readDataBytes);
45 /**
46  * Calls readBufferChar() until line is formed.
47  */
48 const char * readBufferLine(int fd);
49 /**
50  * Calls readBufferLine() until a full TWEntry is formed.
51  */
52 TWEntry * readEntry(int fd);
53 /**
54  * Posts the TWEntry to the queue.
55  */
56 TWResultCode TWEnqueueEntry(PIPlugin * plugin, TWEntry * entry);
57
58 /**
59  * Defines the mapping relationships between Telegesis AT response/prompt tags and
60  * how many lines we should expect with each following the tag.
61  */
62 typedef struct
63 {
64     const char *resultTxt;
65     uint8_t numLines;
66     TWEntryType entryType;
67 } TWEntryTypePair;
68
69 static TWEntryTypePair TWEntryTypePairArray[] =
70 {
71     {"+N=",             1, TW_NETWORK_INFO},
72     {"JPAN:",           1, TW_JPAN},
73     {"RFD:",            1, TW_RFD},
74     {"FFD:",            1, TW_FFD},
75     {"SED:",            1, TW_SED},
76     {"ZED:",            1, TW_ZED},
77     {"MatchDesc:",      1, TW_MATCHDESC},
78     {"SimpleDesc:",     6, TW_SIMPLEDESC},
79     {"InCluster:",      1, TW_INCLUSTER},
80     {"WRITEATTR:",      1, TW_WRITEATTR},
81     {"RESPATTR:",       1, TW_RESPATTR},
82     {"TEMPERATURE:",    1, TW_RESPATTR},
83     {"DFTREP",          1, TW_DFTREP},
84     {"DRLOCRSP:",       1, TW_DRLOCRSP},
85     {"DRUNLOCKRSP:",    1, TW_DRUNLOCKRSP},
86     {"ACK:",            1, TW_ACK},
87     {"NACK:",           1, TW_NACK},
88     {"SEQ:",            1, TW_SEQ},
89     {"ZENROLLREQ:",     1, TW_ZENROLLREQ},
90     {"ENROLLED:",       1, TW_ENROLLED},
91     {"Unknown:",        0, TW_NONE},
92     {"Unknown:",        1, TW_MAX_ENTRY}
93 };
94
95 TWEntry * TWEntryList = NULL;
96
97 TWEntryTypePair getEntryTypePair(const char * bufferLine)
98 {
99     size_t bufferLength = strlen(bufferLine);
100     for(uint8_t i = 0; i < TW_MAX_ENTRY; i++)
101     {
102         size_t resultTxtLength = strlen(TWEntryTypePairArray[i].resultTxt);
103         if((bufferLength >= resultTxtLength) &&
104            strncmp(bufferLine, TWEntryTypePairArray[i].resultTxt, resultTxtLength) == 0)
105         {
106             return TWEntryTypePairArray[i];
107         }
108     }
109     return TWEntryTypePairArray[TW_MAX_ENTRY];
110 }
111
112 TWResultCode TWWait(pthread_cond_t * cond, pthread_mutex_t * mutex, uint8_t timeout)
113 {
114     int ret = 0;
115     // This is a blocking call which hold the calling thread until an entry
116     // has been enqueued or until the specified timeout has surpassed.
117     struct timespec abs_time;
118     clock_gettime(CLOCK_REALTIME , &abs_time);
119     abs_time.tv_sec += timeout;
120     ret = pthread_cond_timedwait(cond, mutex, &abs_time);
121
122     switch (ret)
123     {
124         case 0:
125             return TW_RESULT_OK;
126         case ETIMEDOUT:
127             OC_LOG(INFO, TAG, "Timed out waiting for CV. Non-error. Please try again.");
128             return TW_RESULT_OK;
129         case EINVAL:
130             OC_LOG(ERROR, TAG, "Cond or Mutex is invalid. OR timeout value for CV is invalid.");
131             break;
132         case EPERM:
133             OC_LOG(ERROR, TAG, "Cannot use CV because the current thread does not own the CV.");
134             break;
135     }
136
137     return TW_RESULT_ERROR;
138 }
139
140 TWResultCode TWGrabMutex(pthread_mutex_t * mutex)
141 {
142     int ret = pthread_mutex_lock(mutex);
143
144     switch (ret)
145     {
146         case 0:
147             return TW_RESULT_OK;
148         case EINVAL:
149             OC_LOG(ERROR, TAG, "Mutex was not initialized.");
150             break;
151         case ETIMEDOUT:
152             OC_LOG(INFO, TAG, "Timed out waiting for mutex. Non-error. Please try again.");
153             return TW_RESULT_OK;
154         case EAGAIN:
155             OC_LOG(ERROR, TAG, "Maximum number of locks for mutex have been exceeded.");
156             break;
157         case EDEADLK:
158             OC_LOG(ERROR, TAG, "Deadlock OR this thread already owns the mutex.");
159             break;
160     }
161     return TW_RESULT_ERROR;
162 }
163
164 TWResultCode TWReleaseMutex(pthread_mutex_t * mutex)
165 {
166     int ret = pthread_mutex_unlock(mutex);
167
168     switch (ret)
169     {
170         case 0:
171             return TW_RESULT_OK;
172         case EINVAL:
173             OC_LOG(ERROR, TAG, "Mutex was not initialized.");
174             break;
175         case EAGAIN:
176             OC_LOG(ERROR, TAG, "Maximum number of locks for mutex have been exceeded.");
177             break;
178         case EPERM:
179             OC_LOG(ERROR, TAG, "Cannot unlock because the current thread does not own the mutex.");
180             break;
181     }
182     return TW_RESULT_ERROR;
183 }
184
185 char readBufferChar(int fd, ssize_t * readDataBytes)
186 {
187     // Performs read operation on fd for one character at a time.
188     if(!readDataBytes)
189     {
190         return '\0';
191     }
192     *readDataBytes = 0;
193     char byte = '\0';
194     errno = 0;
195     *readDataBytes = read(fd, &byte, sizeof(byte));
196     if(*readDataBytes < 0)
197     {
198         OC_LOG_V(ERROR, TAG, "\tCould not read from port. Errno is: %d\n", errno);
199         return '\0';
200     }
201     return byte;
202 }
203
204 bool isLineIgnored(const char * line, size_t length)
205 {
206     if(length >= 4)
207     {
208         if(line[0] == 'N' && line[1] == 'A' && line[2] == 'C' && line[3] == 'K')
209         {
210             return true;
211         }
212     }
213     if(length >= 3)
214     {
215         if(line[0] == 'S' && line[1] == 'E' && line[2] == 'Q')
216         {
217             return true;
218         }
219         else if(line[0] == 'A' && line[1] == 'C' && line[2] == 'K')
220         {
221             return true;
222         }
223     }
224     if(length >= 2)
225     {
226         if(line[0] == 'A' && line[1] == 'T')
227         {
228             // If the line starts with "AT", then this is an echo. We ignore echos.
229             return true;
230         }
231         else if(line[0] == 'O' && line[1] == 'K')
232         {
233             //If this line is "OK", we ignore success codes. But we do end TWEntry's on "OK".
234             // (FYI, error codes are handled in readEntry() which invokes this function.)
235             return true;
236         }
237     }
238     return false;
239 }
240
241 const char * readBufferLine(int fd)
242 {
243     char bufferChar = '\0';
244     size_t bufferLoc = 0;
245     ssize_t readDataBytes = 0;
246     char * bufferLineHold = NULL;
247     char * bufferLine = NULL;
248     bool endOfLine1 = false;
249     bool endOfLine2 = false;
250     bool ignore = false;
251     while(true)
252     {
253         if(!endOfLine1 || !endOfLine2)
254         {
255             bufferChar = readBufferChar(fd, &readDataBytes);
256             if(bufferChar == '\r')
257             {
258                 endOfLine1 = true;
259                 continue;
260             }
261             if(bufferChar == '\n')
262             {
263                 endOfLine2 = true;
264                 continue;
265             }
266         }
267         if(readDataBytes != 0 && (!endOfLine1 && !endOfLine2))
268         {
269             size_t bufferLineSize = sizeof(bufferChar)*(bufferLoc+2);
270             bufferLine = (char *) OICRealloc(bufferLineHold, bufferLineSize);
271             if(!bufferLine)
272             {
273                 OC_LOG(ERROR, TAG, "Ran out of memory.");
274                 return NULL;
275             }
276             bufferLine[bufferLoc] = '\0';
277             OICStrcat(bufferLine, bufferLineSize+2, &bufferChar);
278             bufferLoc++;
279             bufferLineHold = bufferLine;
280
281             OC_LOG_V(DEBUG, TAG, "Incoming: %s", bufferLine);
282         }
283         else
284         {
285             if(!bufferLine)
286             {
287                 return NULL;
288             }
289             size_t bufferLineSize = sizeof(bufferChar)*(bufferLoc+2);
290             bufferLine = (char *) OICRealloc(bufferLineHold, bufferLineSize);
291             if(!bufferLine)
292             {
293                 OC_LOG(ERROR, TAG, "Ran out of memory.");
294                 return NULL;
295             }
296             bufferLine[bufferLoc] = '\0';
297             bufferLoc++;
298             bufferLineHold = bufferLine;
299             ignore = isLineIgnored(bufferLine, strlen(bufferLine));
300             if(ignore)
301             {
302                 OICFree(bufferLine);
303                 return readBufferLine(fd);
304             }
305             if(endOfLine1 && endOfLine2)
306             {
307                 return bufferLine;
308             }
309             else
310             {
311                 return NULL;
312             }
313         }
314     }
315 }
316
317 TWResultCode TWAddLineToEntry(const char * line, TWEntry * entry)
318 {
319     if(!line || !entry)
320     {
321         OC_LOG(ERROR, TAG, "Invalid/NULL parameter(s) received.");
322         return TW_RESULT_ERROR_INVALID_PARAMS;
323     }
324     TWLine * twLine = (TWLine *) OICCalloc(1, sizeof(TWLine));
325     if(!twLine)
326     {
327         OC_LOG(ERROR, TAG, "Ran out of memory.");
328         return TW_RESULT_ERROR_NO_MEMORY;
329     }
330     size_t lineLength = strlen(line);
331     twLine->line = line;
332     twLine->length = lineLength;
333     size_t errorLength = strlen(TW_ENDCONTROL_ERROR_STRING);
334     size_t maxLength = (lineLength > errorLength) ? errorLength : lineLength;
335
336     if((errorLength == lineLength) &&
337        strncmp(line, TW_ENDCONTROL_ERROR_STRING, maxLength) == 0)
338     {
339         entry->atErrorCode[0] = line[errorLength];
340         entry->atErrorCode[1] = line[errorLength + 1];
341     }
342     else
343     {
344         entry->atErrorCode[0] = '0';
345         entry->atErrorCode[1] = '0';
346     }
347
348     // Null terminate the string.
349     entry->atErrorCode[2] = '\0';
350
351     if(!entry->lines)
352     {
353         entry->lines = twLine;
354     }
355     else
356     {
357         entry->lines[entry->count] = *twLine;
358     }
359     entry->count++;
360
361     return TW_RESULT_OK;
362 }
363
364 TWEntry * readEntry(int fd)
365 {
366     // Calls readBufferLine().
367     // Forms TWEntry from 1-n lines based on the response type.
368
369     TWEntry * entry = (TWEntry *) OICCalloc(1, sizeof(TWEntry));
370     if(!entry)
371     {
372         OC_LOG(ERROR, TAG, "Ran out of memory.");
373         return NULL;
374     }
375     entry->count = 0;
376
377     const char * bufferLine = NULL;
378     TWEntryTypePair entryTypePair = { .resultTxt = NULL,
379                                       .numLines  = 0,
380                                       .entryType = TW_NONE };
381     size_t numLines = 0;
382     while(numLines == 0 || bufferLine)
383     {
384         if(numLines == 0)
385         {
386             bufferLine = readBufferLine(fd);
387             if(!bufferLine)
388             {
389                 goto exit;
390             }
391             entryTypePair = getEntryTypePair(bufferLine);
392         }
393         else
394         {
395             bufferLine = readBufferLine(fd);
396         }
397         if(bufferLine != NULL)
398         {
399             entry->type = entryTypePair.entryType;
400             TWAddLineToEntry(bufferLine, entry);
401             numLines++;
402             entry->count = numLines;
403         }
404
405         if(entryTypePair.numLines != numLines)
406         {
407             entry->resultCode = TW_RESULT_ERROR_LINE_COUNT;
408         }
409         else
410         {
411             entry->resultCode = TW_RESULT_OK;
412         }
413     }
414     return entry;
415 exit:
416     OICFree(entry);
417     return NULL;
418 }
419
420 TWResultCode TWRetrieveEUI(PIPlugin * plugin, TWSock * twSock)
421 {
422     if(twSock->isActive == false)
423     {
424         return TW_RESULT_ERROR;
425     }
426
427     TWEntry * entry = NULL;
428     TWResultCode deleteResult = TW_RESULT_OK;
429     TWResultCode result = TWIssueATCommand(plugin, AT_CMD_GET_LOCAL_EUI);
430     if(result != TW_RESULT_OK)
431     {
432         return result;
433     }
434     result = TWGrabMutex(&twSock->mutex);
435     if(result != TW_RESULT_OK)
436     {
437         return result;
438     }
439     entry = readEntry(twSock->fd);
440     if(!entry)
441     {
442         result = TWReleaseMutex(&twSock->mutex);
443         if(result != TW_RESULT_OK)
444         {
445             goto exit;
446         }
447     }
448     twSock->eui = (char *) OICMalloc(strlen(entry->lines[0].line)+1);
449     if(!twSock->eui)
450     {
451         result = TW_RESULT_ERROR_NO_MEMORY;
452         goto exit;
453     }
454
455     if(SIZE_EUI != (strlen(entry->lines[0].line)+1))
456     {
457         OICFree(twSock->eui);
458         result = TW_RESULT_ERROR;
459         goto exit;
460     }
461
462     OICStrcpy(twSock->eui, SIZE_EUI, entry->lines[0].line);
463
464     result = TWReleaseMutex(&twSock->mutex);
465     if(result != TW_RESULT_OK)
466     {
467         OICFree(twSock->eui);
468         goto exit;
469     }
470 exit:
471     deleteResult = TWDeleteEntry(plugin, entry);
472     if(deleteResult != TW_RESULT_OK)
473     {
474         return deleteResult;
475     }
476     return result;
477 }
478
479 TWResultCode TWStartSock(PIPlugin * plugin, const char * fileLoc)
480 {
481     TWSock * sock = TWGetSock((PIPlugin *)plugin);
482     if(sock && sock->isActive == true)
483     {
484         // Ignore requests to start up the same socket.
485         return TW_RESULT_OK;
486     }
487     if(!sock)
488     {
489         sock = (TWSock *) OICCalloc(1, sizeof(TWSock));
490         if(!sock)
491         {
492             return TW_RESULT_ERROR_NO_MEMORY;
493         }
494     }
495     TWResultCode ret = TWAddTWSock(sock, plugin, fileLoc);
496     if(ret != 0)
497     {
498         goto exit;
499     }
500
501     ret = TWRetrieveEUI((PIPlugin *)plugin, sock);
502     if(ret != TW_RESULT_OK)
503     {
504         OC_LOG(ERROR, TAG, "Unable to retrieve Zigbee Radio's EUI.");
505         return ret;
506     }
507
508     int result = pthread_create(&(sock->threadHandle),
509                                 NULL,
510                                 readForever,
511                                 (void *) plugin);
512     if(result != 0)
513     {
514         OC_LOG_V(ERROR, TAG, "Thread start failed with error %d", result);
515         ret = TW_RESULT_ERROR;
516         goto exit;
517     }
518
519     return TW_RESULT_OK;
520
521 exit:
522     TWDeleteTWSock(sock);
523     return ret;
524 }
525
526 static void sigHandler(int signal)
527 {
528     pthread_t tid = pthread_self();
529     (void)tid;
530     (void)signal;
531     OC_LOG_V(INFO, TAG, "Received signal on thread: %lu\n", tid);
532     OC_LOG_V(INFO, TAG, "Signal is: %d", signal);
533 }
534
535 void * readForever(/*PIPlugin*/ void * plugin)
536 {
537     TWResultCode result = TW_RESULT_OK;
538     TWEntry * entry = NULL;
539     TWSock * twSock = TWGetSock((PIPlugin *)plugin);
540     if(!twSock)
541     {
542         OC_LOG(ERROR, TAG, "Unable to retrieve associated socket.");
543         return NULL;
544     }
545
546     pthread_t tid = pthread_self();
547     (void)tid;
548     OC_LOG_V(INFO, TAG, "ReadForever Telegesis ThreadId: %lu", tid);
549
550     struct sigaction action = { .sa_handler = 0 };
551
552     sigset_t sigmask;
553
554     action.sa_handler = sigHandler;
555     action.sa_flags = 0;
556     sigemptyset(&action.sa_mask);
557     sigaction(EINTR, &action, NULL);
558
559     fd_set readFDS;
560     FD_ZERO(&readFDS);
561     FD_SET(twSock->fd, &readFDS);
562     bool haveMutex = false;
563     while(true)
564     {
565         errno = 0;
566         // 'sigmask' is needed to catch intterupts.
567         // This interrupt happens after call to pthread_exit(..., EINTR).
568         // Once a signal handler is registered, pselect will handle interrupts by returning
569         // with '-1' and setting errno appropriately.
570         int ret = pselect(twSock->fd+1, &readFDS, NULL, NULL, NULL, &sigmask);
571         if(ret < 0)
572         {
573             if(errno == EINTR)
574             {
575                 if(twSock->isActive)
576                 {
577                     continue;
578                     // This EINTR signal is not for us. Do not handle it.
579                 }
580                 OC_LOG(DEBUG, TAG, "Thread has been joined. Exiting thread.");
581                 pthread_exit(PTHREAD_CANCELED);
582                 return NULL;
583             }
584             else
585             {
586                 OC_LOG_V(ERROR, TAG, "Unaccounted error occurred. Exiting thread."
587                                      "Errno is: %d", errno);
588                 return NULL;
589             }
590         }
591         else
592         {
593             ret = FD_ISSET(twSock->fd, &readFDS);
594             if(ret != 0)
595             {
596                 // Valid data on valid socket.
597                 // Grab & parse, then pass up to upper layers.
598                 if(haveMutex != true)
599                 {
600                     result = TWGrabMutex(&twSock->mutex);
601                     if(result != TW_RESULT_OK)
602                     {
603                         OC_LOG(ERROR, TAG, "Unable to grab mutex.");
604                         return NULL;
605                     }
606                     haveMutex = true;
607                 }
608                 entry = readEntry(twSock->fd);
609                 if(!entry)
610                 {
611                     result = TWReleaseMutex(&twSock->mutex);
612                     if(result != TW_RESULT_OK)
613                     {
614                         OC_LOG(ERROR, TAG, "Unable to release mutex.");
615                         return NULL;
616                     }
617                     haveMutex = false;
618                     // This is most likely a parsing error of the received
619                     // response. Not necessarily fatal.
620                     continue;
621                 }
622                 result = TWEnqueueEntry((PIPlugin *)plugin, entry);
623                 if(result != TW_RESULT_OK)
624                 {
625                     OC_LOG_V(ERROR, TAG, "Could not add TWEntry to queue for"
626                                           "consumption by the application"
627                                           "layer. Error is: %d", result);
628                     TWDeleteEntry((PIPlugin *)plugin, entry);
629                     // This is most likely a FATAL error, such as out of memory.
630                     break;
631                 }
632
633                 // Notify other threads waiting for a response that an entry has been enqueued.
634                 pthread_cond_signal(&twSock->queueCV);
635
636                 result = TWReleaseMutex(&twSock->mutex);
637                 haveMutex = false;
638             }
639             else
640             {
641                 // Unrelated data waiting elsewhere. Continue the loop.
642                 continue;
643             }
644         }
645     }
646
647     return NULL;
648 }
649
650 TWResultCode TWIssueATCommand(PIPlugin * plugin, const char * command)
651 {
652     if(!plugin || !command)
653     {
654         return TW_RESULT_ERROR_INVALID_PARAMS;
655     }
656     OC_LOG_V(INFO, TAG, "Attempt to write %s.", command);
657     TWSock * twSock = TWGetSock(plugin);
658     if(!twSock)
659     {
660         return TW_RESULT_ERROR;
661     }
662
663     if(twSock->isActive == false)
664     {
665         return TW_RESULT_ERROR;
666     }
667
668     TWResultCode result = TW_RESULT_OK;
669     TWResultCode mutexResult = TW_RESULT_OK;
670     mutexResult = TWGrabMutex(&twSock->mutex);
671     if(mutexResult != TW_RESULT_OK)
672     {
673         return mutexResult;
674     }
675     size_t sendCommandSize = (strlen(command) + strlen("\r") + 1) * sizeof(char);
676     char * sendCommand = (char *) OICCalloc(1, sendCommandSize);
677     if(!sendCommand)
678     {
679         return TW_RESULT_ERROR_NO_MEMORY;
680     }
681     char * temp = OICStrcpy(sendCommand, sendCommandSize, command);
682     if(temp != sendCommand)
683     {
684         result = TW_RESULT_ERROR;
685         goto exit;
686     }
687     temp = OICStrcat(sendCommand, sendCommandSize, "\r");
688     if(temp != sendCommand)
689     {
690         result = TW_RESULT_ERROR;
691         goto exit;
692     }
693     size_t expectedWrittenBytes = strlen(sendCommand);
694     errno = 0;
695     size_t actualWrittenBytes = write(twSock->fd, sendCommand, expectedWrittenBytes);
696     if(actualWrittenBytes <= 0)
697     {
698         OC_LOG_V(ERROR, TAG, "Could not write to port. Errno is: %d\n", errno);
699         result =  TW_RESULT_ERROR;
700         goto exit;
701     }
702     if(actualWrittenBytes != expectedWrittenBytes)
703     {
704         OC_LOG(ERROR, TAG, "Telegesis Adapter did not receive expected command. Unknown state.");
705         result = TW_RESULT_ERROR;
706     }
707
708 exit:
709     mutexResult = TWReleaseMutex(&twSock->mutex);
710     if(mutexResult != TW_RESULT_OK)
711     {
712         return mutexResult;
713     }
714     OICFree(sendCommand);
715     return result;
716 }
717
718 TWResultCode TWEnqueueEntry(PIPlugin * plugin, TWEntry * entry)
719 {
720     if(!plugin || !entry)
721     {
722         return TW_RESULT_ERROR_INVALID_PARAMS;
723     }
724     TWSock * twSock = TWGetSock(plugin);
725     if(!twSock)
726     {
727         return TW_RESULT_ERROR;
728     }
729
730     if(twSock->isActive == false)
731     {
732         return TW_RESULT_ERROR;
733     }
734     LL_APPEND(twSock->queue, entry);
735     return TW_RESULT_OK;
736 }
737
738 TWResultCode TWDequeueEntry(PIPlugin * plugin, TWEntry ** entry, TWEntryType type)
739 {
740     if(!plugin || !entry)
741     {
742         return TW_RESULT_ERROR_INVALID_PARAMS;
743     }
744     TWSock * twSock = TWGetSock(plugin);
745     if(!twSock)
746     {
747         return TW_RESULT_ERROR;
748     }
749
750     if(twSock->isActive == false)
751     {
752         return TW_RESULT_ERROR;
753     }
754
755     TWResultCode ret = TW_RESULT_OK;
756
757     // If no entry is found, then this code path returns immediately.
758     ret = TWGrabMutex(&twSock->mutex);
759     if(ret != TW_RESULT_OK)
760     {
761         return ret;
762     }
763
764     if(type != TW_NONE)
765     {
766         // Wait for up to 10 seconds for the entry to put into the queue.
767         ret = TWWait(&twSock->queueCV, &twSock->mutex, TIME_OUT_10_SECONDS);
768         if(ret != TW_RESULT_OK)
769         {
770             return ret;
771         }
772     }
773
774     *entry = twSock->queue;
775     if(*entry)
776     {
777         LL_DELETE(twSock->queue, *entry);
778     }
779     ret = TWReleaseMutex(&twSock->mutex);
780     if(ret != TW_RESULT_OK)
781     {
782         return ret;
783     }
784     return ret;
785 }
786
787 TWResultCode TWFreeQueue(PIPlugin * plugin)
788 {
789     if(!plugin)
790     {
791         return TW_RESULT_ERROR_INVALID_PARAMS;
792     }
793     TWResultCode ret = TW_RESULT_OK;
794     TWEntry * entry = NULL;
795     while(true)
796     {
797         ret = TWDequeueEntry(plugin, &entry, TW_NONE);
798         if(ret != TW_RESULT_OK)
799         {
800             return ret;
801         }
802         if(entry == NULL)
803         {
804             break;
805         }
806         ret = TWDeleteEntry(plugin, entry);
807         if(ret != TW_RESULT_OK)
808         {
809             return ret;
810         }
811     }
812     return ret;
813 }
814
815 TWResultCode TWDeleteEntry(PIPlugin * plugin, TWEntry * entry)
816 {
817     if(!plugin || !entry)
818     {
819         return TW_RESULT_ERROR_INVALID_PARAMS;
820     }
821
822     TWSock * twSock = TWGetSock(plugin);
823     if(!twSock)
824     {
825         return TW_RESULT_ERROR;
826     }
827
828     if(twSock->isActive == false)
829     {
830         return TW_RESULT_ERROR;
831     }
832
833     TWResultCode ret = TWGrabMutex(&twSock->mutex);
834     if(ret != TW_RESULT_OK)
835     {
836         return ret;
837     }
838     TWEntry * out = NULL;
839     TWEntry * tmp = NULL;
840     LL_FOREACH_SAFE(twSock->queue, out, tmp)
841     {
842         if(out == entry)
843         {
844             OC_LOG(ERROR, TAG, "Tried to delete an entry that is still in the queue. \
845                                 Please dequeue the entry first.");
846             return TW_RESULT_ERROR;
847         }
848     }
849     ret = TWReleaseMutex(&twSock->mutex);
850
851     OICFree(entry);
852
853     return TW_RESULT_OK;
854 }
855
856 TWResultCode TWGetEUI(PIPlugin * plugin, char ** eui)
857 {
858     if(!plugin || !eui)
859     {
860         return TW_RESULT_ERROR_INVALID_PARAMS;
861     }
862     TWSock * twSock = TWGetSock(plugin);
863     if(!twSock)
864     {
865         return TW_RESULT_ERROR;
866     }
867
868     if(twSock->isActive == false)
869     {
870         return TW_RESULT_ERROR;
871     }
872
873     *eui = OICStrdup(twSock->eui);
874
875     return TW_RESULT_OK;
876
877 }
878
879 TWResultCode TWStopSock(PIPlugin * plugin)
880 {
881     if(!plugin)
882     {
883         return TW_RESULT_ERROR_INVALID_PARAMS;
884     }
885     TWSock * twSock = TWGetSock(plugin);
886     if(!twSock)
887     {
888         return TW_RESULT_ERROR;
889     }
890
891     if(twSock->isActive == false)
892     {
893         return TW_RESULT_ERROR;
894     }
895
896     TWResultCode ret = TWFreeQueue(plugin);
897     if(ret != TW_RESULT_OK)
898     {
899         return ret;
900     }
901
902     twSock->isActive = false;
903
904     void * retVal = NULL;
905     int pthreadRet = pthread_kill(twSock->threadHandle, EINTR);
906     if(pthreadRet != 0)
907     {
908         return TW_RESULT_ERROR;
909     }
910
911     pthreadRet = pthread_join(twSock->threadHandle, &retVal);
912     if(pthreadRet != 0)
913     {
914         switch(pthreadRet)
915         {
916             case EDEADLK:
917                 OC_LOG(ERROR, TAG, "A deadlock has occurred.");
918                 break;
919             case EINVAL:
920                 OC_LOG(ERROR, TAG, "Thread is not joinable or another thread has already joined.");
921                 break;
922             case ESRCH:
923                 OC_LOG(ERROR, TAG, "No thread with the ID could be found.");
924                 break;
925             default:
926                 OC_LOG_V(ERROR, TAG, "Unknown error occurred when joining thread: %d", pthreadRet);
927         }
928         return TW_RESULT_ERROR;
929     }
930     if(retVal != PTHREAD_CANCELED)
931     {
932         return TW_RESULT_ERROR;
933     }
934     ret = TWDeleteTWSock(twSock);
935     if(ret != TW_RESULT_OK)
936     {
937         return ret;
938     }
939
940     return ret;
941 }