Fixed a pair of deadlocks on shutdown
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caretransmission.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 // Defining _BSD_SOURCE or _DEFAULT_SOURCE causes header files to expose
22 // definitions that may otherwise be skipped. Skipping can cause implicit
23 // declaration warnings and/or bugs and subtle problems in code execution.
24 // For glibc information on feature test macros,
25 // Refer http://www.gnu.org/software/libc/manual/html_node/Feature-Test-Macros.html
26 //
27 // This file requires #define use due to random()
28 // For details on compatibility and glibc support,
29 // Refer http://www.gnu.org/software/libc/manual/html_node/BSD-Random.html
30 #define _DEFAULT_SOURCE
31 #define _BSD_SOURCE
32
33 // Defining _POSIX_C_SOURCE macro with 199309L (or greater) as value
34 // causes header files to expose definitions
35 // corresponding to the POSIX.1b, Real-time extensions
36 // (IEEE Std 1003.1b-1993) specification
37 //
38 // For this specific file, see use of clock_gettime,
39 // Refer to http://pubs.opengroup.org/stage7tc1/functions/clock_gettime.html
40 // and to http://man7.org/linux/man-pages/man2/clock_gettime.2.html
41 #ifndef _POSIX_C_SOURCE
42 #define _POSIX_C_SOURCE 200809L
43 #endif
44
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <string.h>
48 #include <unistd.h>
49 #include <time.h>
50 #include <sys/time.h>
51
52 #if defined(__ANDROID__)
53 #include <linux/time.h>
54 #endif
55
56 #include "caretransmission.h"
57 #include "caremotehandler.h"
58 #include "caprotocolmessage.h"
59 #include "oic_malloc.h"
60 #include "logger.h"
61
62 #define TAG PCF("CA")
63
64 typedef struct
65 {
66     uint64_t timeStamp;                 /**< last sent time. microseconds */
67     uint64_t timeout;                   /**< timeout value. microseconds */
68     uint8_t triedCount;                 /**< retransmission count */
69     uint16_t messageId;                 /**< coap PDU message id */
70     CARemoteEndpoint_t *endpoint;       /**< remote endpoint */
71     void *pdu;                          /**< coap PDU */
72     uint32_t size;                      /**< coap PDU size */
73 } CARetransmissionData_t;
74
75 static const uint64_t USECS_PER_SEC = 1000000;
76
77 /**
78  * @brief   getCurrent monotonic time
79  * @return  current time in microseconds
80  */
81 uint64_t getCurrentTimeInMicroSeconds();
82
83 /**
84  * @brief   check timeout routine
85  * @param   currentTime     [IN]microseconds
86  * @param   timeStamp       [IN]microseconds
87  * @param   timeoutValue    [IN]microseconds
88  * @param   triedCount      [IN]Number of retransmission tried
89  * @return  true if the timeout period has elapsed, false otherwise
90  */
91 static bool CACheckTimeout(uint64_t currentTime, uint64_t timeStamp, uint64_t timeoutValue,
92                            uint8_t triedCount)
93 {
94     // #1. calculate timeout
95     uint32_t milliTimeoutValue = timeoutValue * 0.001;
96     uint64_t timeout = (milliTimeoutValue << triedCount) * (uint64_t) 1000;
97
98     if (currentTime >= timeStamp + timeout)
99     {
100         OIC_LOG_V(DEBUG, TAG, "%d microseconds time out!!, tried count(%d)", timeout, triedCount);
101         return true;
102     }
103
104     return false;
105 }
106
107 /**
108  * @brief   timeout value is
109  *          between DEFAULT_ACK_TIMEOUT_SEC and
110  *          (DEFAULT_ACK_TIMEOUT_SEC * DEFAULT_RANDOM_FACTOR) second.
111  *          DEFAULT_RANDOM_FACTOR       1.5 (CoAP)
112  * @return  microseconds.
113  */
114 static uint64_t CAGetTimeoutValue()
115 {
116     return ((DEFAULT_ACK_TIMEOUT_SEC * 1000) + ((1000 * (random() & 0xFF)) >> 8)) *
117             (uint64_t) 1000;
118 }
119
120 static void CACheckRetransmissionList(CARetransmission_t *context)
121 {
122     if (NULL == context)
123     {
124         OIC_LOG(ERROR, TAG, "context is null..");
125         return;
126     }
127
128     // mutex lock
129     ca_mutex_lock(context->threadMutex);
130
131     uint32_t i = 0;
132     uint32_t len = u_arraylist_length(context->dataList);
133
134     for (i = 0; i < len; i++)
135     {
136         CARetransmissionData_t *retData = u_arraylist_get(context->dataList, i);
137
138         if (NULL == retData)
139         {
140             continue;
141         }
142
143         uint64_t currentTime = getCurrentTimeInMicroSeconds();
144
145         if (CACheckTimeout(currentTime, retData->timeStamp, retData->timeout, retData->triedCount))
146         {
147             // #2. if time's up, send the data.
148             if (NULL != context->dataSendMethod)
149             {
150                 OIC_LOG_V(DEBUG, TAG, "retransmission CON data!!, message id(%d)",
151                           retData->messageId);
152                 context->dataSendMethod(retData->endpoint, retData->pdu, retData->size);
153             }
154
155             // #3. increase the retransmission count and update timestamp.
156             retData->timeStamp = currentTime;
157             retData->triedCount++;
158         }
159
160         // #4. if tried count is max, remove the retransmission data from list.
161         if (retData->triedCount >= context->config.tryingCount)
162         {
163             CARetransmissionData_t *removedData = u_arraylist_remove(context->dataList, i);
164
165             if (NULL != removedData)
166             {
167                 OIC_LOG_V(DEBUG, TAG, "max trying count, remove retransmission CON data!!,\
168                           message id(%d)", removedData->messageId);
169
170                 // callback for retransmit timeout
171                 if (NULL != context->timeoutCallback)
172                 {
173                     context->timeoutCallback(removedData->endpoint, removedData->pdu,
174                                              removedData->size);
175                 }
176
177                 CADestroyRemoteEndpointInternal(removedData->endpoint);
178                 OICFree(removedData->pdu);
179
180                 OICFree(removedData);
181
182                 // modify loop value.
183                 len = u_arraylist_length(context->dataList);
184
185                 --i;
186             }
187             else
188             {
189                 OIC_LOG(ERROR, TAG, "arraylist remove error");
190             }
191
192         }
193     }
194
195     // mutex unlock
196     ca_mutex_unlock(context->threadMutex);
197 }
198
199 static void CARetransmissionBaseRoutine(void *threadValue)
200 {
201     OIC_LOG(DEBUG, TAG, "retransmission main thread start..");
202
203     CARetransmission_t *context = (CARetransmission_t *) threadValue;
204
205     if (NULL == context)
206     {
207         OIC_LOG(ERROR, TAG, "thread data passing error!!");
208
209         return;
210     }
211
212     while (!context->isStop)
213     {
214         // mutex lock
215         ca_mutex_lock(context->threadMutex);
216
217         if (!context->isStop && u_arraylist_length(context->dataList) <= 0)
218         {
219             // if list is empty, thread will wait
220             OIC_LOG(DEBUG, TAG, "wait..there is no retransmission data.");
221
222             // wait
223             ca_cond_wait(context->threadCond, context->threadMutex);
224
225             OIC_LOG(DEBUG, TAG, "wake up..");
226         }
227         else
228         {
229             // check each RETRANSMISSION_CHECK_PERIOD_SEC time.
230             OIC_LOG_V(DEBUG, TAG, "wait..(%d)microseconds",
231                       RETRANSMISSION_CHECK_PERIOD_SEC * (uint64_t) USECS_PER_SEC);
232
233             // wait
234             uint64_t absTime = RETRANSMISSION_CHECK_PERIOD_SEC * (uint64_t) USECS_PER_SEC;
235             ca_cond_wait_for(context->threadCond, context->threadMutex, absTime );
236         }
237
238         // mutex unlock
239         ca_mutex_unlock(context->threadMutex);
240
241         // check stop flag
242         if (context->isStop)
243         {
244             continue;
245         }
246
247         CACheckRetransmissionList(context);
248     }
249
250     ca_mutex_lock(context->threadMutex);
251     ca_cond_signal(context->threadCond);
252     ca_mutex_unlock(context->threadMutex);
253
254     OIC_LOG(DEBUG, TAG, "retransmission main thread end..");
255
256 }
257
258 CAResult_t CARetransmissionInitialize(CARetransmission_t *context, ca_thread_pool_t handle,
259                                       CADataSendMethod_t retransmissionSendMethod,
260                                       CATimeoutCallback_t timeoutCallback,
261                                       CARetransmissionConfig_t* config)
262 {
263     if (NULL == context)
264     {
265         OIC_LOG(ERROR, TAG, "thread instance is empty..");
266         return CA_STATUS_INVALID_PARAM;
267     }
268
269     if (NULL == handle)
270     {
271         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
272         return CA_STATUS_INVALID_PARAM;
273     }
274
275     OIC_LOG(DEBUG, TAG, "thread initialize..");
276
277     memset(context, 0, sizeof(CARetransmission_t));
278
279     CARetransmissionConfig_t cfg = { 0 };
280
281     if (NULL == config)
282     {
283         // setDefault
284         cfg.supportType = DEFAULT_RETRANSMISSION_TYPE;
285         cfg.tryingCount = DEFAULT_MAX_RETRANSMIT;
286     }
287     else
288     {
289         cfg = *config;
290     }
291
292     // set send thread data
293     context->threadPool = handle;
294     context->threadMutex = ca_mutex_new();
295     context->threadCond = ca_cond_new();
296     context->dataSendMethod = retransmissionSendMethod;
297     context->timeoutCallback = timeoutCallback;
298     context->config = cfg;
299     context->isStop = false;
300     context->dataList = u_arraylist_create();
301
302     return CA_STATUS_OK;
303 }
304
305 CAResult_t CARetransmissionStart(CARetransmission_t *context)
306 {
307     if (NULL == context)
308     {
309         OIC_LOG(ERROR, TAG, "context is empty..");
310         return CA_STATUS_INVALID_PARAM;
311     }
312
313     if (NULL == context->threadPool)
314     {
315         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
316         return CA_STATUS_INVALID_PARAM;
317     }
318
319     CAResult_t res = ca_thread_pool_add_task(context->threadPool, CARetransmissionBaseRoutine,
320                                             context);
321
322     if (CA_STATUS_OK != res)
323     {
324         OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
325         return res;
326     }
327
328     return res;
329 }
330
331 CAResult_t CARetransmissionSentData(CARetransmission_t *context,
332                                     const CARemoteEndpoint_t* endpoint, const void* pdu,
333                                     uint32_t size)
334 {
335     if (NULL == context || NULL == endpoint || NULL == pdu)
336     {
337         OIC_LOG(ERROR, TAG, "invalid parameter..");
338         return CA_STATUS_INVALID_PARAM;
339     }
340
341     // #0. check support connectivity type
342     if (!(context->config.supportType & endpoint->connectivityType))
343     {
344         OIC_LOG_V(DEBUG, TAG, "not supported connectivity type for retransmission..(%d)",
345                   endpoint->connectivityType);
346         return CA_NOT_SUPPORTED;
347     }
348
349     // #1. check PDU method type and get message id.
350     CAMessageType_t type = CAGetMessageTypeFromPduBinaryData(pdu, size);
351     uint16_t messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
352
353     OIC_LOG_V(DEBUG, TAG, "sent pdu, message type(%d), message id(%d)", type, messageId);
354
355     if (CA_MSG_CONFIRM != type)
356     {
357         OIC_LOG(DEBUG, TAG, "not supported message type for retransmission..");
358         return CA_NOT_SUPPORTED;
359     }
360
361     // create retransmission data
362     CARetransmissionData_t *retData = (CARetransmissionData_t *) OICCalloc(
363                                           1, sizeof(CARetransmissionData_t));
364
365     if (NULL == retData)
366     {
367         OIC_LOG(ERROR, TAG, "memory error!!");
368         return CA_MEMORY_ALLOC_FAILED;
369     }
370
371     // copy PDU data
372     void *pduData = (void *) OICMalloc(size);
373     if (NULL == pduData)
374     {
375         OICFree(retData);
376         OIC_LOG(ERROR, TAG, "memory error!!");
377         return CA_MEMORY_ALLOC_FAILED;
378     }
379     memcpy(pduData, pdu, size);
380
381     // clone remote endpoint
382     CARemoteEndpoint_t *remoteEndpoint = CACloneRemoteEndpoint(endpoint);
383     if (NULL == remoteEndpoint)
384     {
385         OICFree(retData);
386         OICFree(pduData);
387         OIC_LOG(ERROR, TAG, "memory error!!");
388         return CA_MEMORY_ALLOC_FAILED;
389     }
390
391     // #2. add additional information. (time stamp, retransmission count...)
392     retData->timeStamp = getCurrentTimeInMicroSeconds();
393     retData->timeout = CAGetTimeoutValue();
394     retData->triedCount = 0;
395     retData->messageId = messageId;
396     retData->endpoint = remoteEndpoint;
397     retData->pdu = pduData;
398     retData->size = size;
399
400     // mutex lock
401     ca_mutex_lock(context->threadMutex);
402
403     uint32_t i = 0;
404     uint32_t len = u_arraylist_length(context->dataList);
405
406     // #3. add data into list
407     for (i = 0; i < len; i++)
408     {
409         CARetransmissionData_t *currData = u_arraylist_get(context->dataList, i);
410
411         if (NULL == currData)
412         {
413             continue;
414         }
415
416         // found index
417         if (NULL != currData->endpoint && currData->messageId == messageId
418             && (currData->endpoint->connectivityType == endpoint->connectivityType))
419         {
420             OIC_LOG(ERROR, TAG, "Duplicate message ID");
421
422             // mutex unlock
423             ca_mutex_unlock(context->threadMutex);
424
425             OICFree(retData);
426             OICFree(pduData);
427             OICFree(remoteEndpoint);
428             return CA_STATUS_FAILED;
429         }
430     }
431
432     u_arraylist_add(context->dataList, (void *) retData);
433
434     // notify the thread
435     ca_cond_signal(context->threadCond);
436
437     // mutex unlock
438     ca_mutex_unlock(context->threadMutex);
439
440     return CA_STATUS_OK;
441 }
442
443 CAResult_t CARetransmissionReceivedData(CARetransmission_t *context,
444                                         const CARemoteEndpoint_t *endpoint, const void *pdu,
445                                         uint32_t size, void **retransmissionPdu)
446 {
447     OIC_LOG(DEBUG, TAG, "IN - CARetransmissionReceivedData");
448     if (NULL == context || NULL == endpoint || NULL == pdu || NULL == retransmissionPdu)
449     {
450         OIC_LOG(ERROR, TAG, "invalid parameter..");
451         return CA_STATUS_INVALID_PARAM;
452     }
453
454     // #0. check support connectivity type
455     if (!(context->config.supportType & endpoint->connectivityType))
456     {
457         OIC_LOG_V(DEBUG, TAG, "not supported connectivity type for retransmission..(%d)",
458                   endpoint->connectivityType);
459         return CA_STATUS_OK;
460     }
461
462     // #1. check PDU method type and get message id.
463     // ACK, RST --> remove the CON data
464     CAMessageType_t type = CAGetMessageTypeFromPduBinaryData(pdu, size);
465     uint16_t messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
466
467     OIC_LOG_V(DEBUG, TAG, "received pdu, message type(%d), message id(%d)", type, messageId);
468
469     if ((CA_MSG_ACKNOWLEDGE != type) && (CA_MSG_RESET != type))
470     {
471         return CA_STATUS_OK;
472     }
473
474     // mutex lock
475     ca_mutex_lock(context->threadMutex);
476     uint32_t len = u_arraylist_length(context->dataList);
477
478     // find index
479     for (uint32_t i = 0; i < len; i++)
480     {
481         CARetransmissionData_t *retData = (CARetransmissionData_t *) u_arraylist_get(
482                 context->dataList, i);
483
484         if (NULL == retData)
485         {
486             continue;
487         }
488
489         // found index
490         if (NULL != retData->endpoint && retData->messageId == messageId
491             && (retData->endpoint->connectivityType == endpoint->connectivityType))
492         {
493             // get pdu data for getting token when CA_EMPTY(RST/ACK) is received from remote device
494             // if retransmission was finish..token will be unavailable.
495             if (CA_EMPTY == CAGetCodeFromPduBinaryData(pdu, size))
496             {
497                 OIC_LOG(DEBUG, TAG, "code is CA_EMPTY..");
498
499                 if (NULL == retData->pdu)
500                 {
501                     OIC_LOG(ERROR, TAG, "retData->pdu is null");
502                     OICFree(retData);
503                     // mutex unlock
504                     ca_mutex_unlock(context->threadMutex);
505
506                     return CA_STATUS_FAILED;
507                 }
508
509                 // copy PDU data
510                 (*retransmissionPdu) = (void *) OICCalloc(1, retData->size);
511                 if ((*retransmissionPdu) == NULL)
512                 {
513                     OICFree(retData);
514                     OIC_LOG(ERROR, TAG, "memory error!!");
515
516                     // mutex unlock
517                     ca_mutex_unlock(context->threadMutex);
518
519                     return CA_MEMORY_ALLOC_FAILED;
520                 }
521                 memcpy((*retransmissionPdu), retData->pdu, retData->size);
522             }
523
524             // #2. remove data from list
525             CARetransmissionData_t *removedData = u_arraylist_remove(context->dataList, i);
526             if (NULL == removedData)
527             {
528                 OIC_LOG(ERROR, TAG, "Removed data is NULL");
529
530                 // mutex unlock
531                 ca_mutex_unlock(context->threadMutex);
532
533                 return CA_STATUS_FAILED;
534             }
535
536             OIC_LOG_V(DEBUG, TAG, "remove retransmission CON data!!, message id(%d)",
537                       messageId);
538
539             CADestroyRemoteEndpointInternal(removedData->endpoint);
540             OICFree(removedData->pdu);
541             OICFree(removedData);
542
543             break;
544         }
545     }
546
547     // mutex unlock
548     ca_mutex_unlock(context->threadMutex);
549
550     OIC_LOG(DEBUG, TAG, "OUT - CARetransmissionReceivedData");
551     return CA_STATUS_OK;
552 }
553
554 CAResult_t CARetransmissionStop(CARetransmission_t *context)
555 {
556     if (NULL == context)
557     {
558         OIC_LOG(ERROR, TAG, "context is empty..");
559         return CA_STATUS_INVALID_PARAM;
560     }
561
562     OIC_LOG(DEBUG, TAG, "retransmission stop request!!");
563
564     // mutex lock
565     ca_mutex_lock(context->threadMutex);
566
567     // set stop flag
568     context->isStop = true;
569
570     // notify the thread
571     ca_cond_signal(context->threadCond);
572
573     ca_cond_wait(context->threadCond, context->threadMutex);
574
575     // mutex unlock
576     ca_mutex_unlock(context->threadMutex);
577
578     return CA_STATUS_OK;
579 }
580
581 CAResult_t CARetransmissionDestroy(CARetransmission_t *context)
582 {
583     if (NULL == context)
584     {
585         OIC_LOG(ERROR, TAG, "context is empty..");
586         return CA_STATUS_INVALID_PARAM;
587     }
588
589     OIC_LOG(DEBUG, TAG, "retransmission context destroy..");
590
591     ca_mutex_free(context->threadMutex);
592     context->threadMutex = NULL;
593     ca_cond_free(context->threadCond);
594     u_arraylist_free(&context->dataList);
595
596     return CA_STATUS_OK;
597 }
598
599 uint64_t getCurrentTimeInMicroSeconds()
600 {
601     OIC_LOG(DEBUG, TAG, "IN");
602     uint64_t currentTime = 0;
603
604 #ifdef __ANDROID__
605     struct timespec getTs;
606
607     clock_gettime(CLOCK_MONOTONIC, &getTs);
608
609     currentTime = (getTs.tv_sec * (uint64_t)1000000000 + getTs.tv_nsec)/1000;
610     OIC_LOG_V(DEBUG, TAG, "current time = %d", currentTime);
611 #else
612 #if _POSIX_TIMERS > 0
613     struct timespec ts;
614     clock_gettime(CLOCK_MONOTONIC, &ts);
615     currentTime = ts.tv_sec * USECS_PER_SEC + ts.tv_nsec / 1000;
616 #else
617     struct timeval tv;
618     gettimeofday(&tv, NULL);
619     currentTime = tv.tv_sec * USECS_PER_SEC + tv.tv_usec;
620 #endif
621 #endif
622
623     OIC_LOG(DEBUG, TAG, "OUT");
624     return currentTime;
625 }