Merge branch 'master' into cloud-interface
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caretransmission.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 // Defining _BSD_SOURCE or _DEFAULT_SOURCE causes header files to expose
22 // definitions that may otherwise be skipped. Skipping can cause implicit
23 // declaration warnings and/or bugs and subtle problems in code execution.
24 // For glibc information on feature test macros,
25 // Refer http://www.gnu.org/software/libc/manual/html_node/Feature-Test-Macros.html
26 //
27 // This file requires #define use due to random()
28 // For details on compatibility and glibc support,
29 // Refer http://www.gnu.org/software/libc/manual/html_node/BSD-Random.html
30 #define _DEFAULT_SOURCE
31
32 // Defining _POSIX_C_SOURCE macro with 199309L (or greater) as value
33 // causes header files to expose definitions
34 // corresponding to the POSIX.1b, Real-time extensions
35 // (IEEE Std 1003.1b-1993) specification
36 //
37 // For this specific file, see use of clock_gettime,
38 // Refer to http://pubs.opengroup.org/stage7tc1/functions/clock_gettime.html
39 // and to http://man7.org/linux/man-pages/man2/clock_gettime.2.html
40 #ifndef _POSIX_C_SOURCE
41 #define _POSIX_C_SOURCE 200809L
42 #endif
43
44 #include <stdio.h>
45 #include <stdlib.h>
46 #include <string.h>
47
48 #ifndef SINGLE_THREAD
49 #include <unistd.h>
50 #include <time.h>
51 #include <sys/time.h>
52 #endif
53
54 #if defined(__ANDROID__)
55 #include <linux/time.h>
56 #endif
57
58 #include "caretransmission.h"
59 #include "caremotehandler.h"
60 #include "caprotocolmessage.h"
61 #include "oic_malloc.h"
62 #include "logger.h"
63
64 #define TAG "OIC_CA_RETRANS"
65
66 typedef struct
67 {
68     uint64_t timeStamp;                 /**< last sent time. microseconds */
69 #ifndef SINGLE_THREAD
70     uint64_t timeout;                   /**< timeout value. microseconds */
71 #endif
72     uint8_t triedCount;                 /**< retransmission count */
73     uint16_t messageId;                 /**< coap PDU message id */
74     CAEndpoint_t *endpoint;             /**< remote endpoint */
75     void *pdu;                          /**< coap PDU */
76     uint32_t size;                      /**< coap PDU size */
77 } CARetransmissionData_t;
78
79 static const uint64_t USECS_PER_SEC = 1000000;
80
81 /**
82  * @brief   getCurrent monotonic time
83  * @return  current time in microseconds
84  */
85 uint64_t getCurrentTimeInMicroSeconds();
86
87 #ifndef SINGLE_THREAD
88 /**
89  * @brief   timeout value is
90  *          between DEFAULT_ACK_TIMEOUT_SEC and
91  *          (DEFAULT_ACK_TIMEOUT_SEC * DEFAULT_RANDOM_FACTOR) second.
92  *          DEFAULT_RANDOM_FACTOR       1.5 (CoAP)
93  * @return  microseconds.
94  */
95 static uint64_t CAGetTimeoutValue()
96 {
97     return ((DEFAULT_ACK_TIMEOUT_SEC * 1000) + ((1000 * (random() & 0xFF)) >> 8)) *
98             (uint64_t) 1000;
99 }
100
101 CAResult_t CARetransmissionStart(CARetransmission_t *context)
102 {
103     if (NULL == context)
104     {
105         OIC_LOG(ERROR, TAG, "context is empty");
106         return CA_STATUS_INVALID_PARAM;
107     }
108
109     if (NULL == context->threadPool)
110     {
111         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
112         return CA_STATUS_INVALID_PARAM;
113     }
114
115     CAResult_t res = ca_thread_pool_add_task(context->threadPool, CARetransmissionBaseRoutine,
116                                              context);
117
118     if (CA_STATUS_OK != res)
119     {
120         OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
121         return res;
122     }
123
124     return res;
125 }
126 #endif
127
128 /**
129  * @brief   check timeout routine
130  * @param   currentTime     [IN]microseconds
131  * @param   retData         [IN]retransmission data
132  * @return  true if the timeout period has elapsed, false otherwise
133  */
134 static bool CACheckTimeout(uint64_t currentTime, CARetransmissionData_t *retData)
135 {
136 #ifndef SINGLE_THREAD
137     // #1. calculate timeout
138     uint32_t milliTimeoutValue = retData->timeout * 0.001;
139     uint64_t timeout = (milliTimeoutValue << retData->triedCount) * (uint64_t) 1000;
140
141     if (currentTime >= retData->timeStamp + timeout)
142     {
143         OIC_LOG_V(DEBUG, TAG, "%llu microseconds time out!!, tried count(%d)",
144                   timeout, retData->triedCount);
145         return true;
146     }
147 #else
148     // #1. calculate timeout
149     uint64_t timeOut = (2 << retData->triedCount) * 1000000;
150
151     if (currentTime >= retData->timeStamp + timeOut)
152     {
153         OIC_LOG_V(DEBUG, TAG, "timeout=%d, tried cnt=%d",
154                   (2 << retData->triedCount), retData->triedCount);
155         return true;
156     }
157 #endif
158     return false;
159 }
160
161 static void CACheckRetransmissionList(CARetransmission_t *context)
162 {
163     if (NULL == context)
164     {
165         OIC_LOG(ERROR, TAG, "context is null");
166         return;
167     }
168
169     // mutex lock
170     ca_mutex_lock(context->threadMutex);
171
172     uint32_t i = 0;
173     uint32_t len = u_arraylist_length(context->dataList);
174
175     for (i = 0; i < len; i++)
176     {
177         CARetransmissionData_t *retData = u_arraylist_get(context->dataList, i);
178
179         if (NULL == retData)
180         {
181             continue;
182         }
183
184         uint64_t currentTime = getCurrentTimeInMicroSeconds();
185
186         if (CACheckTimeout(currentTime, retData))
187         {
188             // #2. if time's up, send the data.
189             if (NULL != context->dataSendMethod)
190             {
191                 OIC_LOG_V(DEBUG, TAG, "retransmission CON data!!, msgid=%d",
192                           retData->messageId);
193                 context->dataSendMethod(retData->endpoint, retData->pdu, retData->size);
194             }
195
196             // #3. increase the retransmission count and update timestamp.
197             retData->timeStamp = currentTime;
198             retData->triedCount++;
199         }
200
201         // #4. if tried count is max, remove the retransmission data from list.
202         if (retData->triedCount >= context->config.tryingCount)
203         {
204             CARetransmissionData_t *removedData = u_arraylist_remove(context->dataList, i);
205             if (NULL == removedData)
206             {
207                 OIC_LOG(ERROR, TAG, "Removed data is NULL");
208                 // mutex unlock
209                 ca_mutex_unlock(context->threadMutex);
210                 return;
211             }
212             OIC_LOG_V(DEBUG, TAG, "max trying count, remove RTCON data,"
213                       "msgid=%d", removedData->messageId);
214
215             // callback for retransmit timeout
216             if (NULL != context->timeoutCallback)
217             {
218                 context->timeoutCallback(removedData->endpoint, removedData->pdu,
219                                          removedData->size);
220             }
221
222             CAFreeEndpoint(removedData->endpoint);
223             OICFree(removedData->pdu);
224
225             OICFree(removedData);
226
227             // modify loop value.
228             len = u_arraylist_length(context->dataList);
229             --i;
230         }
231     }
232
233     // mutex unlock
234     ca_mutex_unlock(context->threadMutex);
235 }
236
237 void CARetransmissionBaseRoutine(void *threadValue)
238 {
239     OIC_LOG(DEBUG, TAG, "retransmission main thread start");
240
241     CARetransmission_t *context = (CARetransmission_t *) threadValue;
242
243     if (NULL == context)
244     {
245         OIC_LOG(ERROR, TAG, "thread data passing error");
246
247         return;
248     }
249
250 #ifdef SINGLE_THREAD
251     if (true == context->isStop)
252     {
253         OIC_LOG(DEBUG, TAG, "thread stopped");
254         return;
255     }
256     CACheckRetransmissionList(context);
257 #else
258
259     while (!context->isStop)
260     {
261         // mutex lock
262         ca_mutex_lock(context->threadMutex);
263
264         if (!context->isStop && u_arraylist_length(context->dataList) <= 0)
265         {
266             // if list is empty, thread will wait
267             OIC_LOG(DEBUG, TAG, "wait..there is no retransmission data.");
268
269             // wait
270             ca_cond_wait(context->threadCond, context->threadMutex);
271
272             OIC_LOG(DEBUG, TAG, "wake up..");
273         }
274         else if (!context->isStop)
275         {
276             // check each RETRANSMISSION_CHECK_PERIOD_SEC time.
277             OIC_LOG_V(DEBUG, TAG, "wait..(%lld)microseconds",
278                       RETRANSMISSION_CHECK_PERIOD_SEC * (uint64_t) USECS_PER_SEC);
279
280             // wait
281             uint64_t absTime = RETRANSMISSION_CHECK_PERIOD_SEC * (uint64_t) USECS_PER_SEC;
282             ca_cond_wait_for(context->threadCond, context->threadMutex, absTime );
283         }
284         else
285         {
286             // we are stopping, so we want to unlock and finish stopping
287         }
288
289         // mutex unlock
290         ca_mutex_unlock(context->threadMutex);
291
292         // check stop flag
293         if (context->isStop)
294         {
295             continue;
296         }
297
298         CACheckRetransmissionList(context);
299     }
300
301     ca_mutex_lock(context->threadMutex);
302     ca_cond_signal(context->threadCond);
303     ca_mutex_unlock(context->threadMutex);
304
305 #endif
306     OIC_LOG(DEBUG, TAG, "retransmission main thread end");
307
308 }
309
310 CAResult_t CARetransmissionInitialize(CARetransmission_t *context,
311                                       ca_thread_pool_t handle,
312                                       CADataSendMethod_t retransmissionSendMethod,
313                                       CATimeoutCallback_t timeoutCallback,
314                                       CARetransmissionConfig_t* config)
315 {
316     if (NULL == context)
317     {
318         OIC_LOG(ERROR, TAG, "thread instance is empty");
319         return CA_STATUS_INVALID_PARAM;
320     }
321 #ifndef SINGLE_THREAD
322     if (NULL == handle)
323     {
324         OIC_LOG(ERROR, TAG, "thread pool handle is empty");
325         return CA_STATUS_INVALID_PARAM;
326     }
327 #endif
328     OIC_LOG(DEBUG, TAG, "thread initialize");
329
330     memset(context, 0, sizeof(CARetransmission_t));
331
332     CARetransmissionConfig_t cfg = { .supportType = DEFAULT_RETRANSMISSION_TYPE,
333                                      .tryingCount = DEFAULT_RETRANSMISSION_COUNT };
334
335     if (config)
336     {
337         cfg = *config;
338     }
339
340     // set send thread data
341     context->threadPool = handle;
342     context->threadMutex = ca_mutex_new();
343     context->threadCond = ca_cond_new();
344     context->dataSendMethod = retransmissionSendMethod;
345     context->timeoutCallback = timeoutCallback;
346     context->config = cfg;
347     context->isStop = false;
348     context->dataList = u_arraylist_create();
349
350     return CA_STATUS_OK;
351 }
352
353 CAResult_t CARetransmissionSentData(CARetransmission_t *context,
354                                     const CAEndpoint_t *endpoint,
355                                     const void *pdu, uint32_t size)
356 {
357     if (NULL == context || NULL == endpoint || NULL == pdu)
358     {
359         OIC_LOG(ERROR, TAG, "invalid parameter");
360         return CA_STATUS_INVALID_PARAM;
361     }
362
363     // #0. check support transport type
364     if (!(context->config.supportType & endpoint->adapter))
365     {
366         OIC_LOG_V(DEBUG, TAG, "not supported transport type=%d", endpoint->adapter);
367         return CA_NOT_SUPPORTED;
368     }
369
370     // #1. check PDU method type and get message id.
371     CAMessageType_t type = CAGetMessageTypeFromPduBinaryData(pdu, size);
372     uint16_t messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
373
374     OIC_LOG_V(DEBUG, TAG, "sent pdu, msgtype=%d, msgid=%d", type, messageId);
375
376     if (CA_MSG_CONFIRM != type)
377     {
378         OIC_LOG(DEBUG, TAG, "not supported message type");
379         return CA_NOT_SUPPORTED;
380     }
381
382     // create retransmission data
383     CARetransmissionData_t *retData = (CARetransmissionData_t *) OICCalloc(
384                                           1, sizeof(CARetransmissionData_t));
385
386     if (NULL == retData)
387     {
388         OIC_LOG(ERROR, TAG, "memory error");
389         return CA_MEMORY_ALLOC_FAILED;
390     }
391
392     // copy PDU data
393     void *pduData = (void *) OICMalloc(size);
394     if (NULL == pduData)
395     {
396         OICFree(retData);
397         OIC_LOG(ERROR, TAG, "memory error");
398         return CA_MEMORY_ALLOC_FAILED;
399     }
400     memcpy(pduData, pdu, size);
401
402     // clone remote endpoint
403     CAEndpoint_t *remoteEndpoint = CACloneEndpoint(endpoint);
404     if (NULL == remoteEndpoint)
405     {
406         OICFree(retData);
407         OICFree(pduData);
408         OIC_LOG(ERROR, TAG, "memory error");
409         return CA_MEMORY_ALLOC_FAILED;
410     }
411
412     // #2. add additional information. (time stamp, retransmission count...)
413     retData->timeStamp = getCurrentTimeInMicroSeconds();
414 #ifndef SINGLE_THREAD
415     retData->timeout = CAGetTimeoutValue();
416 #endif
417     retData->triedCount = 0;
418     retData->messageId = messageId;
419     retData->endpoint = remoteEndpoint;
420     retData->pdu = pduData;
421     retData->size = size;
422 #ifndef SINGLE_THREAD
423     // mutex lock
424     ca_mutex_lock(context->threadMutex);
425
426     uint32_t i = 0;
427     uint32_t len = u_arraylist_length(context->dataList);
428
429     // #3. add data into list
430     for (i = 0; i < len; i++)
431     {
432         CARetransmissionData_t *currData = u_arraylist_get(context->dataList, i);
433
434         if (NULL == currData)
435         {
436             continue;
437         }
438
439         // found index
440         if (NULL != currData->endpoint && currData->messageId == messageId
441             && (currData->endpoint->adapter == endpoint->adapter))
442         {
443             OIC_LOG(ERROR, TAG, "Duplicate message ID");
444
445             // mutex unlock
446             ca_mutex_unlock(context->threadMutex);
447
448             OICFree(retData);
449             OICFree(pduData);
450             OICFree(remoteEndpoint);
451             return CA_STATUS_FAILED;
452         }
453     }
454
455     u_arraylist_add(context->dataList, (void *) retData);
456
457     // notify the thread
458     ca_cond_signal(context->threadCond);
459
460     // mutex unlock
461     ca_mutex_unlock(context->threadMutex);
462
463 #else
464     u_arraylist_add(context->dataList, (void *) retData);
465
466     CACheckRetransmissionList(context);
467 #endif
468     return CA_STATUS_OK;
469 }
470
471 CAResult_t CARetransmissionReceivedData(CARetransmission_t *context,
472                                         const CAEndpoint_t *endpoint, const void *pdu,
473                                         uint32_t size, void **retransmissionPdu)
474 {
475     OIC_LOG(DEBUG, TAG, "IN");
476     if (NULL == context || NULL == endpoint || NULL == pdu || NULL == retransmissionPdu)
477     {
478         OIC_LOG(ERROR, TAG, "invalid parameter");
479         return CA_STATUS_INVALID_PARAM;
480     }
481
482     // #0. check support transport type
483     if (!(context->config.supportType & endpoint->adapter))
484     {
485         OIC_LOG_V(DEBUG, TAG, "not supported transport type=%d", endpoint->adapter);
486         return CA_STATUS_OK;
487     }
488
489     // #1. check PDU method type and get message id.
490     // ACK, RST --> remove the CON data
491     CAMessageType_t type = CAGetMessageTypeFromPduBinaryData(pdu, size);
492     uint16_t messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
493     CAResponseResult_t code = CAGetCodeFromPduBinaryData(pdu, size);
494
495     OIC_LOG_V(DEBUG, TAG, "received pdu, msgtype=%d, msgid=%d, code=%d",
496               type, messageId, code);
497
498     if (((CA_MSG_ACKNOWLEDGE != type) && (CA_MSG_RESET != type))
499         || (CA_MSG_RESET == type && CA_EMPTY != code))
500     {
501         return CA_STATUS_OK;
502     }
503
504     // mutex lock
505     ca_mutex_lock(context->threadMutex);
506     uint32_t len = u_arraylist_length(context->dataList);
507
508     // find index
509     uint32_t i;
510     for (i = 0; i < len; i++)
511     {
512         CARetransmissionData_t *retData = (CARetransmissionData_t *) u_arraylist_get(
513                 context->dataList, i);
514
515         if (NULL == retData)
516         {
517             continue;
518         }
519
520         // found index
521         if (NULL != retData->endpoint && retData->messageId == messageId
522             && (retData->endpoint->adapter == endpoint->adapter))
523         {
524             // get pdu data for getting token when CA_EMPTY(RST/ACK) is received from remote device
525             // if retransmission was finish..token will be unavailable.
526             if (CA_EMPTY == CAGetCodeFromPduBinaryData(pdu, size))
527             {
528                 OIC_LOG(DEBUG, TAG, "code is CA_EMPTY");
529
530                 if (NULL == retData->pdu)
531                 {
532                     OIC_LOG(ERROR, TAG, "retData->pdu is null");
533                     OICFree(retData);
534                     // mutex unlock
535                     ca_mutex_unlock(context->threadMutex);
536
537                     return CA_STATUS_FAILED;
538                 }
539
540                 // copy PDU data
541                 (*retransmissionPdu) = (void *) OICCalloc(1, retData->size);
542                 if ((*retransmissionPdu) == NULL)
543                 {
544                     OICFree(retData);
545                     OIC_LOG(ERROR, TAG, "memory error");
546
547                     // mutex unlock
548                     ca_mutex_unlock(context->threadMutex);
549
550                     return CA_MEMORY_ALLOC_FAILED;
551                 }
552                 memcpy((*retransmissionPdu), retData->pdu, retData->size);
553             }
554
555             // #2. remove data from list
556             CARetransmissionData_t *removedData = u_arraylist_remove(context->dataList, i);
557             if (NULL == removedData)
558             {
559                 OIC_LOG(ERROR, TAG, "Removed data is NULL");
560
561                 // mutex unlock
562                 ca_mutex_unlock(context->threadMutex);
563
564                 return CA_STATUS_FAILED;
565             }
566
567             OIC_LOG_V(DEBUG, TAG, "remove RTCON data!!, msgid=%d", messageId);
568
569             CAFreeEndpoint(removedData->endpoint);
570             OICFree(removedData->pdu);
571             OICFree(removedData);
572
573             break;
574         }
575     }
576
577     // mutex unlock
578     ca_mutex_unlock(context->threadMutex);
579
580     OIC_LOG(DEBUG, TAG, "OUT");
581     return CA_STATUS_OK;
582 }
583
584 CAResult_t CARetransmissionStop(CARetransmission_t *context)
585 {
586     if (NULL == context)
587     {
588         OIC_LOG(ERROR, TAG, "context is empty..");
589         return CA_STATUS_INVALID_PARAM;
590     }
591
592     OIC_LOG(DEBUG, TAG, "retransmission stop request!!");
593
594     // mutex lock
595     ca_mutex_lock(context->threadMutex);
596
597     // set stop flag
598     context->isStop = true;
599
600     // notify the thread
601     ca_cond_signal(context->threadCond);
602
603     ca_cond_wait(context->threadCond, context->threadMutex);
604
605     // mutex unlock
606     ca_mutex_unlock(context->threadMutex);
607
608     return CA_STATUS_OK;
609 }
610
611 CAResult_t CARetransmissionDestroy(CARetransmission_t *context)
612 {
613     if (NULL == context)
614     {
615         OIC_LOG(ERROR, TAG, "context is empty..");
616         return CA_STATUS_INVALID_PARAM;
617     }
618
619     OIC_LOG(DEBUG, TAG, "retransmission context destroy..");
620
621     ca_mutex_free(context->threadMutex);
622     context->threadMutex = NULL;
623     ca_cond_free(context->threadCond);
624     u_arraylist_free(&context->dataList);
625
626     return CA_STATUS_OK;
627 }
628
629 uint64_t getCurrentTimeInMicroSeconds()
630 {
631     OIC_LOG(DEBUG, TAG, "IN");
632     uint64_t currentTime = 0;
633
634 #ifdef __ANDROID__
635     struct timespec getTs;
636
637     clock_gettime(CLOCK_MONOTONIC, &getTs);
638
639     currentTime = (getTs.tv_sec * (uint64_t)1000000000 + getTs.tv_nsec)/1000;
640     OIC_LOG_V(DEBUG, TAG, "current time = %lld", currentTime);
641 #elif defined __ARDUINO__
642     currentTime = millis() * 1000;
643     OIC_LOG_V(DEBUG, TAG, "currtime=%lu", currentTime);
644 #else
645 #if _POSIX_TIMERS > 0
646     struct timespec ts;
647     clock_gettime(CLOCK_MONOTONIC, &ts);
648     currentTime = ts.tv_sec * USECS_PER_SEC + ts.tv_nsec / 1000;
649 #else
650     struct timeval tv;
651     gettimeofday(&tv, NULL);
652     currentTime = tv.tv_sec * USECS_PER_SEC + tv.tv_usec;
653 #endif
654 #endif
655
656     OIC_LOG(DEBUG, TAG, "OUT");
657     return currentTime;
658 }