Imported Upstream version 0.9.2
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caretransmission.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 // Defining _BSD_SOURCE or _DEFAULT_SOURCE causes header files to expose
22 // definitions that may otherwise be skipped. Skipping can cause implicit
23 // declaration warnings and/or bugs and subtle problems in code execution.
24 // For glibc information on feature test macros,
25 // Refer http://www.gnu.org/software/libc/manual/html_node/Feature-Test-Macros.html
26 //
27 // This file requires #define use due to random()
28 // For details on compatibility and glibc support,
29 // Refer http://www.gnu.org/software/libc/manual/html_node/BSD-Random.html
30 #define _DEFAULT_SOURCE
31 #define _BSD_SOURCE
32
33 // Defining _POSIX_C_SOURCE macro with 199309L (or greater) as value
34 // causes header files to expose definitions
35 // corresponding to the POSIX.1b, Real-time extensions
36 // (IEEE Std 1003.1b-1993) specification
37 //
38 // For this specific file, see use of clock_gettime,
39 // Refer to http://pubs.opengroup.org/stage7tc1/functions/clock_gettime.html
40 // and to http://man7.org/linux/man-pages/man2/clock_gettime.2.html
41 #ifndef _POSIX_C_SOURCE
42 #define _POSIX_C_SOURCE 200809L
43 #endif
44
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <string.h>
48
49 #ifndef SINGLE_THREAD
50 #include <unistd.h>
51 #include <time.h>
52 #include <sys/time.h>
53 #endif
54
55 #if defined(__ANDROID__)
56 #include <linux/time.h>
57 #endif
58
59 #include "caretransmission.h"
60 #include "caremotehandler.h"
61 #include "caprotocolmessage.h"
62 #include "oic_malloc.h"
63 #include "logger.h"
64
65 #define TAG "CA_RETRANS"
66
67 typedef struct
68 {
69     uint64_t timeStamp;                 /**< last sent time. microseconds */
70 #ifndef SINGLE_THREAD
71     uint64_t timeout;                   /**< timeout value. microseconds */
72 #endif
73     uint8_t triedCount;                 /**< retransmission count */
74     uint16_t messageId;                 /**< coap PDU message id */
75     CAEndpoint_t *endpoint;             /**< remote endpoint */
76     void *pdu;                          /**< coap PDU */
77     uint32_t size;                      /**< coap PDU size */
78 } CARetransmissionData_t;
79
80 static const uint64_t USECS_PER_SEC = 1000000;
81
82 /**
83  * @brief   getCurrent monotonic time
84  * @return  current time in microseconds
85  */
86 uint64_t getCurrentTimeInMicroSeconds();
87
88 #ifndef SINGLE_THREAD
89 /**
90  * @brief   timeout value is
91  *          between DEFAULT_ACK_TIMEOUT_SEC and
92  *          (DEFAULT_ACK_TIMEOUT_SEC * DEFAULT_RANDOM_FACTOR) second.
93  *          DEFAULT_RANDOM_FACTOR       1.5 (CoAP)
94  * @return  microseconds.
95  */
96 static uint64_t CAGetTimeoutValue()
97 {
98     return ((DEFAULT_ACK_TIMEOUT_SEC * 1000) + ((1000 * (random() & 0xFF)) >> 8)) *
99             (uint64_t) 1000;
100 }
101
102 CAResult_t CARetransmissionStart(CARetransmission_t *context)
103 {
104     if (NULL == context)
105     {
106         OIC_LOG(ERROR, TAG, "context is empty");
107         return CA_STATUS_INVALID_PARAM;
108     }
109
110     if (NULL == context->threadPool)
111     {
112         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
113         return CA_STATUS_INVALID_PARAM;
114     }
115
116     CAResult_t res = ca_thread_pool_add_task(context->threadPool, CARetransmissionBaseRoutine,
117                                              context);
118
119     if (CA_STATUS_OK != res)
120     {
121         OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
122         return res;
123     }
124
125     return res;
126 }
127 #endif
128
129 /**
130  * @brief   check timeout routine
131  * @param   currentTime     [IN]microseconds
132  * @param   retData         [IN]retransmission data
133  * @return  true if the timeout period has elapsed, false otherwise
134  */
135 static bool CACheckTimeout(uint64_t currentTime, CARetransmissionData_t *retData)
136 {
137 #ifndef SINGLE_THREAD
138     // #1. calculate timeout
139     uint32_t milliTimeoutValue = retData->timeout * 0.001;
140     uint64_t timeout = (milliTimeoutValue << retData->triedCount) * (uint64_t) 1000;
141
142     if (currentTime >= retData->timeStamp + timeout)
143     {
144         OIC_LOG_V(DEBUG, TAG, "%d microseconds time out!!, tried count(%ld)",
145                   timeout, retData->triedCount);
146         return true;
147     }
148 #else
149     // #1. calculate timeout
150     uint64_t timeOut = (2 << retData->triedCount) * 1000000;
151
152     if (currentTime >= retData->timeStamp + timeOut)
153     {
154         OIC_LOG_V(DEBUG, TAG, "timeout=%d, tried cnt=%d",
155                   (2 << retData->triedCount), retData->triedCount);
156         return true;
157     }
158 #endif
159     return false;
160 }
161
162 static void CACheckRetransmissionList(CARetransmission_t *context)
163 {
164     if (NULL == context)
165     {
166         OIC_LOG(ERROR, TAG, "context is null");
167         return;
168     }
169
170     // mutex lock
171     ca_mutex_lock(context->threadMutex);
172
173     uint32_t i = 0;
174     uint32_t len = u_arraylist_length(context->dataList);
175
176     for (i = 0; i < len; i++)
177     {
178         CARetransmissionData_t *retData = u_arraylist_get(context->dataList, i);
179
180         if (NULL == retData)
181         {
182             continue;
183         }
184
185         uint64_t currentTime = getCurrentTimeInMicroSeconds();
186
187         if (CACheckTimeout(currentTime, retData))
188         {
189             // #2. if time's up, send the data.
190             if (NULL != context->dataSendMethod)
191             {
192                 OIC_LOG_V(DEBUG, TAG, "retransmission CON data!!, msgid=%d",
193                           retData->messageId);
194                 context->dataSendMethod(retData->endpoint, retData->pdu, retData->size);
195             }
196
197             // #3. increase the retransmission count and update timestamp.
198             retData->timeStamp = currentTime;
199             retData->triedCount++;
200         }
201
202         // #4. if tried count is max, remove the retransmission data from list.
203         if (retData->triedCount >= context->config.tryingCount)
204         {
205             CARetransmissionData_t *removedData = u_arraylist_remove(context->dataList, i);
206             if (NULL == removedData)
207             {
208                 OIC_LOG(ERROR, TAG, "Removed data is NULL");
209                 // mutex unlock
210                 ca_mutex_unlock(context->threadMutex);
211                 return;
212             }
213             OIC_LOG_V(DEBUG, TAG, "max trying count, remove RTCON data,"
214                       "msgid=%d", removedData->messageId);
215
216             // callback for retransmit timeout
217             if (NULL != context->timeoutCallback)
218             {
219                 context->timeoutCallback(removedData->endpoint, removedData->pdu,
220                                          removedData->size);
221             }
222
223             CAFreeEndpoint(removedData->endpoint);
224             OICFree(removedData->pdu);
225
226             OICFree(removedData);
227
228             // modify loop value.
229             len = u_arraylist_length(context->dataList);
230             --i;
231         }
232     }
233
234     // mutex unlock
235     ca_mutex_unlock(context->threadMutex);
236 }
237
238 void CARetransmissionBaseRoutine(void *threadValue)
239 {
240     OIC_LOG(DEBUG, TAG, "retransmission main thread start");
241
242     CARetransmission_t *context = (CARetransmission_t *) threadValue;
243
244     if (NULL == context)
245     {
246         OIC_LOG(ERROR, TAG, "thread data passing error");
247
248         return;
249     }
250
251 #ifdef SINGLE_THREAD
252     if (true == context->isStop)
253     {
254         OIC_LOG(DEBUG, TAG, "thread stopped");
255         return;
256     }
257     CACheckRetransmissionList(context);
258 #else
259
260     while (!context->isStop)
261     {
262         // mutex lock
263         ca_mutex_lock(context->threadMutex);
264
265         if (!context->isStop && u_arraylist_length(context->dataList) <= 0)
266         {
267             // if list is empty, thread will wait
268             OIC_LOG(DEBUG, TAG, "wait..there is no retransmission data.");
269
270             // wait
271             ca_cond_wait(context->threadCond, context->threadMutex);
272
273             OIC_LOG(DEBUG, TAG, "wake up..");
274         }
275         else if (!context->isStop)
276         {
277             // check each RETRANSMISSION_CHECK_PERIOD_SEC time.
278             OIC_LOG_V(DEBUG, TAG, "wait..(%ld)microseconds",
279                       RETRANSMISSION_CHECK_PERIOD_SEC * (uint64_t) USECS_PER_SEC);
280
281             // wait
282             uint64_t absTime = RETRANSMISSION_CHECK_PERIOD_SEC * (uint64_t) USECS_PER_SEC;
283             ca_cond_wait_for(context->threadCond, context->threadMutex, absTime );
284         }
285         else
286         {
287             // we are stopping, so we want to unlock and finish stopping
288         }
289
290         // mutex unlock
291         ca_mutex_unlock(context->threadMutex);
292
293         // check stop flag
294         if (context->isStop)
295         {
296             continue;
297         }
298
299         CACheckRetransmissionList(context);
300     }
301
302     ca_mutex_lock(context->threadMutex);
303     ca_cond_signal(context->threadCond);
304     ca_mutex_unlock(context->threadMutex);
305
306 #endif
307     OIC_LOG(DEBUG, TAG, "retransmission main thread end");
308
309 }
310
311 CAResult_t CARetransmissionInitialize(CARetransmission_t *context,
312                                       ca_thread_pool_t handle,
313                                       CADataSendMethod_t retransmissionSendMethod,
314                                       CATimeoutCallback_t timeoutCallback,
315                                       CARetransmissionConfig_t* config)
316 {
317     if (NULL == context)
318     {
319         OIC_LOG(ERROR, TAG, "thread instance is empty");
320         return CA_STATUS_INVALID_PARAM;
321     }
322 #ifndef SINGLE_THREAD
323     if (NULL == handle)
324     {
325         OIC_LOG(ERROR, TAG, "thread pool handle is empty");
326         return CA_STATUS_INVALID_PARAM;
327     }
328 #endif
329     OIC_LOG(DEBUG, TAG, "thread initialize");
330
331     memset(context, 0, sizeof(CARetransmission_t));
332
333     CARetransmissionConfig_t cfg = { 0 };
334
335     if (NULL == config)
336     {
337         // setDefault
338         cfg.supportType = DEFAULT_RETRANSMISSION_TYPE;
339         cfg.tryingCount = DEFAULT_RETRANSMISSION_COUNT;
340     }
341     else
342     {
343         cfg = *config;
344     }
345
346     // set send thread data
347     context->threadPool = handle;
348     context->threadMutex = ca_mutex_new();
349     context->threadCond = ca_cond_new();
350     context->dataSendMethod = retransmissionSendMethod;
351     context->timeoutCallback = timeoutCallback;
352     context->config = cfg;
353     context->isStop = false;
354     context->dataList = u_arraylist_create();
355
356     return CA_STATUS_OK;
357 }
358
359 CAResult_t CARetransmissionSentData(CARetransmission_t *context,
360                                     const CAEndpoint_t *endpoint,
361                                     const void *pdu, uint32_t size)
362 {
363     if (NULL == context || NULL == endpoint || NULL == pdu)
364     {
365         OIC_LOG(ERROR, TAG, "invalid parameter");
366         return CA_STATUS_INVALID_PARAM;
367     }
368
369     // #0. check support transport type
370     if (!(context->config.supportType & endpoint->adapter))
371     {
372         OIC_LOG_V(DEBUG, TAG, "not supported transport type=%d", endpoint->adapter);
373         return CA_NOT_SUPPORTED;
374     }
375
376     // #1. check PDU method type and get message id.
377     CAMessageType_t type = CAGetMessageTypeFromPduBinaryData(pdu, size);
378     uint16_t messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
379
380     OIC_LOG_V(DEBUG, TAG, "sent pdu, msgtype=%d, msgid=%d", type, messageId);
381
382     if (CA_MSG_CONFIRM != type)
383     {
384         OIC_LOG(DEBUG, TAG, "not supported message type");
385         return CA_NOT_SUPPORTED;
386     }
387
388     // create retransmission data
389     CARetransmissionData_t *retData = (CARetransmissionData_t *) OICCalloc(
390                                           1, sizeof(CARetransmissionData_t));
391
392     if (NULL == retData)
393     {
394         OIC_LOG(ERROR, TAG, "memory error");
395         return CA_MEMORY_ALLOC_FAILED;
396     }
397
398     // copy PDU data
399     void *pduData = (void *) OICMalloc(size);
400     if (NULL == pduData)
401     {
402         OICFree(retData);
403         OIC_LOG(ERROR, TAG, "memory error");
404         return CA_MEMORY_ALLOC_FAILED;
405     }
406     memcpy(pduData, pdu, size);
407
408     // clone remote endpoint
409     CAEndpoint_t *remoteEndpoint = CACloneEndpoint(endpoint);
410     if (NULL == remoteEndpoint)
411     {
412         OICFree(retData);
413         OICFree(pduData);
414         OIC_LOG(ERROR, TAG, "memory error");
415         return CA_MEMORY_ALLOC_FAILED;
416     }
417
418     // #2. add additional information. (time stamp, retransmission count...)
419     retData->timeStamp = getCurrentTimeInMicroSeconds();
420 #ifndef SINGLE_THREAD
421     retData->timeout = CAGetTimeoutValue();
422 #endif
423     retData->triedCount = 0;
424     retData->messageId = messageId;
425     retData->endpoint = remoteEndpoint;
426     retData->pdu = pduData;
427     retData->size = size;
428 #ifndef SINGLE_THREAD
429     // mutex lock
430     ca_mutex_lock(context->threadMutex);
431
432     uint32_t i = 0;
433     uint32_t len = u_arraylist_length(context->dataList);
434
435     // #3. add data into list
436     for (i = 0; i < len; i++)
437     {
438         CARetransmissionData_t *currData = u_arraylist_get(context->dataList, i);
439
440         if (NULL == currData)
441         {
442             continue;
443         }
444
445         // found index
446         if (NULL != currData->endpoint && currData->messageId == messageId
447             && (currData->endpoint->adapter == endpoint->adapter))
448         {
449             OIC_LOG(ERROR, TAG, "Duplicate message ID");
450
451             // mutex unlock
452             ca_mutex_unlock(context->threadMutex);
453
454             OICFree(retData);
455             OICFree(pduData);
456             OICFree(remoteEndpoint);
457             return CA_STATUS_FAILED;
458         }
459     }
460
461     u_arraylist_add(context->dataList, (void *) retData);
462
463     // notify the thread
464     ca_cond_signal(context->threadCond);
465
466     // mutex unlock
467     ca_mutex_unlock(context->threadMutex);
468
469 #else
470     u_arraylist_add(context->dataList, (void *) retData);
471
472     CACheckRetransmissionList(context);
473 #endif
474     return CA_STATUS_OK;
475 }
476
477 CAResult_t CARetransmissionReceivedData(CARetransmission_t *context,
478                                         const CAEndpoint_t *endpoint, const void *pdu,
479                                         uint32_t size, void **retransmissionPdu)
480 {
481     OIC_LOG(DEBUG, TAG, "IN");
482     if (NULL == context || NULL == endpoint || NULL == pdu || NULL == retransmissionPdu)
483     {
484         OIC_LOG(ERROR, TAG, "invalid parameter");
485         return CA_STATUS_INVALID_PARAM;
486     }
487
488     // #0. check support transport type
489     if (!(context->config.supportType & endpoint->adapter))
490     {
491         OIC_LOG_V(DEBUG, TAG, "not supported transport type=%d", endpoint->adapter);
492         return CA_STATUS_OK;
493     }
494
495     // #1. check PDU method type and get message id.
496     // ACK, RST --> remove the CON data
497     CAMessageType_t type = CAGetMessageTypeFromPduBinaryData(pdu, size);
498     uint16_t messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
499
500     OIC_LOG_V(DEBUG, TAG, "received pdu, msgtype=%d, msgid=%d", type, messageId);
501
502     if ((CA_MSG_ACKNOWLEDGE != type) && (CA_MSG_RESET != type))
503     {
504         return CA_STATUS_OK;
505     }
506
507     // mutex lock
508     ca_mutex_lock(context->threadMutex);
509     uint32_t len = u_arraylist_length(context->dataList);
510
511     // find index
512     uint32_t i;
513     for (i = 0; i < len; i++)
514     {
515         CARetransmissionData_t *retData = (CARetransmissionData_t *) u_arraylist_get(
516                 context->dataList, i);
517
518         if (NULL == retData)
519         {
520             continue;
521         }
522
523         // found index
524         if (NULL != retData->endpoint && retData->messageId == messageId
525             && (retData->endpoint->adapter == endpoint->adapter))
526         {
527             // get pdu data for getting token when CA_EMPTY(RST/ACK) is received from remote device
528             // if retransmission was finish..token will be unavailable.
529             if (CA_EMPTY == CAGetCodeFromPduBinaryData(pdu, size))
530             {
531                 OIC_LOG(DEBUG, TAG, "code is CA_EMPTY");
532
533                 if (NULL == retData->pdu)
534                 {
535                     OIC_LOG(ERROR, TAG, "retData->pdu is null");
536                     OICFree(retData);
537                     // mutex unlock
538                     ca_mutex_unlock(context->threadMutex);
539
540                     return CA_STATUS_FAILED;
541                 }
542
543                 // copy PDU data
544                 (*retransmissionPdu) = (void *) OICCalloc(1, retData->size);
545                 if ((*retransmissionPdu) == NULL)
546                 {
547                     OICFree(retData);
548                     OIC_LOG(ERROR, TAG, "memory error");
549
550                     // mutex unlock
551                     ca_mutex_unlock(context->threadMutex);
552
553                     return CA_MEMORY_ALLOC_FAILED;
554                 }
555                 memcpy((*retransmissionPdu), retData->pdu, retData->size);
556             }
557
558             // #2. remove data from list
559             CARetransmissionData_t *removedData = u_arraylist_remove(context->dataList, i);
560             if (NULL == removedData)
561             {
562                 OIC_LOG(ERROR, TAG, "Removed data is NULL");
563
564                 // mutex unlock
565                 ca_mutex_unlock(context->threadMutex);
566
567                 return CA_STATUS_FAILED;
568             }
569
570             OIC_LOG_V(DEBUG, TAG, "remove RTCON data!!, msgid=%d", messageId);
571
572             CAFreeEndpoint(removedData->endpoint);
573             OICFree(removedData->pdu);
574             OICFree(removedData);
575
576             break;
577         }
578     }
579
580     // mutex unlock
581     ca_mutex_unlock(context->threadMutex);
582
583     OIC_LOG(DEBUG, TAG, "OUT");
584     return CA_STATUS_OK;
585 }
586
587 CAResult_t CARetransmissionStop(CARetransmission_t *context)
588 {
589     if (NULL == context)
590     {
591         OIC_LOG(ERROR, TAG, "context is empty..");
592         return CA_STATUS_INVALID_PARAM;
593     }
594
595     OIC_LOG(DEBUG, TAG, "retransmission stop request!!");
596
597     // mutex lock
598     ca_mutex_lock(context->threadMutex);
599
600     // set stop flag
601     context->isStop = true;
602
603     // notify the thread
604     ca_cond_signal(context->threadCond);
605
606     ca_cond_wait(context->threadCond, context->threadMutex);
607
608     // mutex unlock
609     ca_mutex_unlock(context->threadMutex);
610
611     return CA_STATUS_OK;
612 }
613
614 CAResult_t CARetransmissionDestroy(CARetransmission_t *context)
615 {
616     if (NULL == context)
617     {
618         OIC_LOG(ERROR, TAG, "context is empty..");
619         return CA_STATUS_INVALID_PARAM;
620     }
621
622     OIC_LOG(DEBUG, TAG, "retransmission context destroy..");
623
624     ca_mutex_free(context->threadMutex);
625     context->threadMutex = NULL;
626     ca_cond_free(context->threadCond);
627     u_arraylist_free(&context->dataList);
628
629     return CA_STATUS_OK;
630 }
631
632 uint64_t getCurrentTimeInMicroSeconds()
633 {
634     OIC_LOG(DEBUG, TAG, "IN");
635     uint64_t currentTime = 0;
636
637 #ifdef __ANDROID__
638     struct timespec getTs;
639
640     clock_gettime(CLOCK_MONOTONIC, &getTs);
641
642     currentTime = (getTs.tv_sec * (uint64_t)1000000000 + getTs.tv_nsec)/1000;
643     OIC_LOG_V(DEBUG, TAG, "current time = %ld", currentTime);
644 #elif defined __ARDUINO__
645     currentTime = millis() * 1000;
646     OIC_LOG_V(DEBUG, TAG, "currtime=%lu", currentTime);
647 #else
648 #if _POSIX_TIMERS > 0
649     struct timespec ts;
650     clock_gettime(CLOCK_MONOTONIC, &ts);
651     currentTime = ts.tv_sec * USECS_PER_SEC + ts.tv_nsec / 1000;
652 #else
653     struct timeval tv;
654     gettimeofday(&tv, NULL);
655     currentTime = tv.tv_sec * USECS_PER_SEC + tv.tv_usec;
656 #endif
657 #endif
658
659     OIC_LOG(DEBUG, TAG, "OUT");
660     return currentTime;
661 }