Replace glib threadpool usage with a 'dumb' thread implementation.
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / caretransmission.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 // Defining _BSD_SOURCE or _DEFAULT_SOURCE causes header files to expose
22 // definitions that may otherwise be skipped. Skipping can cause implicit
23 // declaration warnings and/or bugs and subtle problems in code execution.
24 // For glibc information on feature test macros,
25 // Refer http://www.gnu.org/software/libc/manual/html_node/Feature-Test-Macros.html
26 //
27 // This file requires #define use due to random()
28 // For details on compatibility and glibc support,
29 // Refer http://www.gnu.org/software/libc/manual/html_node/BSD-Random.html
30 #define _DEFAULT_SOURCE
31 #define _BSD_SOURCE
32
33 // Defining _POSIX_C_SOURCE macro with 199309L (or greater) as value
34 // causes header files to expose definitions
35 // corresponding to the POSIX.1b, Real-time extensions
36 // (IEEE Std 1003.1b-1993) specification
37 //
38 // For this specific file, see use of clock_gettime,
39 // Refer to http://pubs.opengroup.org/stage7tc1/functions/clock_gettime.html
40 // and to http://man7.org/linux/man-pages/man2/clock_gettime.2.html
41 #ifndef _POSIX_C_SOURCE
42 #define _POSIX_C_SOURCE 200809L
43 #endif
44
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <string.h>
48 #include <unistd.h>
49 #include <time.h>
50 #include <sys/time.h>
51
52 #if defined(__ANDROID__)
53 #include <linux/time.h>
54 #endif
55
56 #include "caretransmission.h"
57 #include "caremotehandler.h"
58 #include "caprotocolmessage.h"
59 #include "oic_malloc.h"
60 #include "logger.h"
61
62 #define TAG PCF("CA")
63
64 typedef struct
65 {
66     uint64_t timeStamp;                 /**< last sent time. microseconds */
67     uint64_t timeout;                   /**< timeout value. microseconds */
68     uint8_t triedCount;                 /**< retransmission count */
69     uint16_t messageId;                 /**< coap PDU message id */
70     CARemoteEndpoint_t *endpoint;       /**< remote endpoint */
71     void *pdu;                          /**< coap PDU */
72     uint32_t size;                      /**< coap PDU size */
73 } CARetransmissionData_t;
74
75 static const uint64_t USECS_PER_SEC = 1000000;
76
77 /**
78  * @brief   getCurrent monotonic time
79  * @return  current time in microseconds
80  */
81 uint64_t getCurrentTimeInMicroSeconds();
82
83 /**
84  * @brief   check timeout routine
85  * @param   currentTime     [IN]microseconds
86  * @param   timeStamp       [IN]microseconds
87  * @param   timeoutValue    [IN]microseconds
88  * @param   triedCount      [IN]Number of retransmission tried
89  * @return  true if the timeout period has elapsed, false otherwise
90  */
91 static bool CACheckTimeout(uint64_t currentTime, uint64_t timeStamp, uint64_t timeoutValue,
92                            uint8_t triedCount)
93 {
94     // #1. calculate timeout
95     uint32_t milliTimeoutValue = timeoutValue * 0.001;
96     uint64_t timeout = (milliTimeoutValue << triedCount) * (uint64_t) 1000;
97
98     if (currentTime >= timeStamp + timeout)
99     {
100         OIC_LOG_V(DEBUG, TAG, "%d microseconds time out!!, tried count(%d)", timeout, triedCount);
101         return true;
102     }
103
104     return false;
105 }
106
107 /**
108  * @brief   timeout value is
109  *          between DEFAULT_ACK_TIMEOUT_SEC and
110  *          (DEFAULT_ACK_TIMEOUT_SEC * DEFAULT_RANDOM_FACTOR) second.
111  *          DEFAULT_RANDOM_FACTOR       1.5 (CoAP)
112  * @return  microseconds.
113  */
114 static uint64_t CAGetTimeoutValue()
115 {
116     return ((DEFAULT_ACK_TIMEOUT_SEC * 1000) + ((1000 * (random() & 0xFF)) >> 8)) *
117             (uint64_t) 1000;
118 }
119
120 static void CACheckRetransmissionList(CARetransmission_t *context)
121 {
122     if (NULL == context)
123     {
124         OIC_LOG(ERROR, TAG, "context is null..");
125         return;
126     }
127
128     // mutex lock
129     ca_mutex_lock(context->threadMutex);
130
131     uint32_t i = 0;
132     uint32_t len = u_arraylist_length(context->dataList);
133
134     for (i = 0; i < len; i++)
135     {
136         CARetransmissionData_t *retData = u_arraylist_get(context->dataList, i);
137
138         if (NULL == retData)
139         {
140             continue;
141         }
142
143         uint64_t currentTime = getCurrentTimeInMicroSeconds();
144
145         if (CACheckTimeout(currentTime, retData->timeStamp, retData->timeout, retData->triedCount))
146         {
147             // #2. if time's up, send the data.
148             if (NULL != context->dataSendMethod)
149             {
150                 OIC_LOG_V(DEBUG, TAG, "retransmission CON data!!, message id(%d)",
151                           retData->messageId);
152                 context->dataSendMethod(retData->endpoint, retData->pdu, retData->size);
153             }
154
155             // #3. increase the retransmission count and update timestamp.
156             retData->timeStamp = currentTime;
157             retData->triedCount++;
158         }
159
160         // #4. if tried count is max, remove the retransmission data from list.
161         if (retData->triedCount >= context->config.tryingCount)
162         {
163             CARetransmissionData_t *removedData = u_arraylist_remove(context->dataList, i);
164
165             if (NULL != removedData)
166             {
167                 OIC_LOG_V(DEBUG, TAG, "max trying count, remove retransmission CON data!!,\
168                           message id(%d)", removedData->messageId);
169
170                 // callback for retransmit timeout
171                 if (NULL != context->timeoutCallback)
172                 {
173                     context->timeoutCallback(removedData->endpoint, removedData->pdu,
174                                              removedData->size);
175                 }
176
177                 CADestroyRemoteEndpointInternal(removedData->endpoint);
178                 OICFree(removedData->pdu);
179
180                 OICFree(removedData);
181
182                 // modify loop value.
183                 len = u_arraylist_length(context->dataList);
184
185                 --i;
186             }
187             else
188             {
189                 OIC_LOG(ERROR, TAG, "arraylist remove error");
190             }
191
192         }
193     }
194
195     // mutex unlock
196     ca_mutex_unlock(context->threadMutex);
197 }
198
199 static void CARetransmissionBaseRoutine(void *threadValue)
200 {
201     OIC_LOG(DEBUG, TAG, "retransmission main thread start..");
202
203     CARetransmission_t *context = (CARetransmission_t *) threadValue;
204
205     if (NULL == context)
206     {
207         OIC_LOG(ERROR, TAG, "thread data passing error!!");
208
209         return;
210     }
211
212     while (!context->isStop)
213     {
214         // mutex lock
215         ca_mutex_lock(context->threadMutex);
216
217         if (u_arraylist_length(context->dataList) <= 0)
218         {
219             // if list is empty, thread will wait
220             OIC_LOG(DEBUG, TAG, "wait..there is no retransmission data.");
221
222             // wait
223             ca_cond_wait(context->threadCond, context->threadMutex);
224
225             OIC_LOG(DEBUG, TAG, "wake up..");
226         }
227         else
228         {
229             // check each RETRANSMISSION_CHECK_PERIOD_SEC time.
230             OIC_LOG_V(DEBUG, TAG, "wait..(%d)microseconds",
231                       RETRANSMISSION_CHECK_PERIOD_SEC * (uint64_t) USECS_PER_SEC);
232
233             // wait
234             uint64_t absTime = RETRANSMISSION_CHECK_PERIOD_SEC * (uint64_t) USECS_PER_SEC;
235             ca_cond_wait_for(context->threadCond, context->threadMutex, absTime );
236         }
237
238         // mutex unlock
239         ca_mutex_unlock(context->threadMutex);
240
241         // check stop flag
242         if (context->isStop)
243         {
244             continue;
245         }
246
247         CACheckRetransmissionList(context);
248     }
249
250     ca_cond_signal(context->threadCond);
251
252     OIC_LOG(DEBUG, TAG, "retransmission main thread end..");
253
254 }
255
256 CAResult_t CARetransmissionInitialize(CARetransmission_t *context, ca_thread_pool_t handle,
257                                       CADataSendMethod_t retransmissionSendMethod,
258                                       CATimeoutCallback_t timeoutCallback,
259                                       CARetransmissionConfig_t* config)
260 {
261     if (NULL == context)
262     {
263         OIC_LOG(ERROR, TAG, "thread instance is empty..");
264         return CA_STATUS_INVALID_PARAM;
265     }
266
267     if (NULL == handle)
268     {
269         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
270         return CA_STATUS_INVALID_PARAM;
271     }
272
273     OIC_LOG(DEBUG, TAG, "thread initialize..");
274
275     memset(context, 0, sizeof(CARetransmission_t));
276
277     CARetransmissionConfig_t cfg = { 0 };
278
279     if (NULL == config)
280     {
281         // setDefault
282         cfg.supportType = DEFAULT_RETRANSMISSION_TYPE;
283         cfg.tryingCount = DEFAULT_MAX_RETRANSMIT;
284     }
285     else
286     {
287         cfg = *config;
288     }
289
290     // set send thread data
291     context->threadPool = handle;
292     context->threadMutex = ca_mutex_new();
293     context->threadCond = ca_cond_new();
294     context->dataSendMethod = retransmissionSendMethod;
295     context->timeoutCallback = timeoutCallback;
296     context->config = cfg;
297     context->isStop = false;
298     context->dataList = u_arraylist_create();
299
300     return CA_STATUS_OK;
301 }
302
303 CAResult_t CARetransmissionStart(CARetransmission_t *context)
304 {
305     if (NULL == context)
306     {
307         OIC_LOG(ERROR, TAG, "context is empty..");
308         return CA_STATUS_INVALID_PARAM;
309     }
310
311     if (NULL == context->threadPool)
312     {
313         OIC_LOG(ERROR, TAG, "thread pool handle is empty..");
314         return CA_STATUS_INVALID_PARAM;
315     }
316
317     CAResult_t res = ca_thread_pool_add_task(context->threadPool, CARetransmissionBaseRoutine,
318                                             context);
319
320     if (CA_STATUS_OK != res)
321     {
322         OIC_LOG(ERROR, TAG, "thread pool add task error(send thread).");
323         return res;
324     }
325
326     return res;
327 }
328
329 CAResult_t CARetransmissionSentData(CARetransmission_t *context,
330                                     const CARemoteEndpoint_t* endpoint, const void* pdu,
331                                     uint32_t size)
332 {
333     if (NULL == context || NULL == endpoint || NULL == pdu)
334     {
335         OIC_LOG(ERROR, TAG, "invalid parameter..");
336         return CA_STATUS_INVALID_PARAM;
337     }
338
339     // #0. check support connectivity type
340     if (!(context->config.supportType & endpoint->connectivityType))
341     {
342         OIC_LOG_V(DEBUG, TAG, "not supported connectivity type for retransmission..(%d)",
343                   endpoint->connectivityType);
344         return CA_NOT_SUPPORTED;
345     }
346
347     // #1. check PDU method type and get message id.
348     CAMessageType_t type = CAGetMessageTypeFromPduBinaryData(pdu, size);
349     uint16_t messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
350
351     OIC_LOG_V(DEBUG, TAG, "sent pdu, message type(%d), message id(%d)", type, messageId);
352
353     if (CA_MSG_CONFIRM != type)
354     {
355         OIC_LOG(DEBUG, TAG, "not supported message type for retransmission..");
356         return CA_NOT_SUPPORTED;
357     }
358
359     // create retransmission data
360     CARetransmissionData_t *retData = (CARetransmissionData_t *) OICCalloc(
361                                           1, sizeof(CARetransmissionData_t));
362
363     if (NULL == retData)
364     {
365         OIC_LOG(ERROR, TAG, "memory error!!");
366         return CA_MEMORY_ALLOC_FAILED;
367     }
368
369     // copy PDU data
370     void *pduData = (void *) OICMalloc(size);
371     if (NULL == pduData)
372     {
373         OICFree(retData);
374         OIC_LOG(ERROR, TAG, "memory error!!");
375         return CA_MEMORY_ALLOC_FAILED;
376     }
377     memcpy(pduData, pdu, size);
378
379     // clone remote endpoint
380     CARemoteEndpoint_t *remoteEndpoint = CACloneRemoteEndpoint(endpoint);
381     if (NULL == remoteEndpoint)
382     {
383         OICFree(retData);
384         OICFree(pduData);
385         OIC_LOG(ERROR, TAG, "memory error!!");
386         return CA_MEMORY_ALLOC_FAILED;
387     }
388
389     // #2. add additional information. (time stamp, retransmission count...)
390     retData->timeStamp = getCurrentTimeInMicroSeconds();
391     retData->timeout = CAGetTimeoutValue();
392     retData->triedCount = 0;
393     retData->messageId = messageId;
394     retData->endpoint = remoteEndpoint;
395     retData->pdu = pduData;
396     retData->size = size;
397
398     // mutex lock
399     ca_mutex_lock(context->threadMutex);
400
401     uint32_t i = 0;
402     uint32_t len = u_arraylist_length(context->dataList);
403
404     // #3. add data into list
405     for (i = 0; i < len; i++)
406     {
407         CARetransmissionData_t *currData = u_arraylist_get(context->dataList, i);
408
409         if (NULL == currData)
410         {
411             continue;
412         }
413
414         // found index
415         if (NULL != currData->endpoint && currData->messageId == messageId
416             && (currData->endpoint->connectivityType == endpoint->connectivityType))
417         {
418             OIC_LOG(ERROR, TAG, "Duplicate message ID");
419
420             // mutex unlock
421             ca_mutex_unlock(context->threadMutex);
422
423             OICFree(retData);
424             OICFree(pduData);
425             OICFree(remoteEndpoint);
426             return CA_STATUS_FAILED;
427         }
428     }
429
430     u_arraylist_add(context->dataList, (void *) retData);
431
432     // notify the thread
433     ca_cond_signal(context->threadCond);
434
435     // mutex unlock
436     ca_mutex_unlock(context->threadMutex);
437
438     return CA_STATUS_OK;
439 }
440
441 CAResult_t CARetransmissionReceivedData(CARetransmission_t *context,
442                                         const CARemoteEndpoint_t *endpoint, const void *pdu,
443                                         uint32_t size, void **retransmissionPdu)
444 {
445     OIC_LOG(DEBUG, TAG, "IN - CARetransmissionReceivedData");
446     if (NULL == context || NULL == endpoint || NULL == pdu || NULL == retransmissionPdu)
447     {
448         OIC_LOG(ERROR, TAG, "invalid parameter..");
449         return CA_STATUS_INVALID_PARAM;
450     }
451
452     // #0. check support connectivity type
453     if (!(context->config.supportType & endpoint->connectivityType))
454     {
455         OIC_LOG_V(DEBUG, TAG, "not supported connectivity type for retransmission..(%d)",
456                   endpoint->connectivityType);
457         return CA_STATUS_OK;
458     }
459
460     // #1. check PDU method type and get message id.
461     // ACK, RST --> remove the CON data
462     CAMessageType_t type = CAGetMessageTypeFromPduBinaryData(pdu, size);
463     uint16_t messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
464
465     OIC_LOG_V(DEBUG, TAG, "received pdu, message type(%d), message id(%d)", type, messageId);
466
467     if ((CA_MSG_ACKNOWLEDGE != type) && (CA_MSG_RESET != type))
468     {
469         return CA_STATUS_OK;
470     }
471
472     // mutex lock
473     ca_mutex_lock(context->threadMutex);
474     uint32_t len = u_arraylist_length(context->dataList);
475
476     // find index
477     for (uint32_t i = 0; i < len; i++)
478     {
479         CARetransmissionData_t *retData = (CARetransmissionData_t *) u_arraylist_get(
480                 context->dataList, i);
481
482         if (NULL == retData)
483         {
484             continue;
485         }
486
487         // found index
488         if (NULL != retData->endpoint && retData->messageId == messageId
489             && (retData->endpoint->connectivityType == endpoint->connectivityType))
490         {
491             // get pdu data for getting token when CA_EMPTY(RST/ACK) is received from remote device
492             // if retransmission was finish..token will be unavailable.
493             if (CA_EMPTY == CAGetCodeFromPduBinaryData(pdu, size))
494             {
495                 OIC_LOG(DEBUG, TAG, "code is CA_EMPTY..");
496
497                 if (NULL == retData->pdu)
498                 {
499                     OIC_LOG(ERROR, TAG, "retData->pdu is null");
500                     OICFree(retData);
501                     // mutex unlock
502                     ca_mutex_unlock(context->threadMutex);
503
504                     return CA_STATUS_FAILED;
505                 }
506
507                 // copy PDU data
508                 (*retransmissionPdu) = (void *) OICCalloc(1, retData->size);
509                 if ((*retransmissionPdu) == NULL)
510                 {
511                     OICFree(retData);
512                     OIC_LOG(ERROR, TAG, "memory error!!");
513
514                     // mutex unlock
515                     ca_mutex_unlock(context->threadMutex);
516
517                     return CA_MEMORY_ALLOC_FAILED;
518                 }
519                 memcpy((*retransmissionPdu), retData->pdu, retData->size);
520             }
521
522             // #2. remove data from list
523             CARetransmissionData_t *removedData = u_arraylist_remove(context->dataList, i);
524             if (NULL == removedData)
525             {
526                 OIC_LOG(ERROR, TAG, "Removed data is NULL");
527
528                 // mutex unlock
529                 ca_mutex_unlock(context->threadMutex);
530
531                 return CA_STATUS_FAILED;
532             }
533
534             OIC_LOG_V(DEBUG, TAG, "remove retransmission CON data!!, message id(%d)",
535                       messageId);
536
537             CADestroyRemoteEndpointInternal(removedData->endpoint);
538             OICFree(removedData->pdu);
539             OICFree(removedData);
540
541             break;
542         }
543     }
544
545     // mutex unlock
546     ca_mutex_unlock(context->threadMutex);
547
548     OIC_LOG(DEBUG, TAG, "OUT - CARetransmissionReceivedData");
549     return CA_STATUS_OK;
550 }
551
552 CAResult_t CARetransmissionStop(CARetransmission_t *context)
553 {
554     if (NULL == context)
555     {
556         OIC_LOG(ERROR, TAG, "context is empty..");
557         return CA_STATUS_INVALID_PARAM;
558     }
559
560     OIC_LOG(DEBUG, TAG, "retransmission stop request!!");
561
562     // mutex lock
563     ca_mutex_lock(context->threadMutex);
564
565     // set stop flag
566     context->isStop = true;
567
568     // notify the thread
569     ca_cond_signal(context->threadCond);
570
571     ca_cond_wait(context->threadCond, context->threadMutex);
572
573     // mutex unlock
574     ca_mutex_unlock(context->threadMutex);
575
576     return CA_STATUS_OK;
577 }
578
579 CAResult_t CARetransmissionDestroy(CARetransmission_t *context)
580 {
581     if (NULL == context)
582     {
583         OIC_LOG(ERROR, TAG, "context is empty..");
584         return CA_STATUS_INVALID_PARAM;
585     }
586
587     OIC_LOG(DEBUG, TAG, "retransmission context destroy..");
588
589     ca_mutex_free(context->threadMutex);
590     context->threadMutex = NULL;
591     ca_cond_free(context->threadCond);
592     u_arraylist_free(&context->dataList);
593
594     return CA_STATUS_OK;
595 }
596
597 uint64_t getCurrentTimeInMicroSeconds()
598 {
599     OIC_LOG(DEBUG, TAG, "IN");
600     uint64_t currentTime = 0;
601
602 #ifdef __ANDROID__
603     struct timespec getTs;
604
605     clock_gettime(CLOCK_MONOTONIC, &getTs);
606
607     currentTime = (getTs.tv_sec * (uint64_t)1000000000 + getTs.tv_nsec)/1000;
608     OIC_LOG_V(DEBUG, TAG, "current time = %d", currentTime);
609 #else
610 #if _POSIX_TIMERS > 0
611     struct timespec ts;
612     clock_gettime(CLOCK_MONOTONIC, &ts);
613     currentTime = ts.tv_sec * USECS_PER_SEC + ts.tv_nsec / 1000;
614 #else
615     struct timeval tv;
616     gettimeofday(&tv, NULL);
617     currentTime = tv.tv_sec * USECS_PER_SEC + tv.tv_usec;
618 #endif
619 #endif
620
621     OIC_LOG(DEBUG, TAG, "OUT");
622     return currentTime;
623 }