Merge branch 'master' into windows-port
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caretransmission.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 // Defining _BSD_SOURCE or _DEFAULT_SOURCE causes header files to expose
22 // definitions that may otherwise be skipped. Skipping can cause implicit
23 // declaration warnings and/or bugs and subtle problems in code execution.
24 // For glibc information on feature test macros,
25 // Refer http://www.gnu.org/software/libc/manual/html_node/Feature-Test-Macros.html
26 //
27 // This file requires #define use due to random()
28 // For details on compatibility and glibc support,
29 // Refer http://www.gnu.org/software/libc/manual/html_node/BSD-Random.html
30 #define _DEFAULT_SOURCE
31
32 // Defining _POSIX_C_SOURCE macro with 199309L (or greater) as value
33 // causes header files to expose definitions
34 // corresponding to the POSIX.1b, Real-time extensions
35 // (IEEE Std 1003.1b-1993) specification
36 //
37 // For this specific file, see use of clock_gettime,
38 // Refer to http://pubs.opengroup.org/stage7tc1/functions/clock_gettime.html
39 // and to http://man7.org/linux/man-pages/man2/clock_gettime.2.html
40 #ifndef _POSIX_C_SOURCE
41 #define _POSIX_C_SOURCE 200809L
42 #endif
43
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <string.h>
47
48 #ifndef SINGLE_THREAD
49 #ifdef HAVE_UNISTD_H
50 #include <unistd.h>
51 #endif
52 #ifdef HAVE_SYS_TIME_H
53 #include <sys/time.h>
54 #endif
55 #if HAVE_SYS_TIMEB_H
56 #include <sys/timeb.h>
57 #endif
58 #ifdef HAVE_TIME_H
59 #include <time.h>
60 #endif
61 #endif
62
63 #if defined(__ANDROID__)
64 #include <linux/time.h>
65 #endif
66
67 #include "caretransmission.h"
68 #include "caremotehandler.h"
69 #include "caprotocolmessage.h"
70 #include "oic_malloc.h"
71 #include "logger.h"
72
73 #define TAG "OIC_CA_RETRANS"
74
75 typedef struct
76 {
77     uint64_t timeStamp;                 /**< last sent time. microseconds */
78 #ifndef SINGLE_THREAD
79     uint64_t timeout;                   /**< timeout value. microseconds */
80 #endif
81     uint8_t triedCount;                 /**< retransmission count */
82     uint16_t messageId;                 /**< coap PDU message id */
83     CAEndpoint_t *endpoint;             /**< remote endpoint */
84     void *pdu;                          /**< coap PDU */
85     uint32_t size;                      /**< coap PDU size */
86 } CARetransmissionData_t;
87
88 static const uint64_t USECS_PER_SEC = 1000000;
89 static const uint64_t MSECS_PER_SEC = 1000;
90
91 /**
92  * @brief   getCurrent monotonic time
93  * @return  current time in microseconds
94  */
95 uint64_t getCurrentTimeInMicroSeconds();
96
97 #ifndef SINGLE_THREAD
98 /**
99  * @brief   timeout value is
100  *          between DEFAULT_ACK_TIMEOUT_SEC and
101  *          (DEFAULT_ACK_TIMEOUT_SEC * DEFAULT_RANDOM_FACTOR) second.
102  *          DEFAULT_RANDOM_FACTOR       1.5 (CoAP)
103  * @return  microseconds.
104  */
105 static uint64_t CAGetTimeoutValue()
106 {
107 #if !defined(_WIN32)
108     return ((DEFAULT_ACK_TIMEOUT_SEC * 1000) + ((1000 * (random() & 0xFF)) >> 8)) *
109             (uint64_t) 1000;
110 #else
111     return ((DEFAULT_ACK_TIMEOUT_SEC * 1000) + ((1000 * (rand() & 0xFF)) >> 8)) *
112             (uint64_t) 1000;
113 #endif
114 }
115
116 CAResult_t CARetransmissionStart(CARetransmission_t *context)
117 {
118     if (NULL == context)
119     {
120         OIC_LOG(ERROR, TAG, "context is empty");
121         return CA_STATUS_INVALID_PARAM;
122     }
123
124     if (NULL == context->threadPool)
125     {
126         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
127         return CA_STATUS_INVALID_PARAM;
128     }
129
130     CAResult_t res = ca_thread_pool_add_task(context->threadPool, CARetransmissionBaseRoutine,
131                                              context);
132
133     if (CA_STATUS_OK != res)
134     {
135         OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
136         return res;
137     }
138
139     return res;
140 }
141 #endif
142
143 /**
144  * @brief   check timeout routine
145  * @param   currentTime     [IN]microseconds
146  * @param   retData         [IN]retransmission data
147  * @return  true if the timeout period has elapsed, false otherwise
148  */
149 static bool CACheckTimeout(uint64_t currentTime, CARetransmissionData_t *retData)
150 {
151 #ifndef SINGLE_THREAD
152     // #1. calculate timeout
153     uint32_t milliTimeoutValue = retData->timeout * 0.001;
154     uint64_t timeout = (milliTimeoutValue << retData->triedCount) * (uint64_t) 1000;
155
156     if (currentTime >= retData->timeStamp + timeout)
157     {
158         OIC_LOG_V(DEBUG, TAG, "%llu microseconds time out!!, tried count(%d)",
159                   timeout, retData->triedCount);
160         return true;
161     }
162 #else
163     // #1. calculate timeout
164     uint64_t timeOut = (2 << retData->triedCount) * 1000000;
165
166     if (currentTime >= retData->timeStamp + timeOut)
167     {
168         OIC_LOG_V(DEBUG, TAG, "timeout=%d, tried cnt=%d",
169                   (2 << retData->triedCount), retData->triedCount);
170         return true;
171     }
172 #endif
173     return false;
174 }
175
176 static void CACheckRetransmissionList(CARetransmission_t *context)
177 {
178     if (NULL == context)
179     {
180         OIC_LOG(ERROR, TAG, "context is null");
181         return;
182     }
183
184     // mutex lock
185     ca_mutex_lock(context->threadMutex);
186
187     uint32_t i = 0;
188     uint32_t len = u_arraylist_length(context->dataList);
189
190     for (i = 0; i < len; i++)
191     {
192         CARetransmissionData_t *retData = u_arraylist_get(context->dataList, i);
193
194         if (NULL == retData)
195         {
196             continue;
197         }
198
199         uint64_t currentTime = getCurrentTimeInMicroSeconds();
200
201         if (CACheckTimeout(currentTime, retData))
202         {
203             // #2. if time's up, send the data.
204             if (NULL != context->dataSendMethod)
205             {
206                 OIC_LOG_V(DEBUG, TAG, "retransmission CON data!!, msgid=%d",
207                           retData->messageId);
208                 context->dataSendMethod(retData->endpoint, retData->pdu, retData->size);
209             }
210
211             // #3. increase the retransmission count and update timestamp.
212             retData->timeStamp = currentTime;
213             retData->triedCount++;
214         }
215
216         // #4. if tried count is max, remove the retransmission data from list.
217         if (retData->triedCount >= context->config.tryingCount)
218         {
219             CARetransmissionData_t *removedData = u_arraylist_remove(context->dataList, i);
220             if (NULL == removedData)
221             {
222                 OIC_LOG(ERROR, TAG, "Removed data is NULL");
223                 // mutex unlock
224                 ca_mutex_unlock(context->threadMutex);
225                 return;
226             }
227             OIC_LOG_V(DEBUG, TAG, "max trying count, remove RTCON data,"
228                       "msgid=%d", removedData->messageId);
229
230             // callback for retransmit timeout
231             if (NULL != context->timeoutCallback)
232             {
233                 context->timeoutCallback(removedData->endpoint, removedData->pdu,
234                                          removedData->size);
235             }
236
237             CAFreeEndpoint(removedData->endpoint);
238             OICFree(removedData->pdu);
239
240             OICFree(removedData);
241
242             // modify loop value.
243             len = u_arraylist_length(context->dataList);
244             --i;
245         }
246     }
247
248     // mutex unlock
249     ca_mutex_unlock(context->threadMutex);
250 }
251
252 void CARetransmissionBaseRoutine(void *threadValue)
253 {
254     OIC_LOG(DEBUG, TAG, "retransmission main thread start");
255
256     CARetransmission_t *context = (CARetransmission_t *) threadValue;
257
258     if (NULL == context)
259     {
260         OIC_LOG(ERROR, TAG, "thread data passing error");
261
262         return;
263     }
264
265 #ifdef SINGLE_THREAD
266     if (true == context->isStop)
267     {
268         OIC_LOG(DEBUG, TAG, "thread stopped");
269         return;
270     }
271     CACheckRetransmissionList(context);
272 #else
273
274     while (!context->isStop)
275     {
276         // mutex lock
277         ca_mutex_lock(context->threadMutex);
278
279         if (!context->isStop && u_arraylist_length(context->dataList) <= 0)
280         {
281             // if list is empty, thread will wait
282             OIC_LOG(DEBUG, TAG, "wait..there is no retransmission data.");
283
284             // wait
285             ca_cond_wait(context->threadCond, context->threadMutex);
286
287             OIC_LOG(DEBUG, TAG, "wake up..");
288         }
289         else if (!context->isStop)
290         {
291             // check each RETRANSMISSION_CHECK_PERIOD_SEC time.
292             OIC_LOG_V(DEBUG, TAG, "wait..(%lld)microseconds",
293                       RETRANSMISSION_CHECK_PERIOD_SEC * (uint64_t) USECS_PER_SEC);
294
295             // wait
296             uint64_t absTime = RETRANSMISSION_CHECK_PERIOD_SEC * (uint64_t) USECS_PER_SEC;
297             ca_cond_wait_for(context->threadCond, context->threadMutex, absTime );
298         }
299         else
300         {
301             // we are stopping, so we want to unlock and finish stopping
302         }
303
304         // mutex unlock
305         ca_mutex_unlock(context->threadMutex);
306
307         // check stop flag
308         if (context->isStop)
309         {
310             continue;
311         }
312
313         CACheckRetransmissionList(context);
314     }
315
316     ca_mutex_lock(context->threadMutex);
317     ca_cond_signal(context->threadCond);
318     ca_mutex_unlock(context->threadMutex);
319
320 #endif
321     OIC_LOG(DEBUG, TAG, "retransmission main thread end");
322
323 }
324
325 CAResult_t CARetransmissionInitialize(CARetransmission_t *context,
326                                       ca_thread_pool_t handle,
327                                       CADataSendMethod_t retransmissionSendMethod,
328                                       CATimeoutCallback_t timeoutCallback,
329                                       CARetransmissionConfig_t* config)
330 {
331     if (NULL == context)
332     {
333         OIC_LOG(ERROR, TAG, "thread instance is empty");
334         return CA_STATUS_INVALID_PARAM;
335     }
336 #ifndef SINGLE_THREAD
337     if (NULL == handle)
338     {
339         OIC_LOG(ERROR, TAG, "thread pool handle is empty");
340         return CA_STATUS_INVALID_PARAM;
341     }
342 #endif
343     OIC_LOG(DEBUG, TAG, "thread initialize");
344
345     memset(context, 0, sizeof(CARetransmission_t));
346
347     CARetransmissionConfig_t cfg = { .supportType = DEFAULT_RETRANSMISSION_TYPE,
348                                      .tryingCount = DEFAULT_RETRANSMISSION_COUNT };
349
350     if (config)
351     {
352         cfg = *config;
353     }
354
355     // set send thread data
356     context->threadPool = handle;
357     context->threadMutex = ca_mutex_new();
358     context->threadCond = ca_cond_new();
359     context->dataSendMethod = retransmissionSendMethod;
360     context->timeoutCallback = timeoutCallback;
361     context->config = cfg;
362     context->isStop = false;
363     context->dataList = u_arraylist_create();
364
365     return CA_STATUS_OK;
366 }
367
368 CAResult_t CARetransmissionSentData(CARetransmission_t *context,
369                                     const CAEndpoint_t *endpoint,
370                                     const void *pdu, uint32_t size)
371 {
372     if (NULL == context || NULL == endpoint || NULL == pdu)
373     {
374         OIC_LOG(ERROR, TAG, "invalid parameter");
375         return CA_STATUS_INVALID_PARAM;
376     }
377
378     // #0. check support transport type
379     if (!(context->config.supportType & endpoint->adapter))
380     {
381         OIC_LOG_V(DEBUG, TAG, "not supported transport type=%d", endpoint->adapter);
382         return CA_NOT_SUPPORTED;
383     }
384
385     // #1. check PDU method type and get message id.
386     CAMessageType_t type = CAGetMessageTypeFromPduBinaryData(pdu, size);
387     uint16_t messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
388
389     OIC_LOG_V(DEBUG, TAG, "sent pdu, msgtype=%d, msgid=%d", type, messageId);
390
391     if (CA_MSG_CONFIRM != type)
392     {
393         OIC_LOG(DEBUG, TAG, "not supported message type");
394         return CA_NOT_SUPPORTED;
395     }
396
397     // create retransmission data
398     CARetransmissionData_t *retData = (CARetransmissionData_t *) OICCalloc(
399                                           1, sizeof(CARetransmissionData_t));
400
401     if (NULL == retData)
402     {
403         OIC_LOG(ERROR, TAG, "memory error");
404         return CA_MEMORY_ALLOC_FAILED;
405     }
406
407     // copy PDU data
408     void *pduData = (void *) OICMalloc(size);
409     if (NULL == pduData)
410     {
411         OICFree(retData);
412         OIC_LOG(ERROR, TAG, "memory error");
413         return CA_MEMORY_ALLOC_FAILED;
414     }
415     memcpy(pduData, pdu, size);
416
417     // clone remote endpoint
418     CAEndpoint_t *remoteEndpoint = CACloneEndpoint(endpoint);
419     if (NULL == remoteEndpoint)
420     {
421         OICFree(retData);
422         OICFree(pduData);
423         OIC_LOG(ERROR, TAG, "memory error");
424         return CA_MEMORY_ALLOC_FAILED;
425     }
426
427     // #2. add additional information. (time stamp, retransmission count...)
428     retData->timeStamp = getCurrentTimeInMicroSeconds();
429 #ifndef SINGLE_THREAD
430     retData->timeout = CAGetTimeoutValue();
431 #endif
432     retData->triedCount = 0;
433     retData->messageId = messageId;
434     retData->endpoint = remoteEndpoint;
435     retData->pdu = pduData;
436     retData->size = size;
437 #ifndef SINGLE_THREAD
438     // mutex lock
439     ca_mutex_lock(context->threadMutex);
440
441     uint32_t i = 0;
442     uint32_t len = u_arraylist_length(context->dataList);
443
444     // #3. add data into list
445     for (i = 0; i < len; i++)
446     {
447         CARetransmissionData_t *currData = u_arraylist_get(context->dataList, i);
448
449         if (NULL == currData)
450         {
451             continue;
452         }
453
454         // found index
455         if (NULL != currData->endpoint && currData->messageId == messageId
456             && (currData->endpoint->adapter == endpoint->adapter))
457         {
458             OIC_LOG(ERROR, TAG, "Duplicate message ID");
459
460             // mutex unlock
461             ca_mutex_unlock(context->threadMutex);
462
463             OICFree(retData);
464             OICFree(pduData);
465             OICFree(remoteEndpoint);
466             return CA_STATUS_FAILED;
467         }
468     }
469
470     u_arraylist_add(context->dataList, (void *) retData);
471
472     // notify the thread
473     ca_cond_signal(context->threadCond);
474
475     // mutex unlock
476     ca_mutex_unlock(context->threadMutex);
477
478 #else
479     u_arraylist_add(context->dataList, (void *) retData);
480
481     CACheckRetransmissionList(context);
482 #endif
483     return CA_STATUS_OK;
484 }
485
486 CAResult_t CARetransmissionReceivedData(CARetransmission_t *context,
487                                         const CAEndpoint_t *endpoint, const void *pdu,
488                                         uint32_t size, void **retransmissionPdu)
489 {
490     OIC_LOG(DEBUG, TAG, "IN");
491     if (NULL == context || NULL == endpoint || NULL == pdu || NULL == retransmissionPdu)
492     {
493         OIC_LOG(ERROR, TAG, "invalid parameter");
494         return CA_STATUS_INVALID_PARAM;
495     }
496
497     // #0. check support transport type
498     if (!(context->config.supportType & endpoint->adapter))
499     {
500         OIC_LOG_V(DEBUG, TAG, "not supported transport type=%d", endpoint->adapter);
501         return CA_STATUS_OK;
502     }
503
504     // #1. check PDU method type and get message id.
505     // ACK, RST --> remove the CON data
506     CAMessageType_t type = CAGetMessageTypeFromPduBinaryData(pdu, size);
507     uint16_t messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
508     CAResponseResult_t code = CAGetCodeFromPduBinaryData(pdu, size);
509
510     OIC_LOG_V(DEBUG, TAG, "received pdu, msgtype=%d, msgid=%d, code=%d",
511               type, messageId, code);
512
513     if (((CA_MSG_ACKNOWLEDGE != type) && (CA_MSG_RESET != type))
514         || (CA_MSG_RESET == type && CA_EMPTY != code))
515     {
516         return CA_STATUS_OK;
517     }
518
519     // mutex lock
520     ca_mutex_lock(context->threadMutex);
521     uint32_t len = u_arraylist_length(context->dataList);
522
523     // find index
524     uint32_t i;
525     for (i = 0; i < len; i++)
526     {
527         CARetransmissionData_t *retData = (CARetransmissionData_t *) u_arraylist_get(
528                 context->dataList, i);
529
530         if (NULL == retData)
531         {
532             continue;
533         }
534
535         // found index
536         if (NULL != retData->endpoint && retData->messageId == messageId
537             && (retData->endpoint->adapter == endpoint->adapter))
538         {
539             // get pdu data for getting token when CA_EMPTY(RST/ACK) is received from remote device
540             // if retransmission was finish..token will be unavailable.
541             if (CA_EMPTY == CAGetCodeFromPduBinaryData(pdu, size))
542             {
543                 OIC_LOG(DEBUG, TAG, "code is CA_EMPTY");
544
545                 if (NULL == retData->pdu)
546                 {
547                     OIC_LOG(ERROR, TAG, "retData->pdu is null");
548                     OICFree(retData);
549                     // mutex unlock
550                     ca_mutex_unlock(context->threadMutex);
551
552                     return CA_STATUS_FAILED;
553                 }
554
555                 // copy PDU data
556                 (*retransmissionPdu) = (void *) OICCalloc(1, retData->size);
557                 if ((*retransmissionPdu) == NULL)
558                 {
559                     OICFree(retData);
560                     OIC_LOG(ERROR, TAG, "memory error");
561
562                     // mutex unlock
563                     ca_mutex_unlock(context->threadMutex);
564
565                     return CA_MEMORY_ALLOC_FAILED;
566                 }
567                 memcpy((*retransmissionPdu), retData->pdu, retData->size);
568             }
569
570             // #2. remove data from list
571             CARetransmissionData_t *removedData = u_arraylist_remove(context->dataList, i);
572             if (NULL == removedData)
573             {
574                 OIC_LOG(ERROR, TAG, "Removed data is NULL");
575
576                 // mutex unlock
577                 ca_mutex_unlock(context->threadMutex);
578
579                 return CA_STATUS_FAILED;
580             }
581
582             OIC_LOG_V(DEBUG, TAG, "remove RTCON data!!, msgid=%d", messageId);
583
584             CAFreeEndpoint(removedData->endpoint);
585             OICFree(removedData->pdu);
586             OICFree(removedData);
587
588             break;
589         }
590     }
591
592     // mutex unlock
593     ca_mutex_unlock(context->threadMutex);
594
595     OIC_LOG(DEBUG, TAG, "OUT");
596     return CA_STATUS_OK;
597 }
598
599 CAResult_t CARetransmissionStop(CARetransmission_t *context)
600 {
601     if (NULL == context)
602     {
603         OIC_LOG(ERROR, TAG, "context is empty..");
604         return CA_STATUS_INVALID_PARAM;
605     }
606
607     OIC_LOG(DEBUG, TAG, "retransmission stop request!!");
608
609     // mutex lock
610     ca_mutex_lock(context->threadMutex);
611
612     // set stop flag
613     context->isStop = true;
614
615     // notify the thread
616     ca_cond_signal(context->threadCond);
617
618     ca_cond_wait(context->threadCond, context->threadMutex);
619
620     // mutex unlock
621     ca_mutex_unlock(context->threadMutex);
622
623     return CA_STATUS_OK;
624 }
625
626 CAResult_t CARetransmissionDestroy(CARetransmission_t *context)
627 {
628     if (NULL == context)
629     {
630         OIC_LOG(ERROR, TAG, "context is empty..");
631         return CA_STATUS_INVALID_PARAM;
632     }
633
634     OIC_LOG(DEBUG, TAG, "retransmission context destroy..");
635
636     ca_mutex_free(context->threadMutex);
637     context->threadMutex = NULL;
638     ca_cond_free(context->threadCond);
639     u_arraylist_free(&context->dataList);
640
641     return CA_STATUS_OK;
642 }
643
644 uint64_t getCurrentTimeInMicroSeconds()
645 {
646     OIC_LOG(DEBUG, TAG, "IN");
647     uint64_t currentTime = 0;
648
649 #ifdef __ANDROID__
650     struct timespec getTs;
651
652     clock_gettime(CLOCK_MONOTONIC, &getTs);
653
654     currentTime = (getTs.tv_sec * (uint64_t)1000000000 + getTs.tv_nsec)/1000;
655     OIC_LOG_V(DEBUG, TAG, "current time = %lld", currentTime);
656 #elif defined __ARDUINO__
657     currentTime = millis() * 1000;
658     OIC_LOG_V(DEBUG, TAG, "currtime=%lu", currentTime);
659 #else
660 #if _POSIX_TIMERS > 0
661     struct timespec ts;
662     clock_gettime(CLOCK_MONOTONIC, &ts);
663     currentTime = ts.tv_sec * USECS_PER_SEC + ts.tv_nsec / 1000;
664 #elif defined(_WIN32)
665     struct __timeb64 tb;
666     _ftime64_s(&tb);
667     currentTime = tb.time * USECS_PER_SEC + tb.millitm * MSECS_PER_SEC;
668 #else
669     struct timeval tv;
670     gettimeofday(&tv, NULL);
671     currentTime = tv.tv_sec * USECS_PER_SEC + tv.tv_usec;
672 #endif
673 #endif
674
675     OIC_LOG(DEBUG, TAG, "OUT");
676     return currentTime;
677 }