6ca7da64e55b8b622b3cfbc44a41d8d6c0e0b604
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / camessagehandler.c
1 /* *****************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <stdint.h>
25
26 #include "cainterface.h"
27 #include "camessagehandler.h"
28 #include "caremotehandler.h"
29 #include "caprotocolmessage.h"
30 #include "logger.h"
31 #include "config.h" /* for coap protocol */
32 #include "oic_malloc.h"
33 #include "canetworkconfigurator.h"
34 #include "caadapterutils.h"
35 #include "cainterfacecontroller.h"
36 #include "caretransmission.h"
37
38 #ifdef WITH_BWT
39 #include "cablockwisetransfer.h"
40 #endif
41
#ifndef  SINGLE_THREAD
#include "uqueue.h"
#include "cathreadpool.h" /* for thread pool */
#include "caqueueingthread.h"

// When defined, queued received messages are dispatched from
// CAHandleRequestResponseCallbacks() and CAReceiveThreadProcess() ignores its argument.
#define SINGLE_HANDLE
// NOTE(review): presumably the size requested for the thread pool behind
// g_threadPoolHandle; its use is outside this part of the file -- confirm.
#define MAX_THREAD_POOL_SIZE    20

// thread pool handle
static ca_thread_pool_t g_threadPoolHandle = NULL;

// message handler main thread
// g_sendThread receives outgoing CAData_t messages, g_receiveThread incoming ones.
static CAQueueingThread_t g_sendThread;
static CAQueueingThread_t g_receiveThread;

#else
// In ARDUINO builds CADetachRequestMessage() rejects new requests once the
// retransmission list holds this many entries.
#define CA_MAX_RT_ARRAY_SIZE    3
#endif  /* SINGLE_THREAD */
60
// Tag passed to every OIC_LOG call in this file.
#define TAG "CA_MSG_HNDLR"

// Retransmission state handed to the CARetransmission* calls made by this file.
static CARetransmission_t g_retransmissionContext;

// handler field
// Upper-layer callbacks; each is checked for NULL before being invoked when a
// received message is dispatched.
static CARequestCallback g_requestHandler = NULL;
static CAResponseCallback g_responseHandler = NULL;
static CAErrorCallback g_errorHandler = NULL;
69
// Called by the send path with the PDU bytes and the failure code when adding a
// block option or sending the PDU fails.
static void CAErrorHandler(const CAEndpoint_t *endpoint,
                           const void *data, uint32_t dataLen,
                           CAResult_t result);

// Builds a CAData_t (cloned endpoint plus request/response/error info) from a
// parsed PDU; returns NULL on failure.
static CAData_t* CAGenerateHandlerData(const CAEndpoint_t *endpoint,
                                       const CARemoteId_t *identity,
                                       const void *data, CADataType_t dataType);

// Called by the send path when no PDU could be generated for a message.
static void CASendErrorInfo(const CAEndpoint_t *endpoint, const CAInfo_t *info,
                            CAResult_t result);

#ifdef SINGLE_THREAD
// Dispatches a received message to the registered callback and then destroys it.
static void CAProcessReceivedData(CAData_t *data);
#endif
// Frees a CAData_t together with its endpoint and request/response/error info.
static void CADestroyData(void *data, uint32_t size);
// NOTE(review): defined outside this part of the file; presumably logs the
// contents of the given info -- confirm.
static void CALogPayloadInfo(CAInfo_t *info);
// Reports whether a request with this message id was already seen on the other
// IP address family, and records the id in the history.
static bool CADropSecondMessage(CAHistory_t *history, const CAEndpoint_t *endpoint, uint16_t id);
87
88 #ifdef WITH_BWT
89 void CAAddDataToSendThread(CAData_t *data)
90 {
91     OIC_LOG(DEBUG, TAG, "IN");
92     VERIFY_NON_NULL_VOID(data, TAG, "data");
93
94     // add thread
95     CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
96
97     OIC_LOG(DEBUG, TAG, "OUT");
98 }
99
100 void CAAddDataToReceiveThread(CAData_t *data)
101 {
102     OIC_LOG(DEBUG, TAG, "IN - CAAddDataToReceiveThread");
103     VERIFY_NON_NULL_VOID(data, TAG, "data");
104
105     // add thread
106     CAQueueingThreadAddData(&g_receiveThread, data, sizeof(CAData_t));
107
108     OIC_LOG(DEBUG, TAG, "OUT - CAAddDataToReceiveThread");
109 }
110 #endif
111
112 static bool CAIsSelectedNetworkAvailable()
113 {
114     u_arraylist_t *list = CAGetSelectedNetworkList();
115     if (!list || u_arraylist_length(list) == 0)
116     {
117         OIC_LOG(ERROR, TAG, "No selected network");
118         return false;
119     }
120
121     return true;
122 }
123
124 static CAData_t* CAGenerateHandlerData(const CAEndpoint_t *endpoint,
125                                        const CARemoteId_t *identity,
126                                        const void *data, CADataType_t dataType)
127 {
128     OIC_LOG(DEBUG, TAG, "CAGenerateHandlerData IN");
129     CAInfo_t *info = NULL;
130     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
131     if (!cadata)
132     {
133         OIC_LOG(ERROR, TAG, "memory allocation failed");
134         return NULL;
135     }
136
137     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
138     if (!ep)
139     {
140         OIC_LOG(ERROR, TAG, "endpoint clone failed");
141         OICFree(cadata);
142         return NULL;
143     }
144
145     OIC_LOG_V(DEBUG, TAG, "address : %s", ep->addr);
146     CAResult_t result;
147
148     if(CA_RESPONSE_DATA == dataType)
149     {
150         CAResponseInfo_t* resInfo = (CAResponseInfo_t*)OICCalloc(1, sizeof(CAResponseInfo_t));
151         if (!resInfo)
152         {
153             OIC_LOG(ERROR, TAG, "memory allocation failed");
154             OICFree(cadata);
155             CAFreeEndpoint(ep);
156             return NULL;
157         }
158
159         result = CAGetResponseInfoFromPDU(data, resInfo, endpoint);
160         if (CA_STATUS_OK != result)
161         {
162             OIC_LOG(ERROR, TAG, "CAGetResponseInfoFromPDU Failed");
163             CAFreeEndpoint(ep);
164             CADestroyResponseInfoInternal(resInfo);
165             OICFree(cadata);
166             return NULL;
167         }
168         cadata->responseInfo = resInfo;
169         info = &resInfo->info;
170         if (identity)
171         {
172             info->identity = *identity;
173         }
174         OIC_LOG(DEBUG, TAG, "Response Info :");
175         CALogPayloadInfo(info);
176     }
177     else if (CA_REQUEST_DATA == dataType)
178     {
179         CARequestInfo_t* reqInfo = (CARequestInfo_t*)OICCalloc(1, sizeof(CARequestInfo_t));
180         if (!reqInfo)
181         {
182             OIC_LOG(ERROR, TAG, "memory allocation failed");
183             OICFree(cadata);
184             CAFreeEndpoint(ep);
185             return NULL;
186         }
187
188         result = CAGetRequestInfoFromPDU(data, endpoint, reqInfo);
189         if (CA_STATUS_OK != result)
190         {
191             OIC_LOG(ERROR, TAG, "CAGetRequestInfoFromPDU failed");
192             CAFreeEndpoint(ep);
193             CADestroyRequestInfoInternal(reqInfo);
194             OICFree(cadata);
195             return NULL;
196         }
197
198         if (CADropSecondMessage(&caglobals.ca.requestHistory, endpoint, reqInfo->info.messageId))
199         {
200             OIC_LOG(ERROR, TAG, "Second Request with same Token, Drop it");
201             CAFreeEndpoint(ep);
202             CADestroyRequestInfoInternal(reqInfo);
203             OICFree(cadata);
204             return NULL;
205         }
206
207         cadata->requestInfo = reqInfo;
208         info = &reqInfo->info;
209         if (identity)
210         {
211             info->identity = *identity;
212         }
213         OIC_LOG(DEBUG, TAG, "Request Info :");
214         CALogPayloadInfo(info);
215    }
216     else if (CA_ERROR_DATA == dataType)
217     {
218         CAErrorInfo_t *errorInfo = (CAErrorInfo_t *)OICCalloc(1, sizeof (CAErrorInfo_t));
219         if (!errorInfo)
220         {
221             OIC_LOG(ERROR, TAG, "Memory allocation failed!");
222             OICFree(cadata);
223             CAFreeEndpoint(ep);
224             return NULL;
225         }
226
227         CAResult_t result = CAGetErrorInfoFromPDU(data, endpoint, errorInfo);
228         if (CA_STATUS_OK != result)
229         {
230             OIC_LOG(ERROR, TAG, "CAGetErrorInfoFromPDU failed");
231             CAFreeEndpoint(ep);
232             OICFree(errorInfo);
233             OICFree(cadata);
234             return NULL;
235         }
236
237         cadata->errorInfo = errorInfo;
238         info = &errorInfo->info;
239         if (identity)
240         {
241             info->identity = *identity;
242         }
243         OIC_LOG(DEBUG, TAG, "error Info :");
244         CALogPayloadInfo(info);
245     }
246
247     cadata->remoteEndpoint = ep;
248     cadata->dataType = dataType;
249
250     return cadata;
251
252     OIC_LOG(DEBUG, TAG, "CAGenerateHandlerData OUT");
253 }
254
255 static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uint32_t size)
256 {
257     OIC_LOG(DEBUG, TAG, "IN");
258     VERIFY_NON_NULL_VOID(endpoint, TAG, "endpoint");
259     VERIFY_NON_NULL_VOID(pdu, TAG, "pdu");
260
261     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
262     if (!ep)
263     {
264         OIC_LOG(ERROR, TAG, "clone failed");
265         return;
266     }
267
268     CAResponseInfo_t* resInfo = (CAResponseInfo_t*)OICCalloc(1, sizeof(CAResponseInfo_t));
269
270     if (!resInfo)
271     {
272         OIC_LOG(ERROR, TAG, "calloc failed");
273         CAFreeEndpoint(ep);
274         return;
275     }
276
277     resInfo->result = CA_RETRANSMIT_TIMEOUT;
278     resInfo->info.type = CAGetMessageTypeFromPduBinaryData(pdu, size);
279     resInfo->info.messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
280
281     CAResult_t res = CAGetTokenFromPDU((const coap_hdr_t *) pdu, &(resInfo->info),
282                                        endpoint);
283     if (CA_STATUS_OK != res)
284     {
285         OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list");
286         CADestroyResponseInfoInternal(resInfo);
287         CAFreeEndpoint(ep);
288         return;
289     }
290
291     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
292     if (NULL == cadata)
293     {
294         OIC_LOG(ERROR, TAG, "memory allocation failed !");
295         CAFreeEndpoint(ep);
296         CADestroyResponseInfoInternal(resInfo);
297         return;
298     }
299
300     cadata->type = SEND_TYPE_UNICAST;
301     cadata->remoteEndpoint = ep;
302     cadata->requestInfo = NULL;
303     cadata->responseInfo = resInfo;
304
305 #ifdef SINGLE_THREAD
306     CAProcessReceivedData(cadata);
307 #else
308     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
309 #endif
310
311     OIC_LOG(DEBUG, TAG, "OUT");
312 }
313
314 static void CADestroyData(void *data, uint32_t size)
315 {
316     OIC_LOG(DEBUG, TAG, "CADestroyData IN");
317     if ((size_t)size < sizeof(CAData_t))
318     {
319         OIC_LOG_V(ERROR, TAG, "Destroy data too small %p %d", data, size);
320     }
321     CAData_t *cadata = (CAData_t *) data;
322
323     if (NULL == cadata)
324     {
325         OIC_LOG(ERROR, TAG, "cadata is NULL");
326         return;
327     }
328
329     if (NULL != cadata->remoteEndpoint)
330     {
331         CAFreeEndpoint(cadata->remoteEndpoint);
332     }
333
334     if (NULL != cadata->requestInfo)
335     {
336         CADestroyRequestInfoInternal((CARequestInfo_t *) cadata->requestInfo);
337     }
338
339     if (NULL != cadata->responseInfo)
340     {
341         CADestroyResponseInfoInternal((CAResponseInfo_t *) cadata->responseInfo);
342     }
343
344     if (NULL != cadata->errorInfo)
345     {
346         CADestroyErrorInfoInternal(cadata->errorInfo);
347     }
348
349     OICFree(cadata);
350     OIC_LOG(DEBUG, TAG, "CADestroyData OUT");
351 }
352
353 #ifdef SINGLE_THREAD
354 static void CAProcessReceivedData(CAData_t *data)
355 {
356     OIC_LOG(DEBUG, TAG, "CAProcessReceivedData IN");
357     if (!data)
358     {
359         OIC_LOG(ERROR, TAG, "thread data error!!");
360         return;
361     }
362
363     // parse the data and call the callbacks.
364     // #1 parse the data
365     // #2 get endpoint
366     CAEndpoint_t *rep = (CAEndpoint_t *)(data->remoteEndpoint);
367     if (!rep)
368     {
369         OIC_LOG(ERROR, TAG, "remoteEndpoint error!!");
370         return;
371     }
372
373     if (data->requestInfo && g_requestHandler)
374     {
375         g_requestHandler(rep, data->requestInfo);
376     }
377     else if (data->responseInfo && g_responseHandler)
378     {
379         g_responseHandler(rep, data->responseInfo);
380     }
381     else if (data->errorInfo && g_errorHandler)
382     {
383         g_errorHandler(rep, data->errorInfo);
384     }
385
386 #ifdef SINGLE_THREAD
387     CADestroyData(data, sizeof(CAData_t));
388 #endif
389
390     OIC_LOG(DEBUG, TAG, "CAProcessReceivedData OUT");
391 }
392 #endif
393
394 #ifndef SINGLE_THREAD
395
396 static void CAReceiveThreadProcess(void *threadData)
397 {
398     OIC_LOG(DEBUG, TAG, "IN");
399 #ifndef SINGLE_HANDLE
400     CAData_t *data = (CAData_t *) threadData;
401     CAProcessReceivedData(data);
402 #else
403     (void)threadData;
404 #endif
405     OIC_LOG(DEBUG, TAG, "OUT");
406 }
407 #endif
408
409 static CAResult_t CAProcessSendData(const CAData_t *data)
410 {
411     OIC_LOG(DEBUG, TAG, "IN");
412     VERIFY_NON_NULL(data, TAG, "data");
413     VERIFY_NON_NULL(data->remoteEndpoint, TAG, "remoteEndpoint");
414
415     CAResult_t res = CA_STATUS_FAILED;
416
417     CASendDataType_t type = data->type;
418
419     coap_pdu_t *pdu = NULL;
420     CAInfo_t *info = NULL;
421
422     if (SEND_TYPE_UNICAST == type)
423     {
424         OIC_LOG(DEBUG,TAG,"Unicast message");
425         if (NULL != data->requestInfo)
426         {
427             OIC_LOG(DEBUG, TAG, "requestInfo is available..");
428
429             info = &data->requestInfo->info;
430             pdu = CAGeneratePDU(data->requestInfo->method, info, data->remoteEndpoint);
431         }
432         else if (NULL != data->responseInfo)
433         {
434             OIC_LOG(DEBUG, TAG, "responseInfo is available..");
435
436             info = &data->responseInfo->info;
437             pdu = CAGeneratePDU(data->responseInfo->result, info, data->remoteEndpoint);
438         }
439         else
440         {
441             OIC_LOG(DEBUG, TAG, "request info, response info is empty");
442             return CA_STATUS_INVALID_PARAM;
443         }
444
445         // interface controller function call.
446         if (NULL != pdu)
447         {
448 #ifdef WITH_BWT
449             if (CA_ADAPTER_GATT_BTLE != data->remoteEndpoint->adapter
450 #ifdef TCP_ADAPTER
451                     && CA_ADAPTER_TCP != data->remoteEndpoint->adapter
452 #endif
453                     )
454             {
455                 // Blockwise transfer
456                 if (NULL != info)
457                 {
458                     CAResult_t res = CAAddBlockOption(&pdu, *info,
459                                                       data->remoteEndpoint);
460                     if (CA_STATUS_OK != res)
461                     {
462                         OIC_LOG(INFO, TAG, "to write block option has failed");
463                         CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res);
464                         coap_delete_pdu(pdu);
465                         return res;
466                     }
467                 }
468             }
469 #endif
470             CALogPDUInfo(pdu, data->remoteEndpoint);
471
472             res = CASendUnicastData(data->remoteEndpoint, pdu->hdr, pdu->length);
473             if (CA_STATUS_OK != res)
474             {
475                 OIC_LOG_V(ERROR, TAG, "send failed:%d", res);
476                 CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res);
477                 coap_delete_pdu(pdu);
478                 return res;
479             }
480
481 #ifdef TCP_ADAPTER
482             if (CA_ADAPTER_TCP == data->remoteEndpoint->adapter)
483             {
484                 OIC_LOG(INFO, TAG, "retransmission will be not worked");
485             }
486             else
487 #endif
488             {
489                 // for retransmission
490                 res = CARetransmissionSentData(&g_retransmissionContext, data->remoteEndpoint, pdu->hdr,
491                                                pdu->length);
492                 if ((CA_STATUS_OK != res) && (CA_NOT_SUPPORTED != res))
493                 {
494                     //when retransmission not supported this will return CA_NOT_SUPPORTED, ignore
495                     OIC_LOG_V(INFO, TAG, "retransmission is not enabled due to error, res : %d", res);
496                     coap_delete_pdu(pdu);
497                     return res;
498                 }
499             }
500
501             coap_delete_pdu(pdu);
502         }
503         else
504         {
505             OIC_LOG(ERROR,TAG,"Failed to generate unicast PDU");
506             CASendErrorInfo(data->remoteEndpoint, info, CA_SEND_FAILED);
507             return CA_SEND_FAILED;
508         }
509     }
510     else if (SEND_TYPE_MULTICAST == type)
511     {
512         OIC_LOG(DEBUG,TAG,"Multicast message");
513         if (NULL != data->requestInfo)
514         {
515             OIC_LOG(DEBUG, TAG, "requestInfo is available..");
516
517             info = &data->requestInfo->info;
518             pdu = CAGeneratePDU(CA_GET, info, data->remoteEndpoint);
519             if (NULL != pdu)
520             {
521 #ifdef WITH_BWT
522                 if (CA_ADAPTER_GATT_BTLE != data->remoteEndpoint->adapter
523 #ifdef TCP_ADAPTER
524                         && CA_ADAPTER_TCP != data->remoteEndpoint->adapter
525 #endif
526                         )
527                 {
528                     // Blockwise transfer
529                     CAResult_t res = CAAddBlockOption(&pdu, data->requestInfo->info,
530                                                       data->remoteEndpoint);
531                     if (CA_STATUS_OK != res)
532                     {
533                         OIC_LOG(DEBUG, TAG, "CAAddBlockOption has failed");
534                         CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res);
535                         coap_delete_pdu(pdu);
536                         return res;
537                     }
538                 }
539 #endif
540             }
541             else
542             {
543                 OIC_LOG(ERROR,TAG,"Failed to generate multicast PDU");
544                 CASendErrorInfo(data->remoteEndpoint, info, CA_SEND_FAILED);
545                 return CA_SEND_FAILED;
546             }
547         }
548         else if (NULL != data->responseInfo)
549         {
550             OIC_LOG(DEBUG, TAG, "responseInfo is available..");
551
552             info = &data->responseInfo->info;
553             pdu = CAGeneratePDU(data->responseInfo->result, info, data->remoteEndpoint);
554
555             if (NULL != pdu)
556             {
557 #ifdef WITH_BWT
558                 if (CA_ADAPTER_GATT_BTLE != data->remoteEndpoint->adapter
559 #ifdef TCP_ADAPTER
560                         && CA_ADAPTER_TCP != data->remoteEndpoint->adapter
561 #endif
562                         )
563                 {
564                     // Blockwise transfer
565                     if (NULL != info)
566                     {
567                         CAResult_t res = CAAddBlockOption(&pdu, *info,
568                                 data->remoteEndpoint);
569                         if (CA_STATUS_OK != res)
570                         {
571                             OIC_LOG(INFO, TAG, "to write block option has failed");
572                             CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res);
573                             coap_delete_pdu(pdu);
574                             return res;
575                         }
576                     }
577                 }
578 #endif
579             }
580             else
581             {
582                 OIC_LOG(ERROR,TAG,"Failed to generate multicast PDU");
583                 CASendErrorInfo(data->remoteEndpoint, info, CA_SEND_FAILED);
584                 return CA_SEND_FAILED;
585             }
586         }
587         else
588         {
589             OIC_LOG(ERROR, TAG, "request or response info is empty");
590             return CA_SEND_FAILED;
591         }
592
593         CALogPDUInfo(pdu, data->remoteEndpoint);
594
595         OIC_LOG(DEBUG, TAG, "pdu to send :");
596         OIC_LOG_BUFFER(DEBUG, TAG,  pdu->hdr, pdu->length);
597
598         res = CASendMulticastData(data->remoteEndpoint, pdu->hdr, pdu->length);
599         if (CA_STATUS_OK != res)
600         {
601             OIC_LOG_V(ERROR, TAG, "send failed:%d", res);
602             CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res);
603             coap_delete_pdu(pdu);
604             return res;
605         }
606
607         coap_delete_pdu(pdu);
608     }
609
610     return CA_STATUS_OK;
611 }
612
613 #ifndef SINGLE_THREAD
614 static void CASendThreadProcess(void *threadData)
615 {
616     CAData_t *data = (CAData_t *) threadData;
617     CAProcessSendData(data);
618 }
619
620 #endif
621
622 /*
623  * If a second message arrives with the same token and the other address
624  * family, drop it.  Typically, IPv6 beats IPv4, so the IPv4 message is dropped.
625  */
626 static bool CADropSecondMessage(CAHistory_t *history, const CAEndpoint_t *ep, uint16_t id)
627 {
628     if (!ep)
629     {
630         return true;
631     }
632     if (ep->adapter != CA_ADAPTER_IP)
633     {
634         return false;
635     }
636     if (!caglobals.ip.dualstack)
637     {
638         return false;
639     }
640
641     bool ret = false;
642     CATransportFlags_t familyFlags = ep->flags & CA_IPFAMILY_MASK;
643
644     for (size_t i = 0; i < sizeof(history->items) / sizeof(history->items[0]); i++)
645     {
646         CAHistoryItem_t *item = &(history->items[i]);
647         if (id == item->messageId)
648         {
649             if ((familyFlags ^ item->flags) == CA_IPFAMILY_MASK)
650             {
651                 OIC_LOG_V(INFO, TAG, "IPv%c duplicate message ignored",
652                                             familyFlags & CA_IPV6 ? '6' : '4');
653                 ret = true;
654                 break;
655             }
656         }
657     }
658
659     history->items[history->nextIndex].flags = familyFlags;
660     history->items[history->nextIndex].messageId = id;
661     if (++history->nextIndex >= HISTORYSIZE)
662     {
663         history->nextIndex = 0;
664     }
665
666     return ret;
667 }
668
669 static void CAReceivedPacketCallback(const CASecureEndpoint_t *sep,
670                                      const void *data, uint32_t dataLen)
671 {
672     OIC_LOG(DEBUG, TAG, "IN");
673     VERIFY_NON_NULL_VOID(sep, TAG, "remoteEndpoint");
674     VERIFY_NON_NULL_VOID(data, TAG, "data");
675
676     OIC_LOG(DEBUG, TAG, "received pdu data :");
677     OIC_LOG_BUFFER(DEBUG, TAG,  data, dataLen);
678
679     uint32_t code = CA_NOT_FOUND;
680     CAData_t *cadata = NULL;
681
682     coap_pdu_t *pdu = (coap_pdu_t *) CAParsePDU((const char *) data, dataLen, &code,
683                                                 &(sep->endpoint));
684     if (NULL == pdu)
685     {
686         OIC_LOG(ERROR, TAG, "Parse PDU failed");
687         return;
688     }
689
690     OIC_LOG_V(DEBUG, TAG, "code = %d", code);
691     if (CA_GET == code || CA_POST == code || CA_PUT == code || CA_DELETE == code)
692     {
693         cadata = CAGenerateHandlerData(&(sep->endpoint), &(sep->identity), pdu, CA_REQUEST_DATA);
694         if (!cadata)
695         {
696             OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, CAGenerateHandlerData failed!");
697             coap_delete_pdu(pdu);
698             return;
699         }
700     }
701     else
702     {
703         cadata = CAGenerateHandlerData(&(sep->endpoint), &(sep->identity), pdu, CA_RESPONSE_DATA);
704         if (!cadata)
705         {
706             OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, CAGenerateHandlerData failed!");
707             coap_delete_pdu(pdu);
708             return;
709         }
710
711 #ifdef TCP_ADAPTER
712         if (CA_ADAPTER_TCP == sep->endpoint.adapter)
713         {
714             OIC_LOG(INFO, TAG, "retransmission is not supported");
715         }
716         else
717 #endif
718         {
719             // for retransmission
720             void *retransmissionPdu = NULL;
721             CARetransmissionReceivedData(&g_retransmissionContext, cadata->remoteEndpoint, pdu->hdr,
722                                          pdu->length, &retransmissionPdu);
723
724             // get token from saved data in retransmission list
725             if (retransmissionPdu && CA_EMPTY == code)
726             {
727                 if (cadata->responseInfo)
728                 {
729                     CAInfo_t *info = &cadata->responseInfo->info;
730                     CAResult_t res = CAGetTokenFromPDU((const coap_hdr_t *)retransmissionPdu,
731                                                        info, &(sep->endpoint));
732                     if (CA_STATUS_OK != res)
733                     {
734                         OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list");
735                         OICFree(info->token);
736                         info->tokenLength = 0;
737                     }
738                 }
739             }
740             OICFree(retransmissionPdu);
741         }
742     }
743
744     cadata->type = SEND_TYPE_UNICAST;
745
746 #ifdef SINGLE_THREAD
747     CAProcessReceivedData(cadata);
748 #else
749 #ifdef WITH_BWT
750     if (CA_ADAPTER_GATT_BTLE != sep->endpoint.adapter
751 #ifdef TCP_ADAPTER
752             && CA_ADAPTER_TCP != sep->endpoint.adapter
753 #endif
754             )
755     {
756         CAResult_t res = CAReceiveBlockWiseData(pdu, &(sep->endpoint), cadata, dataLen);
757         if (CA_NOT_SUPPORTED == res)
758         {
759             OIC_LOG(ERROR, TAG, "this message does not have block option");
760             CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
761         }
762         else
763         {
764             CADestroyData(cadata, sizeof(CAData_t));
765         }
766     }
767     else
768 #endif
769     {
770         CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
771     }
772 #endif
773
774     coap_delete_pdu(pdu);
775
776     OIC_LOG(DEBUG, TAG, "OUT");
777 }
778
779 static void CANetworkChangedCallback(const CAEndpoint_t *info, CANetworkStatus_t status)
780 {
781     (void)info;
782     (void)status;
783     OIC_LOG(DEBUG, TAG, "IN");
784
785     OIC_LOG(DEBUG, TAG, "OUT");
786 }
787
/**
 * Delivers pending received messages to the registered callbacks.
 *
 * SINGLE_THREAD builds read data from the adapters and run the retransmission
 * routine.  Otherwise, with SINGLE_HANDLE defined, one element is taken from the
 * receive queue (under the queue mutex), dispatched to the matching request,
 * response or error handler, and then released together with its queue item.
 */
void CAHandleRequestResponseCallbacks()
{
#ifdef SINGLE_THREAD
    CAReadData();
    CARetransmissionBaseRoutine((void *)&g_retransmissionContext);
#else
#ifdef SINGLE_HANDLE
    // parse the data and call the callbacks.
    // #1 parse the data
    // #2 get endpoint

    ca_mutex_lock(g_receiveThread.threadMutex);

    u_queue_message_t *item = u_queue_get_element(g_receiveThread.dataQueue);

    ca_mutex_unlock(g_receiveThread.threadMutex);

    if (NULL == item)
    {
        return;
    }

    // get values
    void *msg = item->msg;

    if (NULL == msg)
    {
        // the item was already removed from the queue, so it must be freed here
        OICFree(item);
        return;
    }

    // get endpoint
    CAData_t *td = (CAData_t *) msg;

    if (td->requestInfo && g_requestHandler)
    {
        OIC_LOG_V(DEBUG, TAG, "request callback : %d", td->requestInfo->info.numOptions);
        g_requestHandler(td->remoteEndpoint, td->requestInfo);
    }
    else if (td->responseInfo && g_responseHandler)
    {
        OIC_LOG_V(DEBUG, TAG, "response callback : %d", td->responseInfo->info.numOptions);
        g_responseHandler(td->remoteEndpoint, td->responseInfo);
    }
    else if (td->errorInfo && g_errorHandler)
    {
        OIC_LOG_V(DEBUG, TAG, "error callback error: %d", td->errorInfo->result);
        g_errorHandler(td->remoteEndpoint, td->errorInfo);
    }

    CADestroyData(msg, sizeof(CAData_t));
    OICFree(item);

#endif /* SINGLE_HANDLE */
#endif
}
843
844 static CAData_t* CAPrepareSendData(const CAEndpoint_t *endpoint, const void *sendData,
845                                    CADataType_t dataType)
846 {
847     OIC_LOG(DEBUG, TAG, "CAPrepareSendData IN");
848
849     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
850     if (!cadata)
851     {
852         OIC_LOG(ERROR, TAG, "memory allocation failed");
853         return NULL;
854     }
855
856     if(CA_REQUEST_DATA == dataType)
857     {
858         // clone request info
859         CARequestInfo_t *request = CACloneRequestInfo((CARequestInfo_t *)sendData);
860
861         if(!request)
862         {
863             OIC_LOG(ERROR, TAG, "CACloneRequestInfo failed");
864             OICFree(cadata);
865             return NULL;
866         }
867
868         cadata->type = request->isMulticast ? SEND_TYPE_MULTICAST : SEND_TYPE_UNICAST;
869         cadata->requestInfo =  request;
870     }
871     else if(CA_RESPONSE_DATA == dataType)
872     {
873         // clone response info
874         CAResponseInfo_t *response = CACloneResponseInfo((CAResponseInfo_t *)sendData);
875
876         if(!response)
877         {
878             OIC_LOG(ERROR, TAG, "CACloneResponseInfo failed");
879             OICFree(cadata);
880             return NULL;
881         }
882
883         cadata->type = response->isMulticast ? SEND_TYPE_MULTICAST : SEND_TYPE_UNICAST;
884         cadata->responseInfo = response;
885     }
886     else
887     {
888         OIC_LOG(ERROR, TAG, "CAPrepareSendData unknown data type");
889         OICFree(cadata);
890         return NULL;
891     }
892
893     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
894     if (!ep)
895     {
896         OIC_LOG(ERROR, TAG, "endpoint clone failed");
897         CADestroyData(cadata, sizeof(CAData_t));
898         return NULL;
899     }
900
901     cadata->remoteEndpoint = ep;
902     cadata->dataType = dataType;
903     return cadata;
904 }
905
906 CAResult_t CADetachRequestMessage(const CAEndpoint_t *object, const CARequestInfo_t *request)
907 {
908     OIC_LOG(DEBUG, TAG, "IN");
909
910     VERIFY_NON_NULL(object, TAG, "object");
911     VERIFY_NON_NULL(request, TAG, "request");
912
913     if (false == CAIsSelectedNetworkAvailable())
914     {
915         return CA_STATUS_FAILED;
916     }
917
918 #ifdef ARDUINO
919     // If max retransmission queue is reached, then don't handle new request
920     if (CA_MAX_RT_ARRAY_SIZE == u_arraylist_length(g_retransmissionContext.dataList))
921     {
922         OIC_LOG(ERROR, TAG, "max RT queue size reached!");
923         return CA_SEND_FAILED;
924     }
925 #endif /* ARDUINO */
926
927     CAData_t *data = CAPrepareSendData(object, request, CA_REQUEST_DATA);
928     if(!data)
929     {
930         OIC_LOG(ERROR, TAG, "CAPrepareSendData failed");
931         return CA_MEMORY_ALLOC_FAILED;
932     }
933
934 #ifdef SINGLE_THREAD
935     CAResult_t result = CAProcessSendData(data);
936     if(CA_STATUS_OK != result)
937     {
938         OIC_LOG(ERROR, TAG, "CAProcessSendData failed");
939         return result;
940     }
941
942     CADestroyData(data, sizeof(CAData_t));
943 #else
944 #ifdef WITH_BWT
945     if (CA_ADAPTER_GATT_BTLE != object->adapter
946 #ifdef TCP_ADAPTER
947             && CA_ADAPTER_TCP != object->adapter
948 #endif
949             )
950     {
951         // send block data
952         CAResult_t res = CASendBlockWiseData(data);
953         if(CA_NOT_SUPPORTED == res)
954         {
955             OIC_LOG(DEBUG, TAG, "normal msg will be sent");
956             CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
957             return CA_STATUS_OK;
958         }
959         else
960         {
961             CADestroyData(data, sizeof(CAData_t));
962         }
963         return res;
964     }
965     else
966 #endif
967     {
968         CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
969     }
970 #endif
971
972     OIC_LOG(DEBUG, TAG, "OUT");
973     return CA_STATUS_OK;
974 }
975
976 CAResult_t CADetachResponseMessage(const CAEndpoint_t *object,
977                                    const CAResponseInfo_t *response)
978 {
979     OIC_LOG(DEBUG, TAG, "IN");
980     VERIFY_NON_NULL(object, TAG, "object");
981     VERIFY_NON_NULL(response, TAG, "response");
982
983     if (false == CAIsSelectedNetworkAvailable())
984     {
985         return CA_STATUS_FAILED;
986     }
987
988     CAData_t *data = CAPrepareSendData(object, response, CA_RESPONSE_DATA);
989     if(!data)
990     {
991         OIC_LOG(ERROR, TAG, "CAPrepareSendData failed");
992         return CA_MEMORY_ALLOC_FAILED;
993     }
994
995 #ifdef SINGLE_THREAD
996     CAResult_t result = CAProcessSendData(data);
997     if(result != CA_STATUS_OK)
998     {
999         OIC_LOG(ERROR, TAG, "CAProcessSendData failed");
1000         return result;
1001     }
1002
1003     CADestroyData(data, sizeof(CAData_t));
1004 #else
1005 #ifdef WITH_BWT
1006     if (CA_ADAPTER_GATT_BTLE != object->adapter
1007 #ifdef TCP_ADAPTER
1008             && CA_ADAPTER_TCP != object->adapter
1009 #endif
1010             )
1011     {
1012         // send block data
1013         CAResult_t res = CASendBlockWiseData(data);
1014         if(CA_NOT_SUPPORTED == res)
1015         {
1016             OIC_LOG(DEBUG, TAG, "normal msg will be sent");
1017             CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
1018             return CA_STATUS_OK;
1019         }
1020         else
1021         {
1022             CADestroyData(data, sizeof(CAData_t));
1023         }
1024         return res;
1025     }
1026     else
1027 #endif
1028     {
1029         CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
1030     }
1031 #endif
1032
1033     OIC_LOG(DEBUG, TAG, "OUT");
1034     return CA_STATUS_OK;
1035 }
1036
1037 CAResult_t CADetachMessageResourceUri(const CAURI_t resourceUri, const CAToken_t token,
1038                                       uint8_t tokenLength, const CAHeaderOption_t *options,
1039                                       uint8_t numOptions)
1040 {
1041     (void)resourceUri;
1042     (void)token;
1043     (void)tokenLength;
1044     (void)options;
1045     (void)numOptions;
1046     return CA_NOT_SUPPORTED;
1047 }
1048
1049 void CASetInterfaceCallbacks(CARequestCallback ReqHandler, CAResponseCallback RespHandler,
1050                              CAErrorCallback errorHandler)
1051 {
1052     OIC_LOG(DEBUG, TAG, "IN");
1053     g_requestHandler = ReqHandler;
1054     g_responseHandler = RespHandler;
1055     g_errorHandler = errorHandler;
1056     OIC_LOG(DEBUG, TAG, "OUT");
1057 }
1058
1059 CAResult_t CAInitializeMessageHandler()
1060 {
1061     OIC_LOG(DEBUG, TAG, "IN");
1062     CASetPacketReceivedCallback(CAReceivedPacketCallback);
1063
1064     CASetNetworkChangeCallback(CANetworkChangedCallback);
1065     CASetErrorHandleCallback(CAErrorHandler);
1066
1067 #ifndef SINGLE_THREAD
1068     // create thread pool
1069     CAResult_t res = ca_thread_pool_init(MAX_THREAD_POOL_SIZE, &g_threadPoolHandle);
1070
1071     if (CA_STATUS_OK != res)
1072     {
1073         OIC_LOG(ERROR, TAG, "thread pool initialize error.");
1074         return res;
1075     }
1076
1077     // send thread initialize
1078     if (CA_STATUS_OK != CAQueueingThreadInitialize(&g_sendThread, g_threadPoolHandle,
1079                                                    CASendThreadProcess, CADestroyData))
1080     {
1081         OIC_LOG(ERROR, TAG, "Failed to Initialize send queue thread");
1082         return CA_STATUS_FAILED;
1083     }
1084
1085     // start send thread
1086     res = CAQueueingThreadStart(&g_sendThread);
1087
1088     if (CA_STATUS_OK != res)
1089     {
1090         OIC_LOG(ERROR, TAG, "thread start error(send thread).");
1091         ca_thread_pool_free(g_threadPoolHandle);
1092         g_threadPoolHandle = NULL;
1093         return res;
1094     }
1095
1096     // receive thread initialize
1097     if (CA_STATUS_OK != CAQueueingThreadInitialize(&g_receiveThread, g_threadPoolHandle,
1098                                                    CAReceiveThreadProcess, CADestroyData))
1099     {
1100         OIC_LOG(ERROR, TAG, "Failed to Initialize receive queue thread");
1101         return CA_STATUS_FAILED;
1102     }
1103
1104 #ifndef SINGLE_HANDLE // This will be enabled when RI supports multi threading
1105     // start receive thread
1106     res = CAQueueingThreadStart(&gReceiveThread);
1107
1108     if (res != CA_STATUS_OK)
1109     {
1110         OIC_LOG(ERROR, TAG, "thread start error(receive thread).");
1111         return res;
1112     }
1113 #endif /* SINGLE_HANDLE */
1114
1115     // retransmission initialize
1116     CARetransmissionInitialize(&g_retransmissionContext, g_threadPoolHandle, CASendUnicastData,
1117                                CATimeoutCallback, NULL);
1118
1119 #ifdef WITH_BWT
1120     // block-wise transfer initialize
1121     CAInitializeBlockWiseTransfer(CAAddDataToSendThread, CAAddDataToReceiveThread);
1122 #endif
1123
1124     // start retransmission
1125     res = CARetransmissionStart(&g_retransmissionContext);
1126
1127     if (CA_STATUS_OK != res)
1128     {
1129         OIC_LOG(ERROR, TAG, "thread start error(retransmission thread).");
1130         return res;
1131     }
1132
1133     // initialize interface adapters by controller
1134     CAInitializeAdapters(g_threadPoolHandle);
1135 #else
1136     // retransmission initialize
1137     CARetransmissionInitialize(&g_retransmissionContext, NULL, CASendUnicastData,
1138                                CATimeoutCallback, NULL);
1139     CAInitializeAdapters();
1140 #endif
1141
1142     OIC_LOG(DEBUG, TAG, "OUT");
1143     return CA_STATUS_OK;
1144 }
1145
1146 void CATerminateMessageHandler()
1147 {
1148     OIC_LOG(DEBUG, TAG, "IN");
1149 #ifndef SINGLE_THREAD
1150     CATransportAdapter_t connType;
1151     u_arraylist_t *list = CAGetSelectedNetworkList();
1152     uint32_t length = u_arraylist_length(list);
1153
1154     uint32_t i = 0;
1155     for (i = 0; i < length; i++)
1156     {
1157         void* ptrType = u_arraylist_get(list, i);
1158
1159         if (NULL == ptrType)
1160         {
1161             continue;
1162         }
1163
1164         connType = *(CATransportAdapter_t *)ptrType;
1165         CAStopAdapter(connType);
1166     }
1167
1168     // stop retransmission
1169     if (NULL != g_retransmissionContext.threadMutex)
1170     {
1171         CARetransmissionStop(&g_retransmissionContext);
1172     }
1173
1174     // stop thread
1175     // delete thread data
1176     if (NULL != g_sendThread.threadMutex)
1177     {
1178         CAQueueingThreadStop(&g_sendThread);
1179     }
1180
1181     // stop thread
1182     // delete thread data
1183     if (NULL != g_receiveThread.threadMutex)
1184     {
1185 #ifndef SINGLE_HANDLE // This will be enabled when RI supports multi threading
1186         CAQueueingThreadStop(&gReceiveThread);
1187 #endif /* SINGLE_HANDLE */
1188     }
1189
1190     // destroy thread pool
1191     if (NULL != g_threadPoolHandle)
1192     {
1193         ca_thread_pool_free(g_threadPoolHandle);
1194         g_threadPoolHandle = NULL;
1195     }
1196
1197 #ifdef WITH_BWT
1198     CATerminateBlockWiseTransfer();
1199 #endif
1200     CARetransmissionDestroy(&g_retransmissionContext);
1201     CAQueueingThreadDestroy(&g_sendThread);
1202     CAQueueingThreadDestroy(&g_receiveThread);
1203
1204     // terminate interface adapters by controller
1205     CATerminateAdapters();
1206 #else
1207     // terminate interface adapters by controller
1208     CATerminateAdapters();
1209
1210     // stop retransmission
1211     CARetransmissionStop(&g_retransmissionContext);
1212     CARetransmissionDestroy(&g_retransmissionContext);
1213 #endif
1214
1215     OIC_LOG(DEBUG, TAG, "OUT");
1216 }
1217
1218 void CALogPDUInfo(coap_pdu_t *pdu, const CAEndpoint_t *endpoint)
1219 {
1220     VERIFY_NON_NULL_VOID(pdu, TAG, "pdu");
1221
1222     OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
1223
1224 #ifdef TCP_ADAPTER
1225     if (CA_ADAPTER_TCP == endpoint->adapter)
1226     {
1227         OIC_LOG(DEBUG, TAG, "pdu header data :");
1228         OIC_LOG_BUFFER(DEBUG, TAG,  pdu->hdr, pdu->length);
1229     }
1230     else
1231 #endif
1232     {
1233         OIC_LOG_V(DEBUG, TAG, "PDU Maker - type : %d", pdu->hdr->coap_hdr_udp_t.type);
1234
1235         OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->coap_hdr_udp_t.code);
1236
1237         OIC_LOG(DEBUG, TAG, "PDU Maker - token :");
1238
1239         OIC_LOG_BUFFER(DEBUG, TAG, pdu->hdr->coap_hdr_udp_t.token,
1240                        pdu->hdr->coap_hdr_udp_t.token_length);
1241     }
1242 }
1243
1244 static void CALogPayloadInfo(CAInfo_t *info)
1245 {
1246     if(info)
1247     {
1248         if (info->options)
1249         {
1250             for (uint32_t i = 0; i < info->numOptions; i++)
1251             {
1252                 OIC_LOG_V(DEBUG, TAG, "optionID: %d", info->options[i].optionID);
1253
1254                 OIC_LOG_V(DEBUG, TAG, "list: %s", info->options[i].optionData);
1255             }
1256         }
1257
1258         if (info->payload)
1259         {
1260             OIC_LOG_V(DEBUG, TAG, "payload: %p(%u)", info->payload,
1261                       info->payloadSize);
1262         }
1263
1264         if (info->token)
1265         {
1266             OIC_LOG(DEBUG, TAG, "token:");
1267             OIC_LOG_BUFFER(DEBUG, TAG, (const uint8_t *) info->token,
1268                            info->tokenLength);
1269         }
1270         OIC_LOG_V(DEBUG, TAG, "msgID: %d", info->messageId);
1271     }
1272     else
1273     {
1274         OIC_LOG(DEBUG, TAG, "info is NULL, cannot output log data");
1275     }
1276 }
1277
1278 void CAErrorHandler(const CAEndpoint_t *endpoint,
1279                     const void *data, uint32_t dataLen,
1280                     CAResult_t result)
1281 {
1282     OIC_LOG(DEBUG, TAG, "CAErrorHandler IN");
1283
1284 #ifndef SINGLE_THREAD
1285
1286     VERIFY_NON_NULL_VOID(endpoint, TAG, "remoteEndpoint");
1287     VERIFY_NON_NULL_VOID(data, TAG, "data");
1288
1289     uint32_t code = CA_NOT_FOUND;
1290     //Do not free remoteEndpoint and data. Currently they will be freed in data thread
1291     //Get PDU data
1292     coap_pdu_t *pdu = (coap_pdu_t *)CAParsePDU((const char *)data, dataLen, &code, endpoint);
1293     if (NULL == pdu)
1294     {
1295         OIC_LOG(ERROR, TAG, "Parse PDU failed");
1296         return;
1297     }
1298
1299     CAData_t *cadata = CAGenerateHandlerData(endpoint, NULL, pdu, CA_ERROR_DATA);
1300     if(!cadata)
1301     {
1302         OIC_LOG(ERROR, TAG, "CAErrorHandler, CAGenerateHandlerData failed!");
1303         coap_delete_pdu(pdu);
1304         return;
1305     }
1306
1307     cadata->errorInfo->result = result;
1308
1309     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
1310     coap_delete_pdu(pdu);
1311 #endif
1312
1313     OIC_LOG(DEBUG, TAG, "CAErrorHandler OUT");
1314     return;
1315 }
1316
1317 static void CASendErrorInfo(const CAEndpoint_t *endpoint, const CAInfo_t *info, CAResult_t result)
1318 {
1319     OIC_LOG(DEBUG, TAG, "CASendErrorInfo IN");
1320 #ifndef SINGLE_THREAD
1321     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
1322     if (!cadata)
1323     {
1324         OIC_LOG(ERROR, TAG, "memory allocation failed");
1325         return;
1326     }
1327
1328     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
1329     if (!ep)
1330     {
1331         OIC_LOG(ERROR, TAG, "endpoint clone failed");
1332         OICFree(cadata);
1333         return;
1334     }
1335
1336     CAErrorInfo_t *errorInfo = (CAErrorInfo_t *)OICCalloc(1, sizeof (CAErrorInfo_t));
1337     if (!errorInfo)
1338     {
1339         OICFree(cadata);
1340         CAFreeEndpoint(ep);
1341         return;
1342     }
1343
1344     CAResult_t res = CACloneInfo(info, &errorInfo->info);
1345     if (CA_STATUS_OK != res)
1346     {
1347         OICFree(cadata);
1348         CAFreeEndpoint(ep);
1349         return;
1350     }
1351
1352     errorInfo->result = result;
1353     cadata->remoteEndpoint = ep;
1354     cadata->errorInfo = errorInfo;
1355     cadata->dataType = CA_ERROR_DATA;
1356
1357     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
1358 #endif
1359     OIC_LOG(DEBUG, TAG, "CASendErrorInfo OUT");
1360 }