fix regression in Android interface handling introduced by IPv6 patch.
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / camessagehandler.c
1 /* *****************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <stdint.h>
25
26 #include "cainterface.h"
27 #include "camessagehandler.h"
28 #include "caremotehandler.h"
29 #include "caprotocolmessage.h"
30 #include "logger.h"
31 #include "config.h" /* for coap protocol */
32 #include "oic_malloc.h"
33 #include "canetworkconfigurator.h"
34 #include "caadapterutils.h"
35 #include "cainterfacecontroller.h"
36 #include "caretransmission.h"
37
38 #ifdef WITH_BWT
39 #include "cablockwisetransfer.h"
40 #endif
41
42 #ifndef  SINGLE_THREAD
43 #include "uqueue.h"
44 #include "cathreadpool.h" /* for thread pool */
45 #include "caqueueingthread.h"
46
47 #define SINGLE_HANDLE
48 #define MAX_THREAD_POOL_SIZE    20
49
50 // thread pool handle
51 static ca_thread_pool_t g_threadPoolHandle = NULL;
52
53 // message handler main thread
54 static CAQueueingThread_t g_sendThread;
55 static CAQueueingThread_t g_receiveThread;
56
57 #else
58 #define CA_MAX_RT_ARRAY_SIZE    3
59 #endif  /* SINGLE_THREAD */
60
61 #define TAG "CA_MSG_HNDLR"
62
63 static CARetransmission_t g_retransmissionContext;
64
65 // handler field
66 static CARequestCallback g_requestHandler = NULL;
67 static CAResponseCallback g_responseHandler = NULL;
68 static CAErrorCallback g_errorHandler = NULL;
69
70 static void CAErrorHandler(const CAEndpoint_t *endpoint,
71                            const void *data, uint32_t dataLen,
72                            CAResult_t result);
73
74 static CAData_t* CAGenerateHandlerData(const CAEndpoint_t *endpoint,
75                                        const CARemoteId_t *identity,
76                                        const void *data, CADataType_t dataType);
77
78 static void CASendErrorInfo(const CAEndpoint_t *endpoint, const CAInfo_t *info,
79                             CAResult_t result);
80
81 #ifdef SINGLE_THREAD
82 static void CAProcessReceivedData(CAData_t *data);
83 #endif
84 static void CADestroyData(void *data, uint32_t size);
85 static void CALogPayloadInfo(CAInfo_t *info);
86 static bool CADropSecondMessage(CAHistory_t *history, const CAEndpoint_t *endpoint, uint16_t id);
87
88 #ifdef WITH_BWT
89 void CAAddDataToSendThread(CAData_t *data)
90 {
91     OIC_LOG(DEBUG, TAG, "IN");
92     VERIFY_NON_NULL_VOID(data, TAG, "data");
93
94     // add thread
95     CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
96
97     OIC_LOG(DEBUG, TAG, "OUT");
98 }
99
100 void CAAddDataToReceiveThread(CAData_t *data)
101 {
102     OIC_LOG(DEBUG, TAG, "IN - CAAddDataToReceiveThread");
103     VERIFY_NON_NULL_VOID(data, TAG, "data");
104
105     // add thread
106     CAQueueingThreadAddData(&g_receiveThread, data, sizeof(CAData_t));
107
108     OIC_LOG(DEBUG, TAG, "OUT - CAAddDataToReceiveThread");
109 }
110 #endif
111
112 static bool CAIsSelectedNetworkAvailable()
113 {
114     u_arraylist_t *list = CAGetSelectedNetworkList();
115     if (!list || u_arraylist_length(list) == 0)
116     {
117         OIC_LOG(ERROR, TAG, "No selected network");
118         return false;
119     }
120
121     return true;
122 }
123
124 static CAData_t* CAGenerateHandlerData(const CAEndpoint_t *endpoint,
125                                        const CARemoteId_t *identity,
126                                        const void *data, CADataType_t dataType)
127 {
128     OIC_LOG(DEBUG, TAG, "CAGenerateHandlerData IN");
129     CAInfo_t *info = NULL;
130     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
131     if (!cadata)
132     {
133         OIC_LOG(ERROR, TAG, "memory allocation failed");
134         return NULL;
135     }
136
137     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
138     if (!ep)
139     {
140         OIC_LOG(ERROR, TAG, "endpoint clone failed");
141         OICFree(cadata);
142         return NULL;
143     }
144
145     OIC_LOG_V(DEBUG, TAG, "address : %s", ep->addr);
146     CAResult_t result;
147
148     if(CA_RESPONSE_DATA == dataType)
149     {
150         CAResponseInfo_t* resInfo = (CAResponseInfo_t*)OICCalloc(1, sizeof(CAResponseInfo_t));
151         if (!resInfo)
152         {
153             OIC_LOG(ERROR, TAG, "memory allocation failed");
154             OICFree(cadata);
155             CAFreeEndpoint(ep);
156             return NULL;
157         }
158
159         result = CAGetResponseInfoFromPDU(data, resInfo);
160         if (CA_STATUS_OK != result)
161         {
162             OIC_LOG(ERROR, TAG, "CAGetResponseInfoFromPDU Failed");
163             CAFreeEndpoint(ep);
164             CADestroyResponseInfoInternal(resInfo);
165             OICFree(cadata);
166             return NULL;
167         }
168         cadata->responseInfo = resInfo;
169         info = &resInfo->info;
170         if (identity)
171         {
172             info->identity = *identity;
173         }
174         OIC_LOG(DEBUG, TAG, "Response Info :");
175         CALogPayloadInfo(info);
176     }
177     else if (CA_REQUEST_DATA == dataType)
178     {
179         CARequestInfo_t* reqInfo = (CARequestInfo_t*)OICCalloc(1, sizeof(CARequestInfo_t));
180         if (!reqInfo)
181         {
182             OIC_LOG(ERROR, TAG, "memory allocation failed");
183             OICFree(cadata);
184             CAFreeEndpoint(ep);
185             return NULL;
186         }
187
188         result = CAGetRequestInfoFromPDU(data, reqInfo);
189         if (CA_STATUS_OK != result)
190         {
191             OIC_LOG(ERROR, TAG, "CAGetRequestInfoFromPDU failed");
192             CAFreeEndpoint(ep);
193             CADestroyRequestInfoInternal(reqInfo);
194             OICFree(cadata);
195             return NULL;
196         }
197
198         if (CADropSecondMessage(&caglobals.ca.requestHistory, endpoint, reqInfo->info.messageId))
199         {
200             OIC_LOG(ERROR, TAG, "Second Request with same Token, Drop it");
201             CAFreeEndpoint(ep);
202             CADestroyRequestInfoInternal(reqInfo);
203             OICFree(cadata);
204             return NULL;
205         }
206
207         cadata->requestInfo = reqInfo;
208         info = &reqInfo->info;
209         if (identity)
210         {
211             info->identity = *identity;
212         }
213         OIC_LOG(DEBUG, TAG, "Request Info :");
214         CALogPayloadInfo(info);
215    }
216     else if (CA_ERROR_DATA == dataType)
217     {
218         CAErrorInfo_t *errorInfo = (CAErrorInfo_t *)OICCalloc(1, sizeof (CAErrorInfo_t));
219         if (!errorInfo)
220         {
221             OIC_LOG(ERROR, TAG, "Memory allocation failed!");
222             OICFree(cadata);
223             CAFreeEndpoint(ep);
224             return NULL;
225         }
226
227         CAResult_t result = CAGetErrorInfoFromPDU(data, errorInfo);
228         if (CA_STATUS_OK != result)
229         {
230             OIC_LOG(ERROR, TAG, "CAGetErrorInfoFromPDU failed");
231             CAFreeEndpoint(ep);
232             OICFree(errorInfo);
233             OICFree(cadata);
234             return NULL;
235         }
236
237         cadata->errorInfo = errorInfo;
238         info = &errorInfo->info;
239         if (identity)
240         {
241             info->identity = *identity;
242         }
243         OIC_LOG(DEBUG, TAG, "error Info :");
244         CALogPayloadInfo(info);
245     }
246
247     cadata->remoteEndpoint = ep;
248     cadata->dataType = dataType;
249
250     return cadata;
251
252     OIC_LOG(DEBUG, TAG, "CAGenerateHandlerData OUT");
253 }
254
255 static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uint32_t size)
256 {
257     OIC_LOG(DEBUG, TAG, "IN");
258     VERIFY_NON_NULL_VOID(endpoint, TAG, "endpoint");
259     VERIFY_NON_NULL_VOID(pdu, TAG, "pdu");
260
261     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
262     if (!ep)
263     {
264         OIC_LOG(ERROR, TAG, "clone failed");
265         return;
266     }
267
268     CAResponseInfo_t* resInfo = (CAResponseInfo_t*)OICCalloc(1, sizeof(CAResponseInfo_t));
269
270     if (!resInfo)
271     {
272         OIC_LOG(ERROR, TAG, "calloc failed");
273         CAFreeEndpoint(ep);
274         return;
275     }
276
277     resInfo->result = CA_RETRANSMIT_TIMEOUT;
278     resInfo->info.type = CAGetMessageTypeFromPduBinaryData(pdu, size);
279     resInfo->info.messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
280
281     CAResult_t res = CAGetTokenFromPDU((const coap_hdr_t *) pdu, &(resInfo->info));
282     if (CA_STATUS_OK != res)
283     {
284         OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list");
285         CADestroyResponseInfoInternal(resInfo);
286         CAFreeEndpoint(ep);
287         return;
288     }
289
290     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
291     if (NULL == cadata)
292     {
293         OIC_LOG(ERROR, TAG, "memory allocation failed !");
294         CAFreeEndpoint(ep);
295         CADestroyResponseInfoInternal(resInfo);
296         return;
297     }
298
299     cadata->type = SEND_TYPE_UNICAST;
300     cadata->remoteEndpoint = ep;
301     cadata->requestInfo = NULL;
302     cadata->responseInfo = resInfo;
303
304 #ifdef SINGLE_THREAD
305     CAProcessReceivedData(cadata);
306 #else
307     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
308 #endif
309
310     OIC_LOG(DEBUG, TAG, "OUT");
311 }
312
313 static void CADestroyData(void *data, uint32_t size)
314 {
315     OIC_LOG(DEBUG, TAG, "CADestroyData IN");
316     if ((size_t)size < sizeof(CAData_t))
317     {
318         OIC_LOG_V(ERROR, TAG, "Destroy data too small %p %d", data, size);
319     }
320     CAData_t *cadata = (CAData_t *) data;
321
322     if (NULL == cadata)
323     {
324         OIC_LOG(ERROR, TAG, "cadata is NULL");
325         return;
326     }
327
328     if (NULL != cadata->remoteEndpoint)
329     {
330         CAFreeEndpoint(cadata->remoteEndpoint);
331     }
332
333     if (NULL != cadata->requestInfo)
334     {
335         CADestroyRequestInfoInternal((CARequestInfo_t *) cadata->requestInfo);
336     }
337
338     if (NULL != cadata->responseInfo)
339     {
340         CADestroyResponseInfoInternal((CAResponseInfo_t *) cadata->responseInfo);
341     }
342
343     if (NULL != cadata->errorInfo)
344     {
345         CADestroyErrorInfoInternal(cadata->errorInfo);
346     }
347
348     OICFree(cadata);
349     OIC_LOG(DEBUG, TAG, "CADestroyData OUT");
350 }
351
352 #ifdef SINGLE_THREAD
353 static void CAProcessReceivedData(CAData_t *data)
354 {
355     OIC_LOG(DEBUG, TAG, "CAProcessReceivedData IN");
356     if (!data)
357     {
358         OIC_LOG(ERROR, TAG, "thread data error!!");
359         return;
360     }
361
362     // parse the data and call the callbacks.
363     // #1 parse the data
364     // #2 get endpoint
365     CAEndpoint_t *rep = (CAEndpoint_t *)(data->remoteEndpoint);
366     if (!rep)
367     {
368         OIC_LOG(ERROR, TAG, "remoteEndpoint error!!");
369         return;
370     }
371
372     if (data->requestInfo && g_requestHandler)
373     {
374         g_requestHandler(rep, data->requestInfo);
375     }
376     else if (data->responseInfo && g_responseHandler)
377     {
378         g_responseHandler(rep, data->responseInfo);
379     }
380     else if (data->errorInfo && g_errorHandler)
381     {
382         g_errorHandler(rep, data->errorInfo);
383     }
384
385 #ifdef SINGLE_THREAD
386     CADestroyData(data, sizeof(CAData_t));
387 #endif
388
389     OIC_LOG(DEBUG, TAG, "CAProcessReceivedData OUT");
390 }
391 #endif
392
393 #ifndef SINGLE_THREAD
394
395 static void CAReceiveThreadProcess(void *threadData)
396 {
397     OIC_LOG(DEBUG, TAG, "IN");
398 #ifndef SINGLE_HANDLE
399     CAData_t *data = (CAData_t *) threadData;
400     CAProcessReceivedData(data);
401 #else
402     (void)threadData;
403 #endif
404     OIC_LOG(DEBUG, TAG, "OUT");
405 }
406 #endif
407
408 static CAResult_t CAProcessSendData(const CAData_t *data)
409 {
410     OIC_LOG(DEBUG, TAG, "IN");
411     VERIFY_NON_NULL(data, TAG, "data");
412     VERIFY_NON_NULL(data->remoteEndpoint, TAG, "remoteEndpoint");
413
414     CAResult_t res = CA_STATUS_FAILED;
415
416     CASendDataType_t type = data->type;
417
418     coap_pdu_t *pdu = NULL;
419     CAInfo_t *info = NULL;
420
421     if (SEND_TYPE_UNICAST == type)
422     {
423
424         OIC_LOG(DEBUG,TAG,"Unicast message");
425         if (NULL != data->requestInfo)
426         {
427             OIC_LOG(DEBUG, TAG, "requestInfo is available..");
428
429             info = &data->requestInfo->info;
430             pdu = CAGeneratePDU(data->requestInfo->method, info, data->remoteEndpoint);
431         }
432         else if (NULL != data->responseInfo)
433         {
434             OIC_LOG(DEBUG, TAG, "responseInfo is available..");
435
436             info = &data->responseInfo->info;
437             pdu = CAGeneratePDU(data->responseInfo->result, info, data->remoteEndpoint);
438         }
439         else
440         {
441             OIC_LOG(DEBUG, TAG, "request info, response info is empty");
442             return CA_STATUS_INVALID_PARAM;
443         }
444
445         // interface controller function call.
446         if (NULL != pdu)
447         {
448 #ifdef WITH_BWT
449             if (CA_ADAPTER_GATT_BTLE != data->remoteEndpoint->adapter)
450             {
451                 // Blockwise transfer
452                 if (NULL != info)
453                 {
454                     CAResult_t res = CAAddBlockOption(&pdu, *info,
455                                                       data->remoteEndpoint);
456                     if (CA_STATUS_OK != res)
457                     {
458                         OIC_LOG(INFO, TAG, "to write block option has failed");
459                         CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res);
460                         coap_delete_pdu(pdu);
461                         return res;
462                     }
463                 }
464             }
465 #endif
466             CALogPDUInfo(pdu);
467
468             res = CASendUnicastData(data->remoteEndpoint, pdu->hdr, pdu->length);
469             if (CA_STATUS_OK != res)
470             {
471                 OIC_LOG_V(ERROR, TAG, "send failed:%d", res);
472                 CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res);
473                 coap_delete_pdu(pdu);
474                 return res;
475             }
476             // for retransmission
477             res = CARetransmissionSentData(&g_retransmissionContext, data->remoteEndpoint, pdu->hdr,
478                                            pdu->length);
479             if ((CA_STATUS_OK != res) && (CA_NOT_SUPPORTED != res))
480             {
481                 //when retransmission not supported this will return CA_NOT_SUPPORTED, ignore
482                 OIC_LOG_V(INFO, TAG, "retransmission is not enabled due to error, res : %d", res);
483                 coap_delete_pdu(pdu);
484                 return res;
485             }
486
487             coap_delete_pdu(pdu);
488         }
489         else
490         {
491             OIC_LOG(ERROR,TAG,"Failed to generate unicast PDU");
492             CASendErrorInfo(data->remoteEndpoint, info, CA_SEND_FAILED);
493             return CA_SEND_FAILED;
494         }
495     }
496     else if (SEND_TYPE_MULTICAST == type)
497     {
498         OIC_LOG(DEBUG,TAG,"Multicast message");
499         if (NULL != data->requestInfo)
500         {
501             OIC_LOG(DEBUG, TAG, "requestInfo is available..");
502
503             info = &data->requestInfo->info;
504             pdu = CAGeneratePDU(CA_GET, info, data->remoteEndpoint);
505             if (NULL != pdu)
506             {
507 #ifdef WITH_BWT
508                 if (CA_ADAPTER_GATT_BTLE != data->remoteEndpoint->adapter)
509                 {
510                     // Blockwise transfer
511                     CAResult_t res = CAAddBlockOption(&pdu, data->requestInfo->info,
512                                                       data->remoteEndpoint);
513                     if (CA_STATUS_OK != res)
514                     {
515                         OIC_LOG(DEBUG, TAG, "CAAddBlockOption has failed");
516                         CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res);
517                         coap_delete_pdu(pdu);
518                         return res;
519                     }
520                 }
521 #endif
522                 CALogPDUInfo(pdu);
523
524                 res = CASendMulticastData(data->remoteEndpoint, pdu->hdr, pdu->length);
525                 if (CA_STATUS_OK != res)
526                 {
527                     OIC_LOG_V(ERROR, TAG, "send failed:%d", res);
528                     CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res);
529                     coap_delete_pdu(pdu);
530                     return res;
531                 }
532
533                 coap_delete_pdu(pdu);
534             }
535             else
536             {
537                 OIC_LOG(ERROR,TAG,"Failed to generate multicast PDU");
538                 CASendErrorInfo(data->remoteEndpoint, info, CA_SEND_FAILED);
539                 return CA_SEND_FAILED;
540             }
541         }
542         else
543         {
544             OIC_LOG(ERROR, TAG, "request info is empty");
545             return CA_SEND_FAILED;
546         }
547     }
548
549     OIC_LOG(DEBUG, TAG, "OUT");
550     return CA_STATUS_OK;
551 }
552
553 #ifndef SINGLE_THREAD
554 static void CASendThreadProcess(void *threadData)
555 {
556     CAData_t *data = (CAData_t *) threadData;
557     CAProcessSendData(data);
558 }
559
560 #endif
561
562 /*
563  * If a second message arrives with the same token and the other address
564  * family, drop it.  Typically, IPv6 beats IPv4, so the IPv4 message is dropped.
565  */
566 static bool CADropSecondMessage(CAHistory_t *history, const CAEndpoint_t *ep, uint16_t id)
567 {
568     if (!ep)
569     {
570         return true;
571     }
572     if (ep->adapter != CA_ADAPTER_IP)
573     {
574         return false;
575     }
576     if (!caglobals.ip.dualstack)
577     {
578         return false;
579     }
580
581     bool ret = false;
582     CATransportFlags_t familyFlags = ep->flags & CA_IPFAMILY_MASK;
583
584     for (size_t i = 0; i < sizeof(history->items) / sizeof(history->items[0]); i++)
585     {
586         CAHistoryItem_t *item = &(history->items[i]);
587         if (id == item->messageId)
588         {
589             if ((familyFlags ^ item->flags) == CA_IPFAMILY_MASK)
590             {
591                 OIC_LOG_V(INFO, TAG, "IPv%c duplicate message ignored",
592                                             familyFlags & CA_IPV6 ? '6' : '4');
593                 ret = true;
594                 break;
595             }
596         }
597     }
598
599     history->items[history->nextIndex].flags = familyFlags;
600     history->items[history->nextIndex].messageId = id;
601     if (++history->nextIndex >= HISTORYSIZE)
602     {
603         history->nextIndex = 0;
604     }
605
606     return ret;
607 }
608
609 static void CAReceivedPacketCallback(const CASecureEndpoint_t *sep,
610                                      const void *data, uint32_t dataLen)
611 {
612     OIC_LOG(DEBUG, TAG, "IN");
613     VERIFY_NON_NULL_VOID(sep, TAG, "remoteEndpoint");
614     VERIFY_NON_NULL_VOID(data, TAG, "data");
615
616     uint32_t code = CA_NOT_FOUND;
617     CAData_t *cadata = NULL;
618
619     coap_pdu_t *pdu = (coap_pdu_t *) CAParsePDU((const char *) data, dataLen, &code);
620     if (NULL == pdu)
621     {
622         OIC_LOG(ERROR, TAG, "Parse PDU failed");
623         return;
624     }
625
626     OIC_LOG_V(DEBUG, TAG, "code = %d", code);
627     if (CA_GET == code || CA_POST == code || CA_PUT == code || CA_DELETE == code)
628     {
629         cadata = CAGenerateHandlerData(&(sep->endpoint), &(sep->identity), pdu, CA_REQUEST_DATA);
630         if (!cadata)
631         {
632             OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, CAGenerateHandlerData failed!");
633             coap_delete_pdu(pdu);
634             return;
635         }
636     }
637     else
638     {
639         cadata = CAGenerateHandlerData(&(sep->endpoint), &(sep->identity), pdu, CA_RESPONSE_DATA);
640         if (!cadata)
641         {
642             OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, CAGenerateHandlerData failed!");
643             coap_delete_pdu(pdu);
644             return;
645         }
646
647         // for retransmission
648         void *retransmissionPdu = NULL;
649         CARetransmissionReceivedData(&g_retransmissionContext, cadata->remoteEndpoint, pdu->hdr,
650                                      pdu->length, &retransmissionPdu);
651
652         // get token from saved data in retransmission list
653         if (retransmissionPdu && CA_EMPTY == code)
654         {
655             if (cadata->responseInfo)
656             {
657                 CAInfo_t *info = &cadata->responseInfo->info;
658                 CAResult_t res = CAGetTokenFromPDU((const coap_hdr_t *)retransmissionPdu,
659                                                    info);
660                 if (CA_STATUS_OK != res)
661                 {
662                     OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list");
663                     OICFree(info->token);
664                     info->tokenLength = 0;
665                 }
666             }
667         }
668         OICFree(retransmissionPdu);
669     }
670
671     cadata->type = SEND_TYPE_UNICAST;
672
673 #ifdef SINGLE_THREAD
674     CAProcessReceivedData(cadata);
675 #else
676 #ifdef WITH_BWT
677     if (CA_ADAPTER_GATT_BTLE != sep->endpoint.adapter)
678     {
679         CAResult_t res = CAReceiveBlockWiseData(pdu, &(sep->endpoint), cadata, dataLen);
680         if (CA_NOT_SUPPORTED == res)
681         {
682             OIC_LOG(ERROR, TAG, "this message does not have block option");
683             CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
684         }
685         else
686         {
687             CADestroyData(cadata, sizeof(CAData_t));
688         }
689     }
690     else
691 #endif
692     {
693         CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
694     }
695 #endif
696
697     coap_delete_pdu(pdu);
698
699     OIC_LOG(DEBUG, TAG, "OUT");
700 }
701
702 static void CANetworkChangedCallback(const CAEndpoint_t *info, CANetworkStatus_t status)
703 {
704     (void)info;
705     (void)status;
706     OIC_LOG(DEBUG, TAG, "IN");
707
708     OIC_LOG(DEBUG, TAG, "OUT");
709 }
710
/*
 * Processes pending received data on behalf of the caller.
 *
 * SINGLE_THREAD builds: reads data from the adapters and runs one pass of the
 * retransmission routine.
 * Multi-threaded builds with SINGLE_HANDLE: takes at most one element from
 * the receive queue, invokes the matching registered callback (request,
 * response or error, in that order of precedence), then releases the message
 * and the queue element. A queue element without a message is released too
 * (it used to be leaked).
 */
void CAHandleRequestResponseCallbacks()
{
#ifdef SINGLE_THREAD
    CAReadData();
    CARetransmissionBaseRoutine((void *)&g_retransmissionContext);
#else
#ifdef SINGLE_HANDLE
    // only the dequeue itself is done under the receive thread mutex
    ca_mutex_lock(g_receiveThread.threadMutex);

    u_queue_message_t *item = u_queue_get_element(g_receiveThread.dataQueue);

    ca_mutex_unlock(g_receiveThread.threadMutex);

    if (NULL == item)
    {
        return;
    }

    // get values
    void *msg = item->msg;

    if (NULL == msg)
    {
        // the element has already been removed from the queue and is ours to free
        OICFree(item);
        return;
    }

    // get endpoint
    CAData_t *td = (CAData_t *) msg;

    if (td->requestInfo && g_requestHandler)
    {
        OIC_LOG_V(DEBUG, TAG, "request callback : %d", td->requestInfo->info.numOptions);
        g_requestHandler(td->remoteEndpoint, td->requestInfo);
    }
    else if (td->responseInfo && g_responseHandler)
    {
        OIC_LOG_V(DEBUG, TAG, "response callback : %d", td->responseInfo->info.numOptions);
        g_responseHandler(td->remoteEndpoint, td->responseInfo);
    }
    else if (td->errorInfo && g_errorHandler)
    {
        OIC_LOG_V(DEBUG, TAG, "error callback error: %d", td->errorInfo->result);
        g_errorHandler(td->remoteEndpoint, td->errorInfo);
    }

    CADestroyData(msg, sizeof(CAData_t));
    OICFree(item);

#endif /* SINGLE_HANDLE */
#endif
}
766
767 static CAData_t* CAPrepareSendData(const CAEndpoint_t *endpoint, const void *sendData,
768                                    CADataType_t dataType)
769 {
770     OIC_LOG(DEBUG, TAG, "CAPrepareSendData IN");
771
772     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
773     if (!cadata)
774     {
775         OIC_LOG(ERROR, TAG, "memory allocation failed");
776         return NULL;
777     }
778
779     if(CA_REQUEST_DATA == dataType)
780     {
781         // clone request info
782         CARequestInfo_t *request = CACloneRequestInfo((CARequestInfo_t *)sendData);
783
784         if(!request)
785         {
786             OIC_LOG(ERROR, TAG, "CACloneRequestInfo failed");
787             OICFree(cadata);
788             return NULL;
789         }
790
791         cadata->type = request->isMulticast ? SEND_TYPE_MULTICAST : SEND_TYPE_UNICAST;
792         cadata->requestInfo =  request;
793     }
794     else if(CA_RESPONSE_DATA == dataType)
795     {
796         // clone response info
797         CAResponseInfo_t *response = CACloneResponseInfo((CAResponseInfo_t *)sendData);
798
799         if(!response)
800         {
801             OIC_LOG(ERROR, TAG, "CACloneResponseInfo failed");
802             OICFree(cadata);
803             return NULL;
804         }
805
806         cadata->type = SEND_TYPE_UNICAST;
807         cadata->responseInfo = response;
808     }
809     else
810     {
811         OIC_LOG(ERROR, TAG, "CAPrepareSendData unknown data type");
812         OICFree(cadata);
813         return NULL;
814     }
815
816     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
817     if (!ep)
818     {
819         OIC_LOG(ERROR, TAG, "endpoint clone failed");
820         CADestroyData(cadata, sizeof(CAData_t));
821         return NULL;
822     }
823
824     cadata->remoteEndpoint = ep;
825     cadata->dataType = dataType;
826     return cadata;
827 }
828
829 CAResult_t CADetachRequestMessage(const CAEndpoint_t *object, const CARequestInfo_t *request)
830 {
831     OIC_LOG(DEBUG, TAG, "IN");
832
833     VERIFY_NON_NULL(object, TAG, "object");
834     VERIFY_NON_NULL(request, TAG, "request");
835
836     if (false == CAIsSelectedNetworkAvailable())
837     {
838         return CA_STATUS_FAILED;
839     }
840
841 #ifdef ARDUINO
842     // If max retransmission queue is reached, then don't handle new request
843     if (CA_MAX_RT_ARRAY_SIZE == u_arraylist_length(g_retransmissionContext.dataList))
844     {
845         OIC_LOG(ERROR, TAG, "max RT queue size reached!");
846         return CA_SEND_FAILED;
847     }
848 #endif /* ARDUINO */
849
850     CAData_t *data = CAPrepareSendData(object, request, CA_REQUEST_DATA);
851     if(!data)
852     {
853         OIC_LOG(ERROR, TAG, "CAPrepareSendData failed");
854         return CA_MEMORY_ALLOC_FAILED;
855     }
856
857 #ifdef SINGLE_THREAD
858     CAResult_t result = CAProcessSendData(data);
859     if(CA_STATUS_OK != result)
860     {
861         OIC_LOG(ERROR, TAG, "CAProcessSendData failed");
862         return result;
863     }
864
865     CADestroyData(data, sizeof(CAData_t));
866 #else
867 #ifdef WITH_BWT
868     if (CA_ADAPTER_GATT_BTLE != object->adapter)
869     {
870         // send block data
871         CAResult_t res = CASendBlockWiseData(data);
872         if(CA_NOT_SUPPORTED == res)
873         {
874             OIC_LOG(DEBUG, TAG, "normal msg will be sent");
875             CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
876             return CA_STATUS_OK;
877         }
878         else
879         {
880             CADestroyData(data, sizeof(CAData_t));
881         }
882         return res;
883     }
884     else
885 #endif
886     {
887         CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
888     }
889 #endif
890
891     OIC_LOG(DEBUG, TAG, "OUT");
892     return CA_STATUS_OK;
893 }
894
895 CAResult_t CADetachResponseMessage(const CAEndpoint_t *object,
896                                    const CAResponseInfo_t *response)
897 {
898     OIC_LOG(DEBUG, TAG, "IN");
899     VERIFY_NON_NULL(object, TAG, "object");
900     VERIFY_NON_NULL(response, TAG, "response");
901
902     if (false == CAIsSelectedNetworkAvailable())
903     {
904         return CA_STATUS_FAILED;
905     }
906
907     CAData_t *data = CAPrepareSendData(object, response, CA_RESPONSE_DATA);
908     if(!data)
909     {
910         OIC_LOG(ERROR, TAG, "CAPrepareSendData failed");
911         return CA_MEMORY_ALLOC_FAILED;
912     }
913
914 #ifdef SINGLE_THREAD
915     CAResult_t result = CAProcessSendData(data);
916     if(result != CA_STATUS_OK)
917     {
918         OIC_LOG(ERROR, TAG, "CAProcessSendData failed");
919         return result;
920     }
921
922     CADestroyData(data, sizeof(CAData_t));
923 #else
924 #ifdef WITH_BWT
925     if (CA_ADAPTER_GATT_BTLE != object->adapter)
926     {
927         // send block data
928         CAResult_t res = CASendBlockWiseData(data);
929         if(CA_NOT_SUPPORTED == res)
930         {
931             OIC_LOG(DEBUG, TAG, "normal msg will be sent");
932             CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
933             return CA_STATUS_OK;
934         }
935         else
936         {
937             CADestroyData(data, sizeof(CAData_t));
938         }
939         return res;
940     }
941     else
942 #endif
943     {
944         CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
945     }
946 #endif
947
948     OIC_LOG(DEBUG, TAG, "OUT");
949     return CA_STATUS_OK;
950 }
951
952 CAResult_t CADetachMessageResourceUri(const CAURI_t resourceUri, const CAToken_t token,
953                                       uint8_t tokenLength, const CAHeaderOption_t *options,
954                                       uint8_t numOptions)
955 {
956     (void)resourceUri;
957     (void)token;
958     (void)tokenLength;
959     (void)options;
960     (void)numOptions;
961     return CA_NOT_SUPPORTED;
962 }
963
964 void CASetInterfaceCallbacks(CARequestCallback ReqHandler, CAResponseCallback RespHandler,
965                              CAErrorCallback errorHandler)
966 {
967     OIC_LOG(DEBUG, TAG, "IN");
968     g_requestHandler = ReqHandler;
969     g_responseHandler = RespHandler;
970     g_errorHandler = errorHandler;
971     OIC_LOG(DEBUG, TAG, "OUT");
972 }
973
974 CAResult_t CAInitializeMessageHandler()
975 {
976     OIC_LOG(DEBUG, TAG, "IN");
977     CASetPacketReceivedCallback(CAReceivedPacketCallback);
978
979     CASetNetworkChangeCallback(CANetworkChangedCallback);
980     CASetErrorHandleCallback(CAErrorHandler);
981
982 #ifndef SINGLE_THREAD
983     // create thread pool
984     CAResult_t res = ca_thread_pool_init(MAX_THREAD_POOL_SIZE, &g_threadPoolHandle);
985
986     if (CA_STATUS_OK != res)
987     {
988         OIC_LOG(ERROR, TAG, "thread pool initialize error.");
989         return res;
990     }
991
992     // send thread initialize
993     if (CA_STATUS_OK != CAQueueingThreadInitialize(&g_sendThread, g_threadPoolHandle,
994                                                    CASendThreadProcess, CADestroyData))
995     {
996         OIC_LOG(ERROR, TAG, "Failed to Initialize send queue thread");
997         return CA_STATUS_FAILED;
998     }
999
1000     // start send thread
1001     res = CAQueueingThreadStart(&g_sendThread);
1002
1003     if (CA_STATUS_OK != res)
1004     {
1005         OIC_LOG(ERROR, TAG, "thread start error(send thread).");
1006         ca_thread_pool_free(g_threadPoolHandle);
1007         g_threadPoolHandle = NULL;
1008         return res;
1009     }
1010
1011     // receive thread initialize
1012     if (CA_STATUS_OK != CAQueueingThreadInitialize(&g_receiveThread, g_threadPoolHandle,
1013                                                    CAReceiveThreadProcess, CADestroyData))
1014     {
1015         OIC_LOG(ERROR, TAG, "Failed to Initialize receive queue thread");
1016         return CA_STATUS_FAILED;
1017     }
1018
1019 #ifndef SINGLE_HANDLE // This will be enabled when RI supports multi threading
1020     // start receive thread
1021     res = CAQueueingThreadStart(&gReceiveThread);
1022
1023     if (res != CA_STATUS_OK)
1024     {
1025         OIC_LOG(ERROR, TAG, "thread start error(receive thread).");
1026         return res;
1027     }
1028 #endif /* SINGLE_HANDLE */
1029
1030     // retransmission initialize
1031     CARetransmissionInitialize(&g_retransmissionContext, g_threadPoolHandle, CASendUnicastData,
1032                                CATimeoutCallback, NULL);
1033
1034 #ifdef WITH_BWT
1035     // block-wise transfer initialize
1036     CAInitializeBlockWiseTransfer(CAAddDataToSendThread, CAAddDataToReceiveThread);
1037 #endif
1038
1039     // start retransmission
1040     res = CARetransmissionStart(&g_retransmissionContext);
1041
1042     if (CA_STATUS_OK != res)
1043     {
1044         OIC_LOG(ERROR, TAG, "thread start error(retransmission thread).");
1045         return res;
1046     }
1047
1048     // initialize interface adapters by controller
1049     CAInitializeAdapters(g_threadPoolHandle);
1050 #else
1051     // retransmission initialize
1052     CARetransmissionInitialize(&g_retransmissionContext, NULL, CASendUnicastData,
1053                                CATimeoutCallback, NULL);
1054     CAInitializeAdapters();
1055 #endif
1056
1057     OIC_LOG(DEBUG, TAG, "OUT");
1058     return CA_STATUS_OK;
1059 }
1060
1061 void CATerminateMessageHandler()
1062 {
1063     OIC_LOG(DEBUG, TAG, "IN");
1064 #ifndef SINGLE_THREAD
1065     CATransportAdapter_t connType;
1066     u_arraylist_t *list = CAGetSelectedNetworkList();
1067     uint32_t length = u_arraylist_length(list);
1068
1069     uint32_t i = 0;
1070     for (i = 0; i < length; i++)
1071     {
1072         void* ptrType = u_arraylist_get(list, i);
1073
1074         if (NULL == ptrType)
1075         {
1076             continue;
1077         }
1078
1079         connType = *(CATransportAdapter_t *)ptrType;
1080         CAStopAdapter(connType);
1081     }
1082
1083     // stop retransmission
1084     if (NULL != g_retransmissionContext.threadMutex)
1085     {
1086         CARetransmissionStop(&g_retransmissionContext);
1087     }
1088
1089     // stop thread
1090     // delete thread data
1091     if (NULL != g_sendThread.threadMutex)
1092     {
1093         CAQueueingThreadStop(&g_sendThread);
1094     }
1095
1096     // stop thread
1097     // delete thread data
1098     if (NULL != g_receiveThread.threadMutex)
1099     {
1100 #ifndef SINGLE_HANDLE // This will be enabled when RI supports multi threading
1101         CAQueueingThreadStop(&gReceiveThread);
1102 #endif /* SINGLE_HANDLE */
1103     }
1104
1105     // destroy thread pool
1106     if (NULL != g_threadPoolHandle)
1107     {
1108         ca_thread_pool_free(g_threadPoolHandle);
1109         g_threadPoolHandle = NULL;
1110     }
1111
1112 #ifdef WITH_BWT
1113     CATerminateBlockWiseTransfer();
1114 #endif
1115     CARetransmissionDestroy(&g_retransmissionContext);
1116     CAQueueingThreadDestroy(&g_sendThread);
1117     CAQueueingThreadDestroy(&g_receiveThread);
1118
1119     // terminate interface adapters by controller
1120     CATerminateAdapters();
1121 #else
1122     // terminate interface adapters by controller
1123     CATerminateAdapters();
1124
1125     // stop retransmission
1126     CARetransmissionStop(&g_retransmissionContext);
1127     CARetransmissionDestroy(&g_retransmissionContext);
1128 #endif
1129
1130     OIC_LOG(DEBUG, TAG, "OUT");
1131 }
1132
1133 void CALogPDUInfo(coap_pdu_t *pdu)
1134 {
1135     VERIFY_NON_NULL_VOID(pdu, TAG, "pdu");
1136
1137     OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
1138
1139     OIC_LOG_V(DEBUG, TAG, "PDU Maker - type : %d", pdu->hdr->type);
1140
1141     OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
1142
1143     OIC_LOG(DEBUG, TAG, "PDU Maker - token :");
1144
1145     OIC_LOG_BUFFER(DEBUG, TAG, pdu->hdr->token, pdu->hdr->token_length);
1146 }
1147
1148 static void CALogPayloadInfo(CAInfo_t *info)
1149 {
1150     if(info)
1151     {
1152         if (info->options)
1153         {
1154             for (uint32_t i = 0; i < info->numOptions; i++)
1155             {
1156                 OIC_LOG_V(DEBUG, TAG, "optionID: %d", info->options[i].optionID);
1157
1158                 OIC_LOG_V(DEBUG, TAG, "list: %s", info->options[i].optionData);
1159             }
1160         }
1161
1162         if (info->payload)
1163         {
1164             OIC_LOG_V(DEBUG, TAG, "payload: %p(%u)", info->payload,
1165                       info->payloadSize);
1166         }
1167
1168         if (info->token)
1169         {
1170             OIC_LOG(DEBUG, TAG, "token:");
1171             OIC_LOG_BUFFER(DEBUG, TAG, (const uint8_t *) info->token,
1172                            info->tokenLength);
1173         }
1174         OIC_LOG_V(DEBUG, TAG, "msgID: %d", info->messageId);
1175     }
1176     else
1177     {
1178         OIC_LOG(DEBUG, TAG, "info is NULL, cannot output log data");
1179     }
1180 }
1181
1182 void CAErrorHandler(const CAEndpoint_t *endpoint,
1183                     const void *data, uint32_t dataLen,
1184                     CAResult_t result)
1185 {
1186     OIC_LOG(DEBUG, TAG, "CAErrorHandler IN");
1187
1188 #ifndef SINGLE_THREAD
1189
1190     VERIFY_NON_NULL_VOID(endpoint, TAG, "remoteEndpoint");
1191     VERIFY_NON_NULL_VOID(data, TAG, "data");
1192
1193     uint32_t code = CA_NOT_FOUND;
1194     //Do not free remoteEndpoint and data. Currently they will be freed in data thread
1195     //Get PDU data
1196     coap_pdu_t *pdu = (coap_pdu_t *)CAParsePDU((const char *)data, dataLen, &code);
1197     if (NULL == pdu)
1198     {
1199         OIC_LOG(ERROR, TAG, "Parse PDU failed");
1200         return;
1201     }
1202
1203     CAData_t *cadata = CAGenerateHandlerData(endpoint, NULL, pdu, CA_ERROR_DATA);
1204     if(!cadata)
1205     {
1206         OIC_LOG(ERROR, TAG, "CAErrorHandler, CAGenerateHandlerData failed!");
1207         coap_delete_pdu(pdu);
1208         return;
1209     }
1210
1211     cadata->errorInfo->result = result;
1212
1213     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
1214     coap_delete_pdu(pdu);
1215 #endif
1216
1217     OIC_LOG(DEBUG, TAG, "CAErrorHandler OUT");
1218     return;
1219 }
1220
1221 static void CASendErrorInfo(const CAEndpoint_t *endpoint, const CAInfo_t *info, CAResult_t result)
1222 {
1223     OIC_LOG(DEBUG, TAG, "CASendErrorInfo IN");
1224 #ifndef SINGLE_THREAD
1225     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
1226     if (!cadata)
1227     {
1228         OIC_LOG(ERROR, TAG, "memory allocation failed");
1229         return;
1230     }
1231
1232     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
1233     if (!ep)
1234     {
1235         OIC_LOG(ERROR, TAG, "endpoint clone failed");
1236         OICFree(cadata);
1237         return;
1238     }
1239
1240     CAErrorInfo_t *errorInfo = (CAErrorInfo_t *)OICCalloc(1, sizeof (CAErrorInfo_t));
1241     if (!errorInfo)
1242     {
1243         OICFree(cadata);
1244         CAFreeEndpoint(ep);
1245         return;
1246     }
1247
1248     CAResult_t res = CACloneInfo(info, &errorInfo->info);
1249     if (CA_STATUS_OK != res)
1250     {
1251         OICFree(cadata);
1252         CAFreeEndpoint(ep);
1253         return;
1254     }
1255
1256     errorInfo->result = result;
1257     cadata->remoteEndpoint = ep;
1258     cadata->errorInfo = errorInfo;
1259     cadata->dataType = CA_ERROR_DATA;
1260
1261     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
1262 #endif
1263     OIC_LOG(DEBUG, TAG, "CASendErrorInfo OUT");
1264 }