Merge branch 'master' into notification-service
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / camessagehandler.c
1 /* *****************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <stdint.h>
25
26 #include "cainterface.h"
27 #include "camessagehandler.h"
28 #include "caremotehandler.h"
29 #include "caprotocolmessage.h"
30 #include "logger.h"
31 #include "config.h" /* for coap protocol */
32 #include "oic_malloc.h"
33 #include "canetworkconfigurator.h"
34 #include "caadapterutils.h"
35 #include "cainterfacecontroller.h"
36 #include "caretransmission.h"
37 #include "oic_string.h"
38
39 #ifdef WITH_BWT
40 #include "cablockwisetransfer.h"
41 #endif
42
43 #ifndef  SINGLE_THREAD
44 #include "uqueue.h"
45 #include "cathreadpool.h" /* for thread pool */
46 #include "caqueueingthread.h"
47
// When defined, CAReceiveThreadProcess() ignores its argument and queued messages are
// delivered to the registered callbacks from CAHandleRequestResponseCallbacks() instead.
#define SINGLE_HANDLE
// Value passed to ca_thread_pool_init() when the message handler creates its pool.
#define MAX_THREAD_POOL_SIZE    20

// thread pool handle (NULL until ca_thread_pool_init() fills it in)
static ca_thread_pool_t g_threadPoolHandle = NULL;

// message handler main thread
// g_sendThread    : queue of outgoing CAData_t, set up with CASendThreadProcess as its task.
// g_receiveThread : queue of incoming CAData_t, read by CAHandleRequestResponseCallbacks().
static CAQueueingThread_t g_sendThread;
static CAQueueingThread_t g_receiveThread;
57
58 #else
59 #define CA_MAX_RT_ARRAY_SIZE    3
60 #endif  // SINGLE_THREAD
61
// Tag passed to every OIC_LOG / OIC_LOG_V call in this file.
#define TAG "OIC_CA_MSG_HANDLE"

// Context handed to CARetransmissionSentData() after a unicast send and to
// CARetransmissionReceivedData() when a response arrives.
static CARetransmission_t g_retransmissionContext;

// handler field
// The first three are installed by CASetInterfaceCallbacks(); each stays NULL (and is
// skipped at delivery time) until the upper layer registers it.
static CARequestCallback g_requestHandler = NULL;
static CAResponseCallback g_responseHandler = NULL;
static CAErrorCallback g_errorHandler = NULL;
// Installed by CASetNetworkMonitorCallback().
static CANetworkMonitorCallback g_nwMonitorHandler = NULL;
71
// Forward declarations of file-local helpers used before their definitions.

// Registered through CASetErrorHandleCallback() and also called directly when a send fails.
static void CAErrorHandler(const CAEndpoint_t *endpoint,
                           const void *data, uint32_t dataLen,
                           CAResult_t result);

// Parses a received PDU into a newly allocated CAData_t; returns NULL on failure.
static CAData_t* CAGenerateHandlerData(const CAEndpoint_t *endpoint,
                                       const CARemoteId_t *identity,
                                       const void *data, CADataType_t dataType);

// Called when a PDU could not be generated for an outgoing message.
static void CASendErrorInfo(const CAEndpoint_t *endpoint, const CAInfo_t *info,
                            CAResult_t result);

#ifdef SINGLE_THREAD
// Invokes the registered callback for one received message, then destroys it.
static void CAProcessReceivedData(CAData_t *data);
#endif
// Releases a CAData_t together with the endpoint and info structures it owns.
static void CADestroyData(void *data, uint32_t size);
// Called right after a PDU has been parsed into a CAInfo_t.
static void CALogPayloadInfo(CAInfo_t *info);
// Returns true when the message should be dropped (see the definition for the criteria).
static bool CADropSecondMessage(CAHistory_t *history, const CAEndpoint_t *endpoint, uint16_t id,
                                CAToken_t token, uint8_t tokenLength);
90
91 #ifdef WITH_BWT
92 void CAAddDataToSendThread(CAData_t *data)
93 {
94     VERIFY_NON_NULL_VOID(data, TAG, "data");
95
96     // add thread
97     CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
98 }
99
100 void CAAddDataToReceiveThread(CAData_t *data)
101 {
102     VERIFY_NON_NULL_VOID(data, TAG, "data");
103
104     // add thread
105     CAQueueingThreadAddData(&g_receiveThread, data, sizeof(CAData_t));
106 }
107 #endif
108
109 static bool CAIsSelectedNetworkAvailable()
110 {
111     u_arraylist_t *list = CAGetSelectedNetworkList();
112     if (!list || u_arraylist_length(list) == 0)
113     {
114         OIC_LOG(ERROR, TAG, "No selected network");
115         return false;
116     }
117
118     return true;
119 }
120
121 static CAData_t* CAGenerateHandlerData(const CAEndpoint_t *endpoint,
122                                        const CARemoteId_t *identity,
123                                        const void *data, CADataType_t dataType)
124 {
125     OIC_LOG(DEBUG, TAG, "CAGenerateHandlerData IN");
126     CAInfo_t *info = NULL;
127     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
128     if (!cadata)
129     {
130         OIC_LOG(ERROR, TAG, "memory allocation failed");
131         return NULL;
132     }
133
134     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
135     if (!ep)
136     {
137         OIC_LOG(ERROR, TAG, "endpoint clone failed");
138         goto exit;
139     }
140
141     OIC_LOG_V(DEBUG, TAG, "address : %s", ep->addr);
142
143     if (CA_RESPONSE_DATA == dataType)
144     {
145         CAResponseInfo_t* resInfo = (CAResponseInfo_t*)OICCalloc(1, sizeof(CAResponseInfo_t));
146         if (!resInfo)
147         {
148             OIC_LOG(ERROR, TAG, "memory allocation failed");
149             goto exit;
150         }
151
152         CAResult_t result = CAGetResponseInfoFromPDU(data, resInfo, endpoint);
153         if (CA_STATUS_OK != result)
154         {
155             OIC_LOG(ERROR, TAG, "CAGetResponseInfoFromPDU Failed");
156             CADestroyResponseInfoInternal(resInfo);
157             goto exit;
158         }
159         cadata->responseInfo = resInfo;
160         info = &resInfo->info;
161         if (identity)
162         {
163             info->identity = *identity;
164         }
165         OIC_LOG(DEBUG, TAG, "Response Info :");
166         CALogPayloadInfo(info);
167     }
168     else if (CA_REQUEST_DATA == dataType)
169     {
170         CARequestInfo_t* reqInfo = (CARequestInfo_t*)OICCalloc(1, sizeof(CARequestInfo_t));
171         if (!reqInfo)
172         {
173             OIC_LOG(ERROR, TAG, "memory allocation failed");
174             goto exit;
175         }
176
177         CAResult_t result = CAGetRequestInfoFromPDU(data, endpoint, reqInfo);
178         if (CA_STATUS_OK != result)
179         {
180             OIC_LOG(ERROR, TAG, "CAGetRequestInfoFromPDU failed");
181             CADestroyRequestInfoInternal(reqInfo);
182             goto exit;
183         }
184
185         if (CADropSecondMessage(&caglobals.ca.requestHistory, endpoint, reqInfo->info.messageId,
186                                 reqInfo->info.token, reqInfo->info.tokenLength))
187         {
188             OIC_LOG(ERROR, TAG, "Second Request with same Token, Drop it");
189             CADestroyRequestInfoInternal(reqInfo);
190             goto exit;
191         }
192
193         cadata->requestInfo = reqInfo;
194         info = &reqInfo->info;
195         if (identity)
196         {
197             info->identity = *identity;
198         }
199         OIC_LOG(DEBUG, TAG, "Request Info :");
200         CALogPayloadInfo(info);
201    }
202     else if (CA_ERROR_DATA == dataType)
203     {
204         CAErrorInfo_t *errorInfo = (CAErrorInfo_t *)OICCalloc(1, sizeof (CAErrorInfo_t));
205         if (!errorInfo)
206         {
207             OIC_LOG(ERROR, TAG, "Memory allocation failed!");
208             goto exit;
209         }
210
211         CAResult_t result = CAGetErrorInfoFromPDU(data, endpoint, errorInfo);
212         if (CA_STATUS_OK != result)
213         {
214             OIC_LOG(ERROR, TAG, "CAGetErrorInfoFromPDU failed");
215             OICFree(errorInfo);
216             goto exit;
217         }
218
219         cadata->errorInfo = errorInfo;
220         info = &errorInfo->info;
221         if (identity)
222         {
223             info->identity = *identity;
224         }
225         OIC_LOG(DEBUG, TAG, "error Info :");
226         CALogPayloadInfo(info);
227     }
228
229     cadata->remoteEndpoint = ep;
230     cadata->dataType = dataType;
231
232     OIC_LOG(DEBUG, TAG, "CAGenerateHandlerData OUT");
233     return cadata;
234
235 exit:
236     OICFree(cadata);
237     CAFreeEndpoint(ep);
238     return NULL;
239 }
240
241 static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uint32_t size)
242 {
243     VERIFY_NON_NULL_VOID(endpoint, TAG, "endpoint");
244     VERIFY_NON_NULL_VOID(pdu, TAG, "pdu");
245
246     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
247     if (!ep)
248     {
249         OIC_LOG(ERROR, TAG, "clone failed");
250         return;
251     }
252
253     CAResponseInfo_t* resInfo = (CAResponseInfo_t*)OICCalloc(1, sizeof(CAResponseInfo_t));
254
255     if (!resInfo)
256     {
257         OIC_LOG(ERROR, TAG, "calloc failed");
258         CAFreeEndpoint(ep);
259         return;
260     }
261
262     resInfo->result = CA_RETRANSMIT_TIMEOUT;
263     resInfo->info.type = CAGetMessageTypeFromPduBinaryData(pdu, size);
264     resInfo->info.messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
265
266     CAResult_t res = CAGetTokenFromPDU((const coap_hdr_t *) pdu, &(resInfo->info),
267                                        endpoint);
268     if (CA_STATUS_OK != res)
269     {
270         OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list");
271         CADestroyResponseInfoInternal(resInfo);
272         CAFreeEndpoint(ep);
273         return;
274     }
275
276     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
277     if (NULL == cadata)
278     {
279         OIC_LOG(ERROR, TAG, "memory allocation failed !");
280         CAFreeEndpoint(ep);
281         CADestroyResponseInfoInternal(resInfo);
282         return;
283     }
284
285     cadata->type = SEND_TYPE_UNICAST;
286     cadata->remoteEndpoint = ep;
287     cadata->requestInfo = NULL;
288     cadata->responseInfo = resInfo;
289
290 #ifdef WITH_BWT
291     if (CAIsSupportedBlockwiseTransfer(endpoint->adapter))
292     {
293         res = CARemoveBlockDataFromListWithSeed(resInfo->info.token, resInfo->info.tokenLength,
294                                                 endpoint->port);
295         if (CA_STATUS_OK != res)
296         {
297             OIC_LOG(ERROR, TAG, "CARemoveBlockDataFromListWithSeed failed");
298         }
299     }
300 #endif // WITH_BWT
301
302 #ifdef SINGLE_THREAD
303     CAProcessReceivedData(cadata);
304 #else
305     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
306 #endif
307 }
308
309 static void CADestroyData(void *data, uint32_t size)
310 {
311     OIC_LOG(DEBUG, TAG, "CADestroyData IN");
312     if ((size_t)size < sizeof(CAData_t))
313     {
314         OIC_LOG_V(ERROR, TAG, "Destroy data too small %p %d", data, size);
315     }
316     CAData_t *cadata = (CAData_t *) data;
317
318     if (NULL == cadata)
319     {
320         OIC_LOG(ERROR, TAG, "cadata is NULL");
321         return;
322     }
323
324     if (NULL != cadata->remoteEndpoint)
325     {
326         CAFreeEndpoint(cadata->remoteEndpoint);
327     }
328
329     if (NULL != cadata->requestInfo)
330     {
331         CADestroyRequestInfoInternal((CARequestInfo_t *) cadata->requestInfo);
332     }
333
334     if (NULL != cadata->responseInfo)
335     {
336         CADestroyResponseInfoInternal((CAResponseInfo_t *) cadata->responseInfo);
337     }
338
339     if (NULL != cadata->errorInfo)
340     {
341         CADestroyErrorInfoInternal(cadata->errorInfo);
342     }
343
344     OICFree(cadata);
345     OIC_LOG(DEBUG, TAG, "CADestroyData OUT");
346 }
347
348 #ifdef SINGLE_THREAD
349 static void CAProcessReceivedData(CAData_t *data)
350 {
351     OIC_LOG(DEBUG, TAG, "CAProcessReceivedData IN");
352     if (!data)
353     {
354         OIC_LOG(ERROR, TAG, "thread data error!!");
355         return;
356     }
357
358     // parse the data and call the callbacks.
359     // #1 parse the data
360     // #2 get endpoint
361     CAEndpoint_t *rep = (CAEndpoint_t *)(data->remoteEndpoint);
362     if (!rep)
363     {
364         OIC_LOG(ERROR, TAG, "remoteEndpoint error!!");
365         return;
366     }
367
368     if (data->requestInfo && g_requestHandler)
369     {
370         g_requestHandler(rep, data->requestInfo);
371     }
372     else if (data->responseInfo && g_responseHandler)
373     {
374         g_responseHandler(rep, data->responseInfo);
375     }
376     else if (data->errorInfo && g_errorHandler)
377     {
378         g_errorHandler(rep, data->errorInfo);
379     }
380
381     CADestroyData(data, sizeof(CAData_t));
382
383     OIC_LOG(DEBUG, TAG, "CAProcessReceivedData OUT");
384 }
385 #endif
386
387 #ifndef SINGLE_THREAD
388 static void CAReceiveThreadProcess(void *threadData)
389 {
390 #ifndef SINGLE_HANDLE
391     CAData_t *data = (CAData_t *) threadData;
392     CAProcessReceivedData(data);
393 #else
394     (void)threadData;
395 #endif
396 }
397 #endif // SINGLE_THREAD
398
399 static CAResult_t CAProcessMulticastData(const CAData_t *data)
400 {
401     VERIFY_NON_NULL(data, TAG, "data");
402     VERIFY_NON_NULL(data->remoteEndpoint, TAG, "remoteEndpoint");
403
404     coap_pdu_t *pdu = NULL;
405     CAInfo_t *info = NULL;
406     coap_list_t *options = NULL;
407     coap_transport_type transport = coap_udp;
408     CAResult_t res = CA_SEND_FAILED;
409     if (NULL != data->requestInfo)
410     {
411         OIC_LOG(DEBUG, TAG, "requestInfo is available..");
412
413         info = &data->requestInfo->info;
414         pdu = CAGeneratePDU(CA_GET, info, data->remoteEndpoint, &options, &transport);
415
416         if (NULL != pdu)
417         {
418 #ifdef WITH_BWT
419             if (CAIsSupportedBlockwiseTransfer(data->remoteEndpoint->adapter))
420             {
421                 // Blockwise transfer
422                 res = CAAddBlockOption(&pdu, info, data->remoteEndpoint, &options);
423                 if (CA_STATUS_OK != res)
424                 {
425                     OIC_LOG(DEBUG, TAG, "CAAddBlockOption has failed");
426                     goto exit;
427                 }
428             }
429 #endif // WITH_BWT
430         }
431         else
432         {
433             OIC_LOG(ERROR,TAG,"Failed to generate multicast PDU");
434             CASendErrorInfo(data->remoteEndpoint, info, CA_SEND_FAILED);
435             return res;
436         }
437     }
438     else
439     {
440         OIC_LOG(ERROR, TAG, "not supported message type for multicast.");
441         return res;
442     }
443
444     CALogPDUInfo(pdu, data->remoteEndpoint);
445
446     OIC_LOG(DEBUG, TAG, "pdu to send :");
447     OIC_LOG_BUFFER(DEBUG, TAG,  (uint8_t*)pdu->hdr, pdu->length);
448
449     res = CASendMulticastData(data->remoteEndpoint, pdu->hdr, pdu->length);
450     if (CA_STATUS_OK != res)
451     {
452         OIC_LOG_V(ERROR, TAG, "send failed:%d", res);
453         goto exit;
454     }
455
456     coap_delete_list(options);
457     coap_delete_pdu(pdu);
458     return res;
459
460 exit:
461     CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res);
462     coap_delete_list(options);
463     coap_delete_pdu(pdu);
464     return res;
465 }
466
467 static CAResult_t CAProcessSendData(const CAData_t *data)
468 {
469     VERIFY_NON_NULL(data, TAG, "data");
470     VERIFY_NON_NULL(data->remoteEndpoint, TAG, "remoteEndpoint");
471
472     CAResult_t res = CA_STATUS_FAILED;
473
474     CASendDataType_t type = data->type;
475
476     coap_pdu_t *pdu = NULL;
477     CAInfo_t *info = NULL;
478     coap_list_t *options = NULL;
479     coap_transport_type transport = coap_udp;
480
481     if (SEND_TYPE_UNICAST == type)
482     {
483         OIC_LOG(DEBUG,TAG,"Unicast message");
484
485 #ifdef ROUTING_GATEWAY
486         /*
487          * When forwarding a packet, do not attempt retransmission as its the responsibility of
488          * packet originator node
489          */
490         bool skipRetransmission = false;
491 #endif
492
493         if (NULL != data->requestInfo)
494         {
495             OIC_LOG(DEBUG, TAG, "requestInfo is available..");
496
497             info = &data->requestInfo->info;
498 #ifdef ROUTING_GATEWAY
499             skipRetransmission = data->requestInfo->info.skipRetransmission;
500 #endif
501             pdu = CAGeneratePDU(data->requestInfo->method, info, data->remoteEndpoint,
502                                 &options, &transport);
503         }
504         else if (NULL != data->responseInfo)
505         {
506             OIC_LOG(DEBUG, TAG, "responseInfo is available..");
507
508             info = &data->responseInfo->info;
509 #ifdef ROUTING_GATEWAY
510             skipRetransmission = data->responseInfo->info.skipRetransmission;
511 #endif
512             pdu = CAGeneratePDU(data->responseInfo->result, info, data->remoteEndpoint,
513                                 &options, &transport);
514         }
515         else
516         {
517             OIC_LOG(DEBUG, TAG, "request info, response info is empty");
518             return CA_STATUS_INVALID_PARAM;
519         }
520
521         // interface controller function call.
522         if (NULL != pdu)
523         {
524 #ifdef WITH_BWT
525             if (CAIsSupportedBlockwiseTransfer(data->remoteEndpoint->adapter))
526             {
527                 // Blockwise transfer
528                 if (NULL != info)
529                 {
530                     CAResult_t res = CAAddBlockOption(&pdu, info,
531                                                       data->remoteEndpoint,
532                                                       &options);
533                     if (CA_STATUS_OK != res)
534                     {
535                         OIC_LOG(INFO, TAG, "to write block option has failed");
536                         CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res);
537                         coap_delete_list(options);
538                         coap_delete_pdu(pdu);
539                         return res;
540                     }
541                 }
542             }
543 #endif // WITH_BWT
544             CALogPDUInfo(pdu, data->remoteEndpoint);
545
546             res = CASendUnicastData(data->remoteEndpoint, pdu->hdr, pdu->length);
547             if (CA_STATUS_OK != res)
548             {
549                 OIC_LOG_V(ERROR, TAG, "send failed:%d", res);
550                 CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res);
551                 coap_delete_list(options);
552                 coap_delete_pdu(pdu);
553                 return res;
554             }
555
556 #ifdef WITH_TCP
557             if (CAIsSupportedCoAPOverTCP(data->remoteEndpoint->adapter))
558             {
559                 OIC_LOG(INFO, TAG, "retransmission will be not worked");
560             }
561             else
562 #endif
563 #ifdef ROUTING_GATEWAY
564             if(!skipRetransmission)
565 #endif
566             {
567                 // for retransmission
568                 res = CARetransmissionSentData(&g_retransmissionContext, data->remoteEndpoint,
569                                                pdu->hdr, pdu->length);
570                 if ((CA_STATUS_OK != res) && (CA_NOT_SUPPORTED != res))
571                 {
572                     //when retransmission not supported this will return CA_NOT_SUPPORTED, ignore
573                     OIC_LOG_V(INFO, TAG, "retransmission is not enabled due to error, res : %d", res);
574                     coap_delete_list(options);
575                     coap_delete_pdu(pdu);
576                     return res;
577                 }
578             }
579
580             coap_delete_list(options);
581             coap_delete_pdu(pdu);
582         }
583         else
584         {
585             OIC_LOG(ERROR,TAG,"Failed to generate unicast PDU");
586             CASendErrorInfo(data->remoteEndpoint, info, CA_SEND_FAILED);
587             return CA_SEND_FAILED;
588         }
589     }
590     else if (SEND_TYPE_MULTICAST == type)
591     {
592         OIC_LOG(DEBUG,TAG,"Multicast message");
593 #ifdef WITH_TCP
594         /*
595          * If CoAP over TCP is enabled, the CoAP pdu wont be same for IP and other adapters.
596          * That's why we need to generate two pdu's, one for IP and second for other transports.
597          * Two possible cases we might have to split: a) when adapter is CA_DEFAULT_ADAPTER
598          * b) when one of the adapter is IP adapter(ex: CA_ADAPTER_IP | CA_ADAPTER_GATT_BTLE)
599          */
600         if (data->remoteEndpoint->adapter == CA_DEFAULT_ADAPTER ||
601                 (CA_ADAPTER_IP & data->remoteEndpoint->adapter &&
602                     CA_ADAPTER_IP != data->remoteEndpoint->adapter))
603         {
604             if (data->remoteEndpoint->adapter == CA_DEFAULT_ADAPTER)
605             {
606                 data->remoteEndpoint->adapter = CA_ALL_ADAPTERS ^ CA_ADAPTER_IP;
607             }
608             else
609             {
610                 data->remoteEndpoint->adapter = data->remoteEndpoint->adapter ^ CA_ADAPTER_IP;
611             }
612             CAProcessMulticastData(data);
613             data->remoteEndpoint->adapter = CA_ADAPTER_IP;
614             CAProcessMulticastData(data);
615         }
616         else
617         {
618             CAProcessMulticastData(data);
619         }
620 #else
621         CAProcessMulticastData(data);
622 #endif
623     }
624
625     return CA_STATUS_OK;
626 }
627
628 #ifndef SINGLE_THREAD
629 static void CASendThreadProcess(void *threadData)
630 {
631     CAData_t *data = (CAData_t *) threadData;
632     CAProcessSendData(data);
633 }
634 #endif
635
636 /*
637  * If a second message arrives with the same message ID, token and the other address
638  * family, drop it.  Typically, IPv6 beats IPv4, so the IPv4 message is dropped.
639  */
640 static bool CADropSecondMessage(CAHistory_t *history, const CAEndpoint_t *ep, uint16_t id,
641                                 CAToken_t token, uint8_t tokenLength)
642 {
643     if (!ep)
644     {
645         return true;
646     }
647     if (ep->adapter != CA_ADAPTER_IP)
648     {
649         return false;
650     }
651     if (!caglobals.ip.dualstack)
652     {
653         return false;
654     }
655
656     if (tokenLength > CA_MAX_TOKEN_LEN)
657     {
658         /*
659          * If token length is more than CA_MAX_TOKEN_LEN,
660          * we compare the first CA_MAX_TOKEN_LEN bytes only.
661          */
662         tokenLength = CA_MAX_TOKEN_LEN;
663     }
664
665     bool ret = false;
666     CATransportFlags_t familyFlags = ep->flags & CA_IPFAMILY_MASK;
667
668     for (size_t i = 0; i < sizeof(history->items) / sizeof(history->items[0]); i++)
669     {
670         CAHistoryItem_t *item = &(history->items[i]);
671         if (id == item->messageId && tokenLength == item->tokenLength
672             && memcmp(item->token, token, tokenLength) == 0)
673         {
674             if ((familyFlags ^ item->flags) == CA_IPFAMILY_MASK)
675             {
676                 OIC_LOG_V(INFO, TAG, "IPv%c duplicate message ignored",
677                           familyFlags & CA_IPV6 ? '6' : '4');
678                 ret = true;
679                 break;
680             }
681         }
682     }
683
684     history->items[history->nextIndex].flags = familyFlags;
685     history->items[history->nextIndex].messageId = id;
686     if (token && tokenLength)
687     {
688         memcpy(history->items[history->nextIndex].token, token, tokenLength);
689         history->items[history->nextIndex].tokenLength = tokenLength;
690     }
691
692     if (++history->nextIndex >= HISTORYSIZE)
693     {
694         history->nextIndex = 0;
695     }
696
697     return ret;
698 }
699
700 static void CAReceivedPacketCallback(const CASecureEndpoint_t *sep,
701                                      const void *data, uint32_t dataLen)
702 {
703     VERIFY_NON_NULL_VOID(sep, TAG, "remoteEndpoint");
704     VERIFY_NON_NULL_VOID(data, TAG, "data");
705
706     OIC_LOG(DEBUG, TAG, "received pdu data :");
707     OIC_LOG_BUFFER(DEBUG, TAG,  data, dataLen);
708
709     uint32_t code = CA_NOT_FOUND;
710     CAData_t *cadata = NULL;
711
712     coap_pdu_t *pdu = (coap_pdu_t *) CAParsePDU((const char *) data, dataLen, &code,
713                                                 &(sep->endpoint));
714     if (NULL == pdu)
715     {
716         OIC_LOG(ERROR, TAG, "Parse PDU failed");
717         return;
718     }
719
720     OIC_LOG_V(DEBUG, TAG, "code = %d", code);
721     if (CA_GET == code || CA_POST == code || CA_PUT == code || CA_DELETE == code)
722     {
723         cadata = CAGenerateHandlerData(&(sep->endpoint), &(sep->identity), pdu, CA_REQUEST_DATA);
724         if (!cadata)
725         {
726             OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, CAGenerateHandlerData failed!");
727             coap_delete_pdu(pdu);
728             return;
729         }
730     }
731     else
732     {
733         cadata = CAGenerateHandlerData(&(sep->endpoint), &(sep->identity), pdu, CA_RESPONSE_DATA);
734         if (!cadata)
735         {
736             OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, CAGenerateHandlerData failed!");
737             coap_delete_pdu(pdu);
738             return;
739         }
740
741 #ifdef WITH_TCP
742         if (CAIsSupportedCoAPOverTCP(sep->endpoint.adapter))
743         {
744             OIC_LOG(INFO, TAG, "retransmission is not supported");
745         }
746         else
747 #endif
748         {
749             // for retransmission
750             void *retransmissionPdu = NULL;
751             CARetransmissionReceivedData(&g_retransmissionContext, cadata->remoteEndpoint, pdu->hdr,
752                                          pdu->length, &retransmissionPdu);
753
754             // get token from saved data in retransmission list
755             if (retransmissionPdu && CA_EMPTY == code)
756             {
757                 if (cadata->responseInfo)
758                 {
759                     CAInfo_t *info = &cadata->responseInfo->info;
760                     CAResult_t res = CAGetTokenFromPDU((const coap_hdr_t *)retransmissionPdu,
761                                                        info, &(sep->endpoint));
762                     if (CA_STATUS_OK != res)
763                     {
764                         OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list");
765                         OICFree(info->token);
766                         info->tokenLength = 0;
767                     }
768                 }
769             }
770             OICFree(retransmissionPdu);
771         }
772     }
773
774     cadata->type = SEND_TYPE_UNICAST;
775
776 #ifdef SINGLE_THREAD
777     CAProcessReceivedData(cadata);
778 #else
779 #ifdef WITH_BWT
780     if (CAIsSupportedBlockwiseTransfer(sep->endpoint.adapter))
781     {
782         CAResult_t res = CAReceiveBlockWiseData(pdu, &(sep->endpoint), cadata, dataLen);
783         if (CA_NOT_SUPPORTED == res || CA_REQUEST_TIMEOUT == res)
784         {
785             OIC_LOG(DEBUG, TAG, "this message does not have block option");
786             CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
787         }
788         else
789         {
790             CADestroyData(cadata, sizeof(CAData_t));
791         }
792     }
793     else
794 #endif
795     {
796         CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
797     }
798 #endif // SINGLE_THREAD
799
800     coap_delete_pdu(pdu);
801 }
802
/**
 * Delivers pending received messages to the registered callbacks.
 *
 * SINGLE_THREAD: reads data from the adapters and runs the retransmission routine.
 * Otherwise, with SINGLE_HANDLE: takes one element from the receive queue (under the
 * queue mutex), invokes the request, response or error callback that matches it, and
 * releases both the message and the queue element.
 *
 * A queue element that carries no message is now freed before returning; previously
 * it was leaked.
 */
void CAHandleRequestResponseCallbacks()
{
#ifdef SINGLE_THREAD
    CAReadData();
    CARetransmissionBaseRoutine((void *)&g_retransmissionContext);
#else
#ifdef SINGLE_HANDLE
    // parse the data and call the callbacks.
    // #1 parse the data
    // #2 get endpoint

    ca_mutex_lock(g_receiveThread.threadMutex);

    u_queue_message_t *item = u_queue_get_element(g_receiveThread.dataQueue);

    ca_mutex_unlock(g_receiveThread.threadMutex);

    if (NULL == item)
    {
        return;
    }

    if (NULL == item->msg)
    {
        // the element is owned by this function once taken from the queue
        OICFree(item);
        return;
    }

    // get endpoint
    CAData_t *td = (CAData_t *) item->msg;

    if (td->requestInfo && g_requestHandler)
    {
        OIC_LOG_V(DEBUG, TAG, "request callback : %d", td->requestInfo->info.numOptions);
        g_requestHandler(td->remoteEndpoint, td->requestInfo);
    }
    else if (td->responseInfo && g_responseHandler)
    {
        OIC_LOG_V(DEBUG, TAG, "response callback : %d", td->responseInfo->info.numOptions);
        g_responseHandler(td->remoteEndpoint, td->responseInfo);
    }
    else if (td->errorInfo && g_errorHandler)
    {
        OIC_LOG_V(DEBUG, TAG, "error callback error: %d", td->errorInfo->result);
        g_errorHandler(td->remoteEndpoint, td->errorInfo);
    }

    CADestroyData(item->msg, sizeof(CAData_t));
    OICFree(item);

#endif // SINGLE_HANDLE
#endif // SINGLE_THREAD
}
850
851 static CAData_t* CAPrepareSendData(const CAEndpoint_t *endpoint, const void *sendData,
852                                    CADataType_t dataType)
853 {
854     OIC_LOG(DEBUG, TAG, "CAPrepareSendData IN");
855
856     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
857     if (!cadata)
858     {
859         OIC_LOG(ERROR, TAG, "memory allocation failed");
860         return NULL;
861     }
862
863     if (CA_REQUEST_DATA == dataType)
864     {
865         // clone request info
866         CARequestInfo_t *request = CACloneRequestInfo((CARequestInfo_t *)sendData);
867
868         if (!request)
869         {
870             OIC_LOG(ERROR, TAG, "CACloneRequestInfo failed");
871             goto exit;
872         }
873
874         cadata->type = request->isMulticast ? SEND_TYPE_MULTICAST : SEND_TYPE_UNICAST;
875         cadata->requestInfo =  request;
876     }
877     else if (CA_RESPONSE_DATA == dataType)
878     {
879         // clone response info
880         CAResponseInfo_t *response = CACloneResponseInfo((CAResponseInfo_t *)sendData);
881
882         if(!response)
883         {
884             OIC_LOG(ERROR, TAG, "CACloneResponseInfo failed");
885             goto exit;
886         }
887
888         cadata->type = response->isMulticast ? SEND_TYPE_MULTICAST : SEND_TYPE_UNICAST;
889         cadata->responseInfo = response;
890     }
891     else
892     {
893         OIC_LOG(ERROR, TAG, "CAPrepareSendData unknown data type");
894         goto exit;
895     }
896
897     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
898     if (!ep)
899     {
900         OIC_LOG(ERROR, TAG, "endpoint clone failed");
901         goto exit;
902     }
903
904     cadata->remoteEndpoint = ep;
905     cadata->dataType = dataType;
906     return cadata;
907
908 exit:
909     CADestroyData(cadata, sizeof(CAData_t));
910     return NULL;
911 }
912
913 CAResult_t CADetachSendMessage(const CAEndpoint_t *endpoint, const void *sendMsg,
914                                CADataType_t dataType)
915 {
916     VERIFY_NON_NULL(endpoint, TAG, "endpoint");
917     VERIFY_NON_NULL(sendMsg, TAG, "sendMsg");
918
919     if (false == CAIsSelectedNetworkAvailable())
920     {
921         return CA_STATUS_FAILED;
922     }
923
924 #ifdef ARDUINO
925     // If max retransmission queue is reached, then don't handle new request
926     if (CA_MAX_RT_ARRAY_SIZE == u_arraylist_length(g_retransmissionContext.dataList))
927     {
928         OIC_LOG(ERROR, TAG, "max RT queue size reached!");
929         return CA_SEND_FAILED;
930     }
931 #endif // ARDUINO
932
933     CAData_t *data = CAPrepareSendData(endpoint, sendMsg, dataType);
934     if(!data)
935     {
936         OIC_LOG(ERROR, TAG, "CAPrepareSendData failed");
937         return CA_MEMORY_ALLOC_FAILED;
938     }
939
940 #ifdef SINGLE_THREAD
941     CAResult_t result = CAProcessSendData(data);
942     if (CA_STATUS_OK != result)
943     {
944         OIC_LOG(ERROR, TAG, "CAProcessSendData failed");
945         CADestroyData(data, sizeof(CAData_t));
946         return result;
947     }
948
949     CADestroyData(data, sizeof(CAData_t));
950 #else
951 #ifdef WITH_BWT
952     if (CAIsSupportedBlockwiseTransfer(endpoint->adapter))
953     {
954         // send block data
955         CAResult_t res = CASendBlockWiseData(data);
956         if (CA_NOT_SUPPORTED == res)
957         {
958             OIC_LOG(DEBUG, TAG, "normal msg will be sent");
959             CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
960             return CA_STATUS_OK;
961         }
962         else
963         {
964             CADestroyData(data, sizeof(CAData_t));
965         }
966         return res;
967     }
968     else
969 #endif // WITH_BWT
970     {
971         CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
972     }
973 #endif // SINGLE_THREAD
974
975     return CA_STATUS_OK;
976 }
977
978 void CASetInterfaceCallbacks(CARequestCallback ReqHandler, CAResponseCallback RespHandler,
979                              CAErrorCallback errorHandler)
980 {
981     g_requestHandler = ReqHandler;
982     g_responseHandler = RespHandler;
983     g_errorHandler = errorHandler;
984 }
985
986 void CASetNetworkMonitorCallback(CANetworkMonitorCallback nwMonitorHandler)
987 {
988     g_nwMonitorHandler = nwMonitorHandler;
989 }
990
991 CAResult_t CAInitializeMessageHandler()
992 {
993     CASetPacketReceivedCallback(CAReceivedPacketCallback);
994     CASetErrorHandleCallback(CAErrorHandler);
995
996 #ifndef SINGLE_THREAD
997     // create thread pool
998     CAResult_t res = ca_thread_pool_init(MAX_THREAD_POOL_SIZE, &g_threadPoolHandle);
999     if (CA_STATUS_OK != res)
1000     {
1001         OIC_LOG(ERROR, TAG, "thread pool initialize error.");
1002         return res;
1003     }
1004
1005     // send thread initialize
1006     res = CAQueueingThreadInitialize(&g_sendThread, g_threadPoolHandle,
1007                                      CASendThreadProcess, CADestroyData);
1008     if (CA_STATUS_OK != res)
1009     {
1010         OIC_LOG(ERROR, TAG, "Failed to Initialize send queue thread");
1011         ca_thread_pool_free(g_threadPoolHandle);
1012         g_threadPoolHandle = NULL;
1013         return res;
1014     }
1015
1016     // start send thread
1017     res = CAQueueingThreadStart(&g_sendThread);
1018     if (CA_STATUS_OK != res)
1019     {
1020         OIC_LOG(ERROR, TAG, "thread start error(send thread).");
1021         ca_thread_pool_free(g_threadPoolHandle);
1022         g_threadPoolHandle = NULL;
1023         CAQueueingThreadDestroy(&g_sendThread);
1024         return res;
1025     }
1026
1027     // receive thread initialize
1028     res = CAQueueingThreadInitialize(&g_receiveThread, g_threadPoolHandle,
1029                                      CAReceiveThreadProcess, CADestroyData);
1030     if (CA_STATUS_OK != res)
1031     {
1032         OIC_LOG(ERROR, TAG, "Failed to Initialize receive queue thread");
1033         ca_thread_pool_free(g_threadPoolHandle);
1034         g_threadPoolHandle = NULL;
1035         CAQueueingThreadDestroy(&g_sendThread);
1036         return res;
1037     }
1038
1039 #ifndef SINGLE_HANDLE // This will be enabled when RI supports multi threading
1040     // start receive thread
1041     res = CAQueueingThreadStart(&g_receiveThread);
1042     if (CA_STATUS_OK != res)
1043     {
1044         OIC_LOG(ERROR, TAG, "thread start error(receive thread).");
1045         ca_thread_pool_free(g_threadPoolHandle);
1046         g_threadPoolHandle = NULL;
1047         CAQueueingThreadDestroy(&g_sendThread);
1048         CAQueueingThreadDestroy(&g_receiveThread);
1049         return res;
1050     }
1051 #endif // SINGLE_HANDLE
1052
1053     // retransmission initialize
1054     res = CARetransmissionInitialize(&g_retransmissionContext, g_threadPoolHandle,
1055                                      CASendUnicastData, CATimeoutCallback, NULL);
1056     if (CA_STATUS_OK != res)
1057     {
1058         OIC_LOG(ERROR, TAG, "Failed to Initialize Retransmission.");
1059         ca_thread_pool_free(g_threadPoolHandle);
1060         g_threadPoolHandle = NULL;
1061         CAQueueingThreadDestroy(&g_sendThread);
1062         CAQueueingThreadDestroy(&g_receiveThread);
1063         return res;
1064     }
1065
1066 #ifdef WITH_BWT
1067     // block-wise transfer initialize
1068     res = CAInitializeBlockWiseTransfer(CAAddDataToSendThread, CAAddDataToReceiveThread);
1069     if (CA_STATUS_OK != res)
1070     {
1071         OIC_LOG(ERROR, TAG, "Failed to Initialize BlockWiseTransfer.");
1072         ca_thread_pool_free(g_threadPoolHandle);
1073         g_threadPoolHandle = NULL;
1074         CAQueueingThreadDestroy(&g_sendThread);
1075         CAQueueingThreadDestroy(&g_receiveThread);
1076         CARetransmissionDestroy(&g_retransmissionContext);
1077         return res;
1078     }
1079 #endif
1080
1081     // start retransmission
1082     res = CARetransmissionStart(&g_retransmissionContext);
1083     if (CA_STATUS_OK != res)
1084     {
1085         OIC_LOG(ERROR, TAG, "thread start error(retransmission thread).");
1086         ca_thread_pool_free(g_threadPoolHandle);
1087         g_threadPoolHandle = NULL;
1088         CAQueueingThreadDestroy(&g_sendThread);
1089         CAQueueingThreadDestroy(&g_receiveThread);
1090         CARetransmissionDestroy(&g_retransmissionContext);
1091         return res;
1092     }
1093
1094     // initialize interface adapters by controller
1095     CAInitializeAdapters(g_threadPoolHandle);
1096 #else
1097     // retransmission initialize
1098     CAResult_t res = CARetransmissionInitialize(&g_retransmissionContext, NULL, CASendUnicastData,
1099                                                 CATimeoutCallback, NULL);
1100     if (CA_STATUS_OK != res)
1101     {
1102         OIC_LOG(ERROR, TAG, "Failed to Initialize Retransmission.");
1103         return res;
1104     }
1105
1106     CAInitializeAdapters();
1107 #endif // SINGLE_THREAD
1108
1109     return CA_STATUS_OK;
1110 }
1111
1112 void CATerminateMessageHandler()
1113 {
1114 #ifndef SINGLE_THREAD
1115     CATransportAdapter_t connType;
1116     u_arraylist_t *list = CAGetSelectedNetworkList();
1117     uint32_t length = u_arraylist_length(list);
1118
1119     uint32_t i = 0;
1120     for (i = 0; i < length; i++)
1121     {
1122         void* ptrType = u_arraylist_get(list, i);
1123
1124         if (NULL == ptrType)
1125         {
1126             continue;
1127         }
1128
1129         connType = *(CATransportAdapter_t *)ptrType;
1130         CAStopAdapter(connType);
1131     }
1132
1133     // stop retransmission
1134     if (NULL != g_retransmissionContext.threadMutex)
1135     {
1136         CARetransmissionStop(&g_retransmissionContext);
1137     }
1138
1139     // stop thread
1140     // delete thread data
1141     if (NULL != g_sendThread.threadMutex)
1142     {
1143         CAQueueingThreadStop(&g_sendThread);
1144     }
1145
1146     // stop thread
1147     // delete thread data
1148     if (NULL != g_receiveThread.threadMutex)
1149     {
1150 #ifndef SINGLE_HANDLE // This will be enabled when RI supports multi threading
1151         CAQueueingThreadStop(&g_receiveThread);
1152 #endif
1153     }
1154
1155     // destroy thread pool
1156     if (NULL != g_threadPoolHandle)
1157     {
1158         ca_thread_pool_free(g_threadPoolHandle);
1159         g_threadPoolHandle = NULL;
1160     }
1161
1162 #ifdef WITH_BWT
1163     CATerminateBlockWiseTransfer();
1164 #endif
1165     CARetransmissionDestroy(&g_retransmissionContext);
1166     CAQueueingThreadDestroy(&g_sendThread);
1167     CAQueueingThreadDestroy(&g_receiveThread);
1168
1169     // terminate interface adapters by controller
1170     CATerminateAdapters();
1171 #else
1172     // terminate interface adapters by controller
1173     CATerminateAdapters();
1174
1175     // stop retransmission
1176     CARetransmissionStop(&g_retransmissionContext);
1177     CARetransmissionDestroy(&g_retransmissionContext);
1178 #endif // SINGLE_THREAD
1179 }
1180
1181 void CALogPDUInfo(coap_pdu_t *pdu, const CAEndpoint_t *endpoint)
1182 {
1183     VERIFY_NON_NULL_VOID(pdu, TAG, "pdu");
1184     VERIFY_NON_NULL_VOID(endpoint, TAG, "endpoint");
1185
1186     OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
1187
1188 #ifdef WITH_TCP
1189     if (CAIsSupportedCoAPOverTCP(endpoint->adapter))
1190     {
1191         OIC_LOG(DEBUG, TAG, "pdu header data :");
1192         OIC_LOG_BUFFER(DEBUG, TAG,  (const uint8_t *) pdu->hdr, pdu->length);
1193     }
1194     else
1195 #endif
1196     {
1197         OIC_LOG_V(DEBUG, TAG, "PDU Maker - type : %d", pdu->hdr->coap_hdr_udp_t.type);
1198
1199         OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->coap_hdr_udp_t.code);
1200
1201         OIC_LOG(DEBUG, TAG, "PDU Maker - token :");
1202
1203         OIC_LOG_BUFFER(DEBUG, TAG, pdu->hdr->coap_hdr_udp_t.token,
1204                        pdu->hdr->coap_hdr_udp_t.token_length);
1205     }
1206 }
1207
1208 static void CALogPayloadInfo(CAInfo_t *info)
1209 {
1210     if (info)
1211     {
1212         if (info->options)
1213         {
1214             for (uint32_t i = 0; i < info->numOptions; i++)
1215             {
1216                 OIC_LOG_V(DEBUG, TAG, "optionID: %u", info->options[i].optionID);
1217
1218                 OIC_LOG_V(DEBUG, TAG, "list: %s", info->options[i].optionData);
1219             }
1220         }
1221
1222         if (info->payload)
1223         {
1224             OIC_LOG_V(DEBUG, TAG, "payload: %p(%zu)", info->payload,
1225                       info->payloadSize);
1226         }
1227
1228         if (info->token)
1229         {
1230             OIC_LOG(DEBUG, TAG, "token:");
1231             OIC_LOG_BUFFER(DEBUG, TAG, (const uint8_t *) info->token,
1232                            info->tokenLength);
1233         }
1234         OIC_LOG_V(DEBUG, TAG, "msgID: %u", info->messageId);
1235     }
1236     else
1237     {
1238         OIC_LOG(DEBUG, TAG, "info is NULL, cannot output log data");
1239     }
1240 }
1241
1242 void CAErrorHandler(const CAEndpoint_t *endpoint,
1243                     const void *data, uint32_t dataLen,
1244                     CAResult_t result)
1245 {
1246     OIC_LOG(DEBUG, TAG, "CAErrorHandler IN");
1247
1248 #ifndef SINGLE_THREAD
1249     VERIFY_NON_NULL_VOID(endpoint, TAG, "remoteEndpoint");
1250     VERIFY_NON_NULL_VOID(data, TAG, "data");
1251
1252     uint32_t code = CA_NOT_FOUND;
1253     //Do not free remoteEndpoint and data. Currently they will be freed in data thread
1254     //Get PDU data
1255     coap_pdu_t *pdu = (coap_pdu_t *)CAParsePDU((const char *)data, dataLen, &code, endpoint);
1256     if (NULL == pdu)
1257     {
1258         OIC_LOG(ERROR, TAG, "Parse PDU failed");
1259         return;
1260     }
1261
1262     CAData_t *cadata = CAGenerateHandlerData(endpoint, NULL, pdu, CA_ERROR_DATA);
1263     if (!cadata)
1264     {
1265         OIC_LOG(ERROR, TAG, "CAErrorHandler, CAGenerateHandlerData failed!");
1266         coap_delete_pdu(pdu);
1267         return;
1268     }
1269
1270     cadata->errorInfo->result = result;
1271
1272     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
1273     coap_delete_pdu(pdu);
1274 #endif
1275
1276     OIC_LOG(DEBUG, TAG, "CAErrorHandler OUT");
1277     return;
1278 }
1279
1280 static void CASendErrorInfo(const CAEndpoint_t *endpoint, const CAInfo_t *info, CAResult_t result)
1281 {
1282     OIC_LOG(DEBUG, TAG, "CASendErrorInfo IN");
1283 #ifndef SINGLE_THREAD
1284     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
1285     if (!cadata)
1286     {
1287         OIC_LOG(ERROR, TAG, "cadata memory allocation failed");
1288         return;
1289     }
1290
1291     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
1292     if (!ep)
1293     {
1294         OIC_LOG(ERROR, TAG, "endpoint clone failed");
1295         OICFree(cadata);
1296         return;
1297     }
1298
1299     CAErrorInfo_t *errorInfo = (CAErrorInfo_t *)OICCalloc(1, sizeof (CAErrorInfo_t));
1300     if (!errorInfo)
1301     {
1302         OIC_LOG(ERROR, TAG, "errorInfo memory allocation failed");
1303         OICFree(cadata);
1304         CAFreeEndpoint(ep);
1305         return;
1306     }
1307
1308     CAResult_t res = CACloneInfo(info, &errorInfo->info);
1309     if (CA_STATUS_OK != res)
1310     {
1311         OIC_LOG(ERROR, TAG, "info clone failed");
1312         OICFree(cadata);
1313         OICFree(errorInfo);
1314         CAFreeEndpoint(ep);
1315         return;
1316     }
1317
1318     errorInfo->result = result;
1319     cadata->remoteEndpoint = ep;
1320     cadata->errorInfo = errorInfo;
1321     cadata->dataType = CA_ERROR_DATA;
1322
1323     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
1324 #endif
1325     OIC_LOG(DEBUG, TAG, "CASendErrorInfo OUT");
1326 }