Base code related to the CoAP header of 'CoAP Over TCP' in libcoap.
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / camessagehandler.c
1 /* *****************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <stdint.h>
25
26 #include "cainterface.h"
27 #include "camessagehandler.h"
28 #include "caremotehandler.h"
29 #include "caprotocolmessage.h"
30 #include "logger.h"
31 #include "config.h" /* for coap protocol */
32 #include "oic_malloc.h"
33 #include "canetworkconfigurator.h"
34 #include "caadapterutils.h"
35 #include "cainterfacecontroller.h"
36 #include "caretransmission.h"
37
38 #ifdef WITH_BWT
39 #include "cablockwisetransfer.h"
40 #endif
41
42 #ifndef  SINGLE_THREAD
43 #include "uqueue.h"
44 #include "cathreadpool.h" /* for thread pool */
45 #include "caqueueingthread.h"
46
47 #define SINGLE_HANDLE
48 #define MAX_THREAD_POOL_SIZE    20
49
50 // thread pool handle
51 static ca_thread_pool_t g_threadPoolHandle = NULL;
52
53 // message handler main thread
54 static CAQueueingThread_t g_sendThread;
55 static CAQueueingThread_t g_receiveThread;
56
57 #else
58 #define CA_MAX_RT_ARRAY_SIZE    3
59 #endif  /* SINGLE_THREAD */
60
61 #define TAG "CA_MSG_HNDLR"
62
63 static CARetransmission_t g_retransmissionContext;
64
65 // handler field
66 static CARequestCallback g_requestHandler = NULL;
67 static CAResponseCallback g_responseHandler = NULL;
68 static CAErrorCallback g_errorHandler = NULL;
69
70 static void CAErrorHandler(const CAEndpoint_t *endpoint,
71                            const void *data, uint32_t dataLen,
72                            CAResult_t result);
73
74 static CAData_t* CAGenerateHandlerData(const CAEndpoint_t *endpoint,
75                                        const CARemoteId_t *identity,
76                                        const void *data, CADataType_t dataType);
77
78 static void CASendErrorInfo(const CAEndpoint_t *endpoint, const CAInfo_t *info,
79                             CAResult_t result);
80
81 #ifdef SINGLE_THREAD
82 static void CAProcessReceivedData(CAData_t *data);
83 #endif
84 static void CADestroyData(void *data, uint32_t size);
85 static void CALogPayloadInfo(CAInfo_t *info);
86 static bool CADropSecondMessage(CAHistory_t *history, const CAEndpoint_t *endpoint, uint16_t id);
87
88 #ifdef WITH_BWT
89 void CAAddDataToSendThread(CAData_t *data)
90 {
91     OIC_LOG(DEBUG, TAG, "IN");
92     VERIFY_NON_NULL_VOID(data, TAG, "data");
93
94     // add thread
95     CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
96
97     OIC_LOG(DEBUG, TAG, "OUT");
98 }
99
100 void CAAddDataToReceiveThread(CAData_t *data)
101 {
102     OIC_LOG(DEBUG, TAG, "IN - CAAddDataToReceiveThread");
103     VERIFY_NON_NULL_VOID(data, TAG, "data");
104
105     // add thread
106     CAQueueingThreadAddData(&g_receiveThread, data, sizeof(CAData_t));
107
108     OIC_LOG(DEBUG, TAG, "OUT - CAAddDataToReceiveThread");
109 }
110 #endif
111
112 static bool CAIsSelectedNetworkAvailable()
113 {
114     u_arraylist_t *list = CAGetSelectedNetworkList();
115     if (!list || u_arraylist_length(list) == 0)
116     {
117         OIC_LOG(ERROR, TAG, "No selected network");
118         return false;
119     }
120
121     return true;
122 }
123
124 static CAData_t* CAGenerateHandlerData(const CAEndpoint_t *endpoint,
125                                        const CARemoteId_t *identity,
126                                        const void *data, CADataType_t dataType)
127 {
128     OIC_LOG(DEBUG, TAG, "CAGenerateHandlerData IN");
129     CAInfo_t *info = NULL;
130     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
131     if (!cadata)
132     {
133         OIC_LOG(ERROR, TAG, "memory allocation failed");
134         return NULL;
135     }
136
137     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
138     if (!ep)
139     {
140         OIC_LOG(ERROR, TAG, "endpoint clone failed");
141         OICFree(cadata);
142         return NULL;
143     }
144
145     OIC_LOG_V(DEBUG, TAG, "address : %s", ep->addr);
146     CAResult_t result;
147
148     if(CA_RESPONSE_DATA == dataType)
149     {
150         CAResponseInfo_t* resInfo = (CAResponseInfo_t*)OICCalloc(1, sizeof(CAResponseInfo_t));
151         if (!resInfo)
152         {
153             OIC_LOG(ERROR, TAG, "memory allocation failed");
154             OICFree(cadata);
155             CAFreeEndpoint(ep);
156             return NULL;
157         }
158
159         result = CAGetResponseInfoFromPDU(data, resInfo, endpoint->flags);
160         if (CA_STATUS_OK != result)
161         {
162             OIC_LOG(ERROR, TAG, "CAGetResponseInfoFromPDU Failed");
163             CAFreeEndpoint(ep);
164             CADestroyResponseInfoInternal(resInfo);
165             OICFree(cadata);
166             return NULL;
167         }
168         cadata->responseInfo = resInfo;
169         info = &resInfo->info;
170         if (identity)
171         {
172             info->identity = *identity;
173         }
174         OIC_LOG(DEBUG, TAG, "Response Info :");
175         CALogPayloadInfo(info);
176     }
177     else if (CA_REQUEST_DATA == dataType)
178     {
179         CARequestInfo_t* reqInfo = (CARequestInfo_t*)OICCalloc(1, sizeof(CARequestInfo_t));
180         if (!reqInfo)
181         {
182             OIC_LOG(ERROR, TAG, "memory allocation failed");
183             OICFree(cadata);
184             CAFreeEndpoint(ep);
185             return NULL;
186         }
187
188         result = CAGetRequestInfoFromPDU(data, reqInfo, endpoint->flags);
189         if (CA_STATUS_OK != result)
190         {
191             OIC_LOG(ERROR, TAG, "CAGetRequestInfoFromPDU failed");
192             CAFreeEndpoint(ep);
193             CADestroyRequestInfoInternal(reqInfo);
194             OICFree(cadata);
195             return NULL;
196         }
197
198         if (CADropSecondMessage(&caglobals.ca.requestHistory, endpoint, reqInfo->info.messageId))
199         {
200             OIC_LOG(ERROR, TAG, "Second Request with same Token, Drop it");
201             CAFreeEndpoint(ep);
202             CADestroyRequestInfoInternal(reqInfo);
203             OICFree(cadata);
204             return NULL;
205         }
206
207         cadata->requestInfo = reqInfo;
208         info = &reqInfo->info;
209         if (identity)
210         {
211             info->identity = *identity;
212         }
213         OIC_LOG(DEBUG, TAG, "Request Info :");
214         CALogPayloadInfo(info);
215    }
216     else if (CA_ERROR_DATA == dataType)
217     {
218         CAErrorInfo_t *errorInfo = (CAErrorInfo_t *)OICCalloc(1, sizeof (CAErrorInfo_t));
219         if (!errorInfo)
220         {
221             OIC_LOG(ERROR, TAG, "Memory allocation failed!");
222             OICFree(cadata);
223             CAFreeEndpoint(ep);
224             return NULL;
225         }
226
227         CAResult_t result = CAGetErrorInfoFromPDU(data, errorInfo, endpoint->flags);
228         if (CA_STATUS_OK != result)
229         {
230             OIC_LOG(ERROR, TAG, "CAGetErrorInfoFromPDU failed");
231             CAFreeEndpoint(ep);
232             OICFree(errorInfo);
233             OICFree(cadata);
234             return NULL;
235         }
236
237         cadata->errorInfo = errorInfo;
238         info = &errorInfo->info;
239         if (identity)
240         {
241             info->identity = *identity;
242         }
243         OIC_LOG(DEBUG, TAG, "error Info :");
244         CALogPayloadInfo(info);
245     }
246
247     cadata->remoteEndpoint = ep;
248     cadata->dataType = dataType;
249
250     return cadata;
251
252     OIC_LOG(DEBUG, TAG, "CAGenerateHandlerData OUT");
253 }
254
255 static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uint32_t size)
256 {
257     OIC_LOG(DEBUG, TAG, "IN");
258     VERIFY_NON_NULL_VOID(endpoint, TAG, "endpoint");
259     VERIFY_NON_NULL_VOID(pdu, TAG, "pdu");
260
261     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
262     if (!ep)
263     {
264         OIC_LOG(ERROR, TAG, "clone failed");
265         return;
266     }
267
268     CAResponseInfo_t* resInfo = (CAResponseInfo_t*)OICCalloc(1, sizeof(CAResponseInfo_t));
269
270     if (!resInfo)
271     {
272         OIC_LOG(ERROR, TAG, "calloc failed");
273         CAFreeEndpoint(ep);
274         return;
275     }
276
277     resInfo->result = CA_RETRANSMIT_TIMEOUT;
278     resInfo->info.type = CAGetMessageTypeFromPduBinaryData(pdu, size);
279     resInfo->info.messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
280
281     CAResult_t res = CAGetTokenFromPDU((const coap_hdr_t *) pdu, &(resInfo->info),
282                                        endpoint->flags);
283     if (CA_STATUS_OK != res)
284     {
285         OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list");
286         CADestroyResponseInfoInternal(resInfo);
287         CAFreeEndpoint(ep);
288         return;
289     }
290
291     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
292     if (NULL == cadata)
293     {
294         OIC_LOG(ERROR, TAG, "memory allocation failed !");
295         CAFreeEndpoint(ep);
296         CADestroyResponseInfoInternal(resInfo);
297         return;
298     }
299
300     cadata->type = SEND_TYPE_UNICAST;
301     cadata->remoteEndpoint = ep;
302     cadata->requestInfo = NULL;
303     cadata->responseInfo = resInfo;
304
305 #ifdef SINGLE_THREAD
306     CAProcessReceivedData(cadata);
307 #else
308     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
309 #endif
310
311     OIC_LOG(DEBUG, TAG, "OUT");
312 }
313
314 static void CADestroyData(void *data, uint32_t size)
315 {
316     OIC_LOG(DEBUG, TAG, "CADestroyData IN");
317     if ((size_t)size < sizeof(CAData_t))
318     {
319         OIC_LOG_V(ERROR, TAG, "Destroy data too small %p %d", data, size);
320     }
321     CAData_t *cadata = (CAData_t *) data;
322
323     if (NULL == cadata)
324     {
325         OIC_LOG(ERROR, TAG, "cadata is NULL");
326         return;
327     }
328
329     if (NULL != cadata->remoteEndpoint)
330     {
331         CAFreeEndpoint(cadata->remoteEndpoint);
332     }
333
334     if (NULL != cadata->requestInfo)
335     {
336         CADestroyRequestInfoInternal((CARequestInfo_t *) cadata->requestInfo);
337     }
338
339     if (NULL != cadata->responseInfo)
340     {
341         CADestroyResponseInfoInternal((CAResponseInfo_t *) cadata->responseInfo);
342     }
343
344     if (NULL != cadata->errorInfo)
345     {
346         CADestroyErrorInfoInternal(cadata->errorInfo);
347     }
348
349     OICFree(cadata);
350     OIC_LOG(DEBUG, TAG, "CADestroyData OUT");
351 }
352
353 #ifdef SINGLE_THREAD
354 static void CAProcessReceivedData(CAData_t *data)
355 {
356     OIC_LOG(DEBUG, TAG, "CAProcessReceivedData IN");
357     if (!data)
358     {
359         OIC_LOG(ERROR, TAG, "thread data error!!");
360         return;
361     }
362
363     // parse the data and call the callbacks.
364     // #1 parse the data
365     // #2 get endpoint
366     CAEndpoint_t *rep = (CAEndpoint_t *)(data->remoteEndpoint);
367     if (!rep)
368     {
369         OIC_LOG(ERROR, TAG, "remoteEndpoint error!!");
370         return;
371     }
372
373     if (data->requestInfo && g_requestHandler)
374     {
375         g_requestHandler(rep, data->requestInfo);
376     }
377     else if (data->responseInfo && g_responseHandler)
378     {
379         g_responseHandler(rep, data->responseInfo);
380     }
381     else if (data->errorInfo && g_errorHandler)
382     {
383         g_errorHandler(rep, data->errorInfo);
384     }
385
386 #ifdef SINGLE_THREAD
387     CADestroyData(data, sizeof(CAData_t));
388 #endif
389
390     OIC_LOG(DEBUG, TAG, "CAProcessReceivedData OUT");
391 }
392 #endif
393
394 #ifndef SINGLE_THREAD
395
396 static void CAReceiveThreadProcess(void *threadData)
397 {
398     OIC_LOG(DEBUG, TAG, "IN");
399 #ifndef SINGLE_HANDLE
400     CAData_t *data = (CAData_t *) threadData;
401     CAProcessReceivedData(data);
402 #else
403     (void)threadData;
404 #endif
405     OIC_LOG(DEBUG, TAG, "OUT");
406 }
407 #endif
408
409 static CAResult_t CAProcessSendData(const CAData_t *data)
410 {
411     OIC_LOG(DEBUG, TAG, "IN");
412     VERIFY_NON_NULL(data, TAG, "data");
413     VERIFY_NON_NULL(data->remoteEndpoint, TAG, "remoteEndpoint");
414
415     CAResult_t res = CA_STATUS_FAILED;
416
417     CASendDataType_t type = data->type;
418
419     coap_pdu_t *pdu = NULL;
420     CAInfo_t *info = NULL;
421
422     if (SEND_TYPE_UNICAST == type)
423     {
424
425         OIC_LOG(DEBUG,TAG,"Unicast message");
426         if (NULL != data->requestInfo)
427         {
428             OIC_LOG(DEBUG, TAG, "requestInfo is available..");
429
430             info = &data->requestInfo->info;
431             pdu = CAGeneratePDU(data->requestInfo->method, info, data->remoteEndpoint);
432         }
433         else if (NULL != data->responseInfo)
434         {
435             OIC_LOG(DEBUG, TAG, "responseInfo is available..");
436
437             info = &data->responseInfo->info;
438             pdu = CAGeneratePDU(data->responseInfo->result, info, data->remoteEndpoint);
439         }
440         else
441         {
442             OIC_LOG(DEBUG, TAG, "request info, response info is empty");
443             return CA_STATUS_INVALID_PARAM;
444         }
445
446         // interface controller function call.
447         if (NULL != pdu)
448         {
449 #ifdef WITH_BWT
450             if (CA_ADAPTER_GATT_BTLE != data->remoteEndpoint->adapter
451 #ifdef CI_ADAPTER
452                     && CA_IPV4_TCP != data->remoteEndpoint->flags
453 #endif
454                     )
455             {
456                 // Blockwise transfer
457                 if (NULL != info)
458                 {
459                     CAResult_t res = CAAddBlockOption(&pdu, *info,
460                                                       data->remoteEndpoint);
461                     if (CA_STATUS_OK != res)
462                     {
463                         OIC_LOG(INFO, TAG, "to write block option has failed");
464                         CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res);
465                         coap_delete_pdu(pdu);
466                         return res;
467                     }
468                 }
469             }
470 #endif
471             CALogPDUInfo(pdu, data->remoteEndpoint->flags);
472
473             res = CASendUnicastData(data->remoteEndpoint, pdu->hdr, pdu->length);
474             if (CA_STATUS_OK != res)
475             {
476                 OIC_LOG_V(ERROR, TAG, "send failed:%d", res);
477                 CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res);
478                 coap_delete_pdu(pdu);
479                 return res;
480             }
481
482 #ifdef CI_ADAPTER
483             if (CA_IPV4_TCP == data->remoteEndpoint->flags)
484             {
485                 OIC_LOG(INFO, TAG, "retransmission will be not worked");
486             }
487             else
488 #endif
489             {
490                 // for retransmission
491                 res = CARetransmissionSentData(&g_retransmissionContext, data->remoteEndpoint, pdu->hdr,
492                                                pdu->length);
493                 if ((CA_STATUS_OK != res) && (CA_NOT_SUPPORTED != res))
494                 {
495                     //when retransmission not supported this will return CA_NOT_SUPPORTED, ignore
496                     OIC_LOG_V(INFO, TAG, "retransmission is not enabled due to error, res : %d", res);
497                     coap_delete_pdu(pdu);
498                     return res;
499                 }
500             }
501
502             coap_delete_pdu(pdu);
503         }
504         else
505         {
506             OIC_LOG(ERROR,TAG,"Failed to generate unicast PDU");
507             CASendErrorInfo(data->remoteEndpoint, info, CA_SEND_FAILED);
508             return CA_SEND_FAILED;
509         }
510     }
511     else if (SEND_TYPE_MULTICAST == type)
512     {
513         OIC_LOG(DEBUG,TAG,"Multicast message");
514         if (NULL != data->requestInfo)
515         {
516             OIC_LOG(DEBUG, TAG, "requestInfo is available..");
517
518             info = &data->requestInfo->info;
519             pdu = CAGeneratePDU(CA_GET, info, data->remoteEndpoint);
520             if (NULL != pdu)
521             {
522 #ifdef WITH_BWT
523                 if (CA_ADAPTER_GATT_BTLE != data->remoteEndpoint->adapter
524 #ifdef CI_ADAPTER
525                         && CA_IPV4_TCP != data->remoteEndpoint->flags
526 #endif
527                         )
528                 {
529                     // Blockwise transfer
530                     CAResult_t res = CAAddBlockOption(&pdu, data->requestInfo->info,
531                                                       data->remoteEndpoint);
532                     if (CA_STATUS_OK != res)
533                     {
534                         OIC_LOG(DEBUG, TAG, "CAAddBlockOption has failed");
535                         CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res);
536                         coap_delete_pdu(pdu);
537                         return res;
538                     }
539                 }
540 #endif
541                 CALogPDUInfo(pdu, data->remoteEndpoint->flags);
542
543                 res = CASendMulticastData(data->remoteEndpoint, pdu->hdr, pdu->length);
544                 if (CA_STATUS_OK != res)
545                 {
546                     OIC_LOG_V(ERROR, TAG, "send failed:%d", res);
547                     CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res);
548                     coap_delete_pdu(pdu);
549                     return res;
550                 }
551
552                 OIC_LOG(DEBUG, TAG, "pdu to send :");
553                 OIC_LOG_BUFFER(DEBUG, TAG,  pdu->hdr, pdu->length);
554
555                 coap_delete_pdu(pdu);
556             }
557             else
558             {
559                 OIC_LOG(ERROR,TAG,"Failed to generate multicast PDU");
560                 CASendErrorInfo(data->remoteEndpoint, info, CA_SEND_FAILED);
561                 return CA_SEND_FAILED;
562             }
563         }
564         else
565         {
566             OIC_LOG(ERROR, TAG, "request info is empty");
567             return CA_SEND_FAILED;
568         }
569     }
570
571     OIC_LOG(DEBUG, TAG, "OUT");
572     return CA_STATUS_OK;
573 }
574
575 #ifndef SINGLE_THREAD
576 static void CASendThreadProcess(void *threadData)
577 {
578     CAData_t *data = (CAData_t *) threadData;
579     CAProcessSendData(data);
580 }
581
582 #endif
583
584 /*
585  * If a second message arrives with the same token and the other address
586  * family, drop it.  Typically, IPv6 beats IPv4, so the IPv4 message is dropped.
587  */
588 static bool CADropSecondMessage(CAHistory_t *history, const CAEndpoint_t *ep, uint16_t id)
589 {
590     if (!ep)
591     {
592         return true;
593     }
594     if (ep->adapter != CA_ADAPTER_IP)
595     {
596         return false;
597     }
598     if (!caglobals.ip.dualstack)
599     {
600         return false;
601     }
602
603     bool ret = false;
604     CATransportFlags_t familyFlags = ep->flags & CA_IPFAMILY_MASK;
605
606     for (size_t i = 0; i < sizeof(history->items) / sizeof(history->items[0]); i++)
607     {
608         CAHistoryItem_t *item = &(history->items[i]);
609         if (id == item->messageId)
610         {
611             if ((familyFlags ^ item->flags) == CA_IPFAMILY_MASK)
612             {
613                 OIC_LOG_V(INFO, TAG, "IPv%c duplicate message ignored",
614                                             familyFlags & CA_IPV6 ? '6' : '4');
615                 ret = true;
616                 break;
617             }
618         }
619     }
620
621     history->items[history->nextIndex].flags = familyFlags;
622     history->items[history->nextIndex].messageId = id;
623     if (++history->nextIndex >= HISTORYSIZE)
624     {
625         history->nextIndex = 0;
626     }
627
628     return ret;
629 }
630
631 static void CAReceivedPacketCallback(const CASecureEndpoint_t *sep,
632                                      const void *data, uint32_t dataLen)
633 {
634     OIC_LOG(DEBUG, TAG, "IN");
635     VERIFY_NON_NULL_VOID(sep, TAG, "remoteEndpoint");
636     VERIFY_NON_NULL_VOID(data, TAG, "data");
637
638     OIC_LOG(DEBUG, TAG, "received pdu data :");
639     OIC_LOG_BUFFER(DEBUG, TAG,  data, dataLen);
640
641     uint32_t code = CA_NOT_FOUND;
642     CAData_t *cadata = NULL;
643
644     coap_pdu_t *pdu = (coap_pdu_t *) CAParsePDU((const char *) data, dataLen, &code,
645                                                 sep->endpoint.flags);
646     if (NULL == pdu)
647     {
648         OIC_LOG(ERROR, TAG, "Parse PDU failed");
649         return;
650     }
651
652     OIC_LOG_V(DEBUG, TAG, "code = %d", code);
653     if (CA_GET == code || CA_POST == code || CA_PUT == code || CA_DELETE == code)
654     {
655         cadata = CAGenerateHandlerData(&(sep->endpoint), &(sep->identity), pdu, CA_REQUEST_DATA);
656         if (!cadata)
657         {
658             OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, CAGenerateHandlerData failed!");
659             coap_delete_pdu(pdu);
660             return;
661         }
662     }
663     else
664     {
665         cadata = CAGenerateHandlerData(&(sep->endpoint), &(sep->identity), pdu, CA_RESPONSE_DATA);
666         if (!cadata)
667         {
668             OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, CAGenerateHandlerData failed!");
669             coap_delete_pdu(pdu);
670             return;
671         }
672
673 #ifdef CI_ADAPTER
674         if (CA_IPV4_TCP == sep->endpoint.flags)
675         {
676             OIC_LOG(INFO, TAG, "retransmission will be not worked");
677         }
678         else
679 #endif
680         {
681             // for retransmission
682             void *retransmissionPdu = NULL;
683             CARetransmissionReceivedData(&g_retransmissionContext, cadata->remoteEndpoint, pdu->hdr,
684                                          pdu->length, &retransmissionPdu);
685
686             // get token from saved data in retransmission list
687             if (retransmissionPdu && CA_EMPTY == code)
688             {
689                 if (cadata->responseInfo)
690                 {
691                     CAInfo_t *info = &cadata->responseInfo->info;
692                     CAResult_t res = CAGetTokenFromPDU((const coap_hdr_t *)retransmissionPdu,
693                                                        info, sep->endpoint.flags);
694                     if (CA_STATUS_OK != res)
695                     {
696                         OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list");
697                         OICFree(info->token);
698                         info->tokenLength = 0;
699                     }
700                 }
701             }
702             OICFree(retransmissionPdu);
703         }
704     }
705
706     cadata->type = SEND_TYPE_UNICAST;
707
708 #ifdef SINGLE_THREAD
709     CAProcessReceivedData(cadata);
710 #else
711 #ifdef WITH_BWT
712     if (CA_ADAPTER_GATT_BTLE != sep->endpoint.adapter
713 #ifdef CI_ADAPTER
714             && CA_IPV4_TCP != sep->endpoint.flags
715 #endif
716             )
717     {
718         CAResult_t res = CAReceiveBlockWiseData(pdu, &(sep->endpoint), cadata, dataLen);
719         if (CA_NOT_SUPPORTED == res)
720         {
721             OIC_LOG(ERROR, TAG, "this message does not have block option");
722             CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
723         }
724         else
725         {
726             CADestroyData(cadata, sizeof(CAData_t));
727         }
728     }
729     else
730 #endif
731     {
732         CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
733     }
734 #endif
735
736     coap_delete_pdu(pdu);
737
738     OIC_LOG(DEBUG, TAG, "OUT");
739 }
740
741 static void CANetworkChangedCallback(const CAEndpoint_t *info, CANetworkStatus_t status)
742 {
743     (void)info;
744     (void)status;
745     OIC_LOG(DEBUG, TAG, "IN");
746
747     OIC_LOG(DEBUG, TAG, "OUT");
748 }
749
/**
 * Delivers pending received messages to the registered handlers.
 *
 * SINGLE_THREAD builds read data and run the retransmission routine.
 * Otherwise, with SINGLE_HANDLE, one element is taken from the receive queue
 * under the queue mutex, passed to the request, response or error handler
 * (the first whose info is set and whose handler is registered), and then the
 * message and the queue element are released.
 */
void CAHandleRequestResponseCallbacks()
{
#ifdef SINGLE_THREAD
    CAReadData();
    CARetransmissionBaseRoutine((void *)&g_retransmissionContext);
#else
#ifdef SINGLE_HANDLE
    // parse the data and call the callbacks.
    // #1 parse the data
    // #2 get endpoint

    ca_mutex_lock(g_receiveThread.threadMutex);

    u_queue_message_t *item = u_queue_get_element(g_receiveThread.dataQueue);

    ca_mutex_unlock(g_receiveThread.threadMutex);

    if (NULL == item)
    {
        return;
    }

    // get values
    void *msg = item->msg;

    if (NULL == msg)
    {
        // the element itself is released on the normal path below, so it has
        // to be released here as well or it leaks
        OICFree(item);
        return;
    }

    // get endpoint
    CAData_t *td = (CAData_t *) msg;

    if (td->requestInfo && g_requestHandler)
    {
        OIC_LOG_V(DEBUG, TAG, "request callback : %d", td->requestInfo->info.numOptions);
        g_requestHandler(td->remoteEndpoint, td->requestInfo);
    }
    else if (td->responseInfo && g_responseHandler)
    {
        OIC_LOG_V(DEBUG, TAG, "response callback : %d", td->responseInfo->info.numOptions);
        g_responseHandler(td->remoteEndpoint, td->responseInfo);
    }
    else if (td->errorInfo && g_errorHandler)
    {
        OIC_LOG_V(DEBUG, TAG, "error callback error: %d", td->errorInfo->result);
        g_errorHandler(td->remoteEndpoint, td->errorInfo);
    }

    CADestroyData(msg, sizeof(CAData_t));
    OICFree(item);

#endif /* SINGLE_HANDLE */
#endif
}
805
806 static CAData_t* CAPrepareSendData(const CAEndpoint_t *endpoint, const void *sendData,
807                                    CADataType_t dataType)
808 {
809     OIC_LOG(DEBUG, TAG, "CAPrepareSendData IN");
810
811     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
812     if (!cadata)
813     {
814         OIC_LOG(ERROR, TAG, "memory allocation failed");
815         return NULL;
816     }
817
818     if(CA_REQUEST_DATA == dataType)
819     {
820         // clone request info
821         CARequestInfo_t *request = CACloneRequestInfo((CARequestInfo_t *)sendData);
822
823         if(!request)
824         {
825             OIC_LOG(ERROR, TAG, "CACloneRequestInfo failed");
826             OICFree(cadata);
827             return NULL;
828         }
829
830         cadata->type = request->isMulticast ? SEND_TYPE_MULTICAST : SEND_TYPE_UNICAST;
831         cadata->requestInfo =  request;
832     }
833     else if(CA_RESPONSE_DATA == dataType)
834     {
835         // clone response info
836         CAResponseInfo_t *response = CACloneResponseInfo((CAResponseInfo_t *)sendData);
837
838         if(!response)
839         {
840             OIC_LOG(ERROR, TAG, "CACloneResponseInfo failed");
841             OICFree(cadata);
842             return NULL;
843         }
844
845         cadata->type = SEND_TYPE_UNICAST;
846         cadata->responseInfo = response;
847     }
848     else
849     {
850         OIC_LOG(ERROR, TAG, "CAPrepareSendData unknown data type");
851         OICFree(cadata);
852         return NULL;
853     }
854
855     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
856     if (!ep)
857     {
858         OIC_LOG(ERROR, TAG, "endpoint clone failed");
859         CADestroyData(cadata, sizeof(CAData_t));
860         return NULL;
861     }
862
863     cadata->remoteEndpoint = ep;
864     cadata->dataType = dataType;
865     return cadata;
866 }
867
868 CAResult_t CADetachRequestMessage(const CAEndpoint_t *object, const CARequestInfo_t *request)
869 {
870     OIC_LOG(DEBUG, TAG, "IN");
871
872     VERIFY_NON_NULL(object, TAG, "object");
873     VERIFY_NON_NULL(request, TAG, "request");
874
875     if (false == CAIsSelectedNetworkAvailable())
876     {
877         return CA_STATUS_FAILED;
878     }
879
880 #ifdef ARDUINO
881     // If max retransmission queue is reached, then don't handle new request
882     if (CA_MAX_RT_ARRAY_SIZE == u_arraylist_length(g_retransmissionContext.dataList))
883     {
884         OIC_LOG(ERROR, TAG, "max RT queue size reached!");
885         return CA_SEND_FAILED;
886     }
887 #endif /* ARDUINO */
888
889     CAData_t *data = CAPrepareSendData(object, request, CA_REQUEST_DATA);
890     if(!data)
891     {
892         OIC_LOG(ERROR, TAG, "CAPrepareSendData failed");
893         return CA_MEMORY_ALLOC_FAILED;
894     }
895
896 #ifdef SINGLE_THREAD
897     CAResult_t result = CAProcessSendData(data);
898     if(CA_STATUS_OK != result)
899     {
900         OIC_LOG(ERROR, TAG, "CAProcessSendData failed");
901         return result;
902     }
903
904     CADestroyData(data, sizeof(CAData_t));
905 #else
906 #ifdef WITH_BWT
907     if (CA_ADAPTER_GATT_BTLE != object->adapter
908 #ifdef CI_ADAPTER
909             && CA_IPV4_TCP != object->flags
910 #endif
911             )
912     {
913         // send block data
914         CAResult_t res = CASendBlockWiseData(data);
915         if(CA_NOT_SUPPORTED == res)
916         {
917             OIC_LOG(DEBUG, TAG, "normal msg will be sent");
918             CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
919             return CA_STATUS_OK;
920         }
921         else
922         {
923             CADestroyData(data, sizeof(CAData_t));
924         }
925         return res;
926     }
927     else
928 #endif
929     {
930         CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
931     }
932 #endif
933
934     OIC_LOG(DEBUG, TAG, "OUT");
935     return CA_STATUS_OK;
936 }
937
938 CAResult_t CADetachResponseMessage(const CAEndpoint_t *object,
939                                    const CAResponseInfo_t *response)
940 {
941     OIC_LOG(DEBUG, TAG, "IN");
942     VERIFY_NON_NULL(object, TAG, "object");
943     VERIFY_NON_NULL(response, TAG, "response");
944
945     if (false == CAIsSelectedNetworkAvailable())
946     {
947         return CA_STATUS_FAILED;
948     }
949
950     CAData_t *data = CAPrepareSendData(object, response, CA_RESPONSE_DATA);
951     if(!data)
952     {
953         OIC_LOG(ERROR, TAG, "CAPrepareSendData failed");
954         return CA_MEMORY_ALLOC_FAILED;
955     }
956
957 #ifdef SINGLE_THREAD
958     CAResult_t result = CAProcessSendData(data);
959     if(result != CA_STATUS_OK)
960     {
961         OIC_LOG(ERROR, TAG, "CAProcessSendData failed");
962         return result;
963     }
964
965     CADestroyData(data, sizeof(CAData_t));
966 #else
967 #ifdef WITH_BWT
968     if (CA_ADAPTER_GATT_BTLE != object->adapter
969 #ifdef CI_ADAPTER
970             && CA_IPV4_TCP != object->flags
971 #endif
972             )
973     {
974         // send block data
975         CAResult_t res = CASendBlockWiseData(data);
976         if(CA_NOT_SUPPORTED == res)
977         {
978             OIC_LOG(DEBUG, TAG, "normal msg will be sent");
979             CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
980             return CA_STATUS_OK;
981         }
982         else
983         {
984             CADestroyData(data, sizeof(CAData_t));
985         }
986         return res;
987     }
988     else
989 #endif
990     {
991         CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
992     }
993 #endif
994
995     OIC_LOG(DEBUG, TAG, "OUT");
996     return CA_STATUS_OK;
997 }
998
999 CAResult_t CADetachMessageResourceUri(const CAURI_t resourceUri, const CAToken_t token,
1000                                       uint8_t tokenLength, const CAHeaderOption_t *options,
1001                                       uint8_t numOptions)
1002 {
1003     (void)resourceUri;
1004     (void)token;
1005     (void)tokenLength;
1006     (void)options;
1007     (void)numOptions;
1008     return CA_NOT_SUPPORTED;
1009 }
1010
1011 void CASetInterfaceCallbacks(CARequestCallback ReqHandler, CAResponseCallback RespHandler,
1012                              CAErrorCallback errorHandler)
1013 {
1014     OIC_LOG(DEBUG, TAG, "IN");
1015     g_requestHandler = ReqHandler;
1016     g_responseHandler = RespHandler;
1017     g_errorHandler = errorHandler;
1018     OIC_LOG(DEBUG, TAG, "OUT");
1019 }
1020
1021 CAResult_t CAInitializeMessageHandler()
1022 {
1023     OIC_LOG(DEBUG, TAG, "IN");
1024     CASetPacketReceivedCallback(CAReceivedPacketCallback);
1025
1026     CASetNetworkChangeCallback(CANetworkChangedCallback);
1027     CASetErrorHandleCallback(CAErrorHandler);
1028
1029 #ifndef SINGLE_THREAD
1030     // create thread pool
1031     CAResult_t res = ca_thread_pool_init(MAX_THREAD_POOL_SIZE, &g_threadPoolHandle);
1032
1033     if (CA_STATUS_OK != res)
1034     {
1035         OIC_LOG(ERROR, TAG, "thread pool initialize error.");
1036         return res;
1037     }
1038
1039     // send thread initialize
1040     if (CA_STATUS_OK != CAQueueingThreadInitialize(&g_sendThread, g_threadPoolHandle,
1041                                                    CASendThreadProcess, CADestroyData))
1042     {
1043         OIC_LOG(ERROR, TAG, "Failed to Initialize send queue thread");
1044         return CA_STATUS_FAILED;
1045     }
1046
1047     // start send thread
1048     res = CAQueueingThreadStart(&g_sendThread);
1049
1050     if (CA_STATUS_OK != res)
1051     {
1052         OIC_LOG(ERROR, TAG, "thread start error(send thread).");
1053         ca_thread_pool_free(g_threadPoolHandle);
1054         g_threadPoolHandle = NULL;
1055         return res;
1056     }
1057
1058     // receive thread initialize
1059     if (CA_STATUS_OK != CAQueueingThreadInitialize(&g_receiveThread, g_threadPoolHandle,
1060                                                    CAReceiveThreadProcess, CADestroyData))
1061     {
1062         OIC_LOG(ERROR, TAG, "Failed to Initialize receive queue thread");
1063         return CA_STATUS_FAILED;
1064     }
1065
1066 #ifndef SINGLE_HANDLE // This will be enabled when RI supports multi threading
1067     // start receive thread
1068     res = CAQueueingThreadStart(&gReceiveThread);
1069
1070     if (res != CA_STATUS_OK)
1071     {
1072         OIC_LOG(ERROR, TAG, "thread start error(receive thread).");
1073         return res;
1074     }
1075 #endif /* SINGLE_HANDLE */
1076
1077     // retransmission initialize
1078     CARetransmissionInitialize(&g_retransmissionContext, g_threadPoolHandle, CASendUnicastData,
1079                                CATimeoutCallback, NULL);
1080
1081 #ifdef WITH_BWT
1082     // block-wise transfer initialize
1083     CAInitializeBlockWiseTransfer(CAAddDataToSendThread, CAAddDataToReceiveThread);
1084 #endif
1085
1086     // start retransmission
1087     res = CARetransmissionStart(&g_retransmissionContext);
1088
1089     if (CA_STATUS_OK != res)
1090     {
1091         OIC_LOG(ERROR, TAG, "thread start error(retransmission thread).");
1092         return res;
1093     }
1094
1095     // initialize interface adapters by controller
1096     CAInitializeAdapters(g_threadPoolHandle);
1097 #else
1098     // retransmission initialize
1099     CARetransmissionInitialize(&g_retransmissionContext, NULL, CASendUnicastData,
1100                                CATimeoutCallback, NULL);
1101     CAInitializeAdapters();
1102 #endif
1103
1104     OIC_LOG(DEBUG, TAG, "OUT");
1105     return CA_STATUS_OK;
1106 }
1107
1108 void CATerminateMessageHandler()
1109 {
1110     OIC_LOG(DEBUG, TAG, "IN");
1111 #ifndef SINGLE_THREAD
1112     CATransportAdapter_t connType;
1113     u_arraylist_t *list = CAGetSelectedNetworkList();
1114     uint32_t length = u_arraylist_length(list);
1115
1116     uint32_t i = 0;
1117     for (i = 0; i < length; i++)
1118     {
1119         void* ptrType = u_arraylist_get(list, i);
1120
1121         if (NULL == ptrType)
1122         {
1123             continue;
1124         }
1125
1126         connType = *(CATransportAdapter_t *)ptrType;
1127         CAStopAdapter(connType);
1128     }
1129
1130     // stop retransmission
1131     if (NULL != g_retransmissionContext.threadMutex)
1132     {
1133         CARetransmissionStop(&g_retransmissionContext);
1134     }
1135
1136     // stop thread
1137     // delete thread data
1138     if (NULL != g_sendThread.threadMutex)
1139     {
1140         CAQueueingThreadStop(&g_sendThread);
1141     }
1142
1143     // stop thread
1144     // delete thread data
1145     if (NULL != g_receiveThread.threadMutex)
1146     {
1147 #ifndef SINGLE_HANDLE // This will be enabled when RI supports multi threading
1148         CAQueueingThreadStop(&gReceiveThread);
1149 #endif /* SINGLE_HANDLE */
1150     }
1151
1152     // destroy thread pool
1153     if (NULL != g_threadPoolHandle)
1154     {
1155         ca_thread_pool_free(g_threadPoolHandle);
1156         g_threadPoolHandle = NULL;
1157     }
1158
1159 #ifdef WITH_BWT
1160     CATerminateBlockWiseTransfer();
1161 #endif
1162     CARetransmissionDestroy(&g_retransmissionContext);
1163     CAQueueingThreadDestroy(&g_sendThread);
1164     CAQueueingThreadDestroy(&g_receiveThread);
1165
1166     // terminate interface adapters by controller
1167     CATerminateAdapters();
1168 #else
1169     // terminate interface adapters by controller
1170     CATerminateAdapters();
1171
1172     // stop retransmission
1173     CARetransmissionStop(&g_retransmissionContext);
1174     CARetransmissionDestroy(&g_retransmissionContext);
1175 #endif
1176
1177     OIC_LOG(DEBUG, TAG, "OUT");
1178 }
1179
1180 void CALogPDUInfo(coap_pdu_t *pdu, CATransportFlags_t flags)
1181 {
1182     VERIFY_NON_NULL_VOID(pdu, TAG, "pdu");
1183
1184     OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
1185
1186 #ifdef CI_ADAPTER
1187     if (CA_IPV4_TCP != flags)
1188     {
1189         OIC_LOG(DEBUG, TAG, "pdu headert data :");
1190         OIC_LOG_BUFFER(DEBUG, TAG,  pdu->hdr, pdu->length);
1191     }
1192     else
1193 #endif
1194     {
1195         OIC_LOG_V(DEBUG, TAG, "PDU Maker - type : %d", pdu->hdr->coap_hdr_udp_t.type);
1196
1197         OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->coap_hdr_udp_t.code);
1198
1199         OIC_LOG(DEBUG, TAG, "PDU Maker - token :");
1200
1201         OIC_LOG_BUFFER(DEBUG, TAG, pdu->hdr->coap_hdr_udp_t.token,
1202                        pdu->hdr->coap_hdr_udp_t.token_length);
1203     }
1204 }
1205
1206 static void CALogPayloadInfo(CAInfo_t *info)
1207 {
1208     if(info)
1209     {
1210         if (info->options)
1211         {
1212             for (uint32_t i = 0; i < info->numOptions; i++)
1213             {
1214                 OIC_LOG_V(DEBUG, TAG, "optionID: %d", info->options[i].optionID);
1215
1216                 OIC_LOG_V(DEBUG, TAG, "list: %s", info->options[i].optionData);
1217             }
1218         }
1219
1220         if (info->payload)
1221         {
1222             OIC_LOG_V(DEBUG, TAG, "payload: %p(%u)", info->payload,
1223                       info->payloadSize);
1224         }
1225
1226         if (info->token)
1227         {
1228             OIC_LOG(DEBUG, TAG, "token:");
1229             OIC_LOG_BUFFER(DEBUG, TAG, (const uint8_t *) info->token,
1230                            info->tokenLength);
1231         }
1232         OIC_LOG_V(DEBUG, TAG, "msgID: %d", info->messageId);
1233     }
1234     else
1235     {
1236         OIC_LOG(DEBUG, TAG, "info is NULL, cannot output log data");
1237     }
1238 }
1239
1240 void CAErrorHandler(const CAEndpoint_t *endpoint,
1241                     const void *data, uint32_t dataLen,
1242                     CAResult_t result)
1243 {
1244     OIC_LOG(DEBUG, TAG, "CAErrorHandler IN");
1245
1246 #ifndef SINGLE_THREAD
1247
1248     VERIFY_NON_NULL_VOID(endpoint, TAG, "remoteEndpoint");
1249     VERIFY_NON_NULL_VOID(data, TAG, "data");
1250
1251     uint32_t code = CA_NOT_FOUND;
1252     //Do not free remoteEndpoint and data. Currently they will be freed in data thread
1253     //Get PDU data
1254     coap_pdu_t *pdu = (coap_pdu_t *)CAParsePDU((const char *)data, dataLen, &code, endpoint->flags);
1255     if (NULL == pdu)
1256     {
1257         OIC_LOG(ERROR, TAG, "Parse PDU failed");
1258         return;
1259     }
1260
1261     CAData_t *cadata = CAGenerateHandlerData(endpoint, NULL, pdu, CA_ERROR_DATA);
1262     if(!cadata)
1263     {
1264         OIC_LOG(ERROR, TAG, "CAErrorHandler, CAGenerateHandlerData failed!");
1265         coap_delete_pdu(pdu);
1266         return;
1267     }
1268
1269     cadata->errorInfo->result = result;
1270
1271     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
1272     coap_delete_pdu(pdu);
1273 #endif
1274
1275     OIC_LOG(DEBUG, TAG, "CAErrorHandler OUT");
1276     return;
1277 }
1278
1279 static void CASendErrorInfo(const CAEndpoint_t *endpoint, const CAInfo_t *info, CAResult_t result)
1280 {
1281     OIC_LOG(DEBUG, TAG, "CASendErrorInfo IN");
1282 #ifndef SINGLE_THREAD
1283     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
1284     if (!cadata)
1285     {
1286         OIC_LOG(ERROR, TAG, "memory allocation failed");
1287         return;
1288     }
1289
1290     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
1291     if (!ep)
1292     {
1293         OIC_LOG(ERROR, TAG, "endpoint clone failed");
1294         OICFree(cadata);
1295         return;
1296     }
1297
1298     CAErrorInfo_t *errorInfo = (CAErrorInfo_t *)OICCalloc(1, sizeof (CAErrorInfo_t));
1299     if (!errorInfo)
1300     {
1301         OICFree(cadata);
1302         CAFreeEndpoint(ep);
1303         return;
1304     }
1305
1306     CAResult_t res = CACloneInfo(info, &errorInfo->info);
1307     if (CA_STATUS_OK != res)
1308     {
1309         OICFree(cadata);
1310         CAFreeEndpoint(ep);
1311         return;
1312     }
1313
1314     errorInfo->result = result;
1315     cadata->remoteEndpoint = ep;
1316     cadata->errorInfo = errorInfo;
1317     cadata->dataType = CA_ERROR_DATA;
1318
1319     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
1320 #endif
1321     OIC_LOG(DEBUG, TAG, "CASendErrorInfo OUT");
1322 }