Merge Single/Multi thread for camessagehandler
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / camessagehandler.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <stdint.h>
25
26 #include "cainterface.h"
27 #include "camessagehandler.h"
28 #include "caremotehandler.h"
29 #include "caprotocolmessage.h"
30 #include "logger.h"
31 #include "config.h" /* for coap protocol */
32 #include "oic_malloc.h"
33 #include "canetworkconfigurator.h"
34 #include "caadapterutils.h"
35 #include "cainterfacecontroller.h"
36 #include "caretransmission.h"
37
38 #ifndef  SINGLE_THREAD
39 #include "uqueue.h"
40 #include "cathreadpool.h" /* for thread pool */
41 #include "caqueueingthread.h"
42
43 #define SINGLE_HANDLE
44 #define MAX_THREAD_POOL_SIZE    20
45
46 // thread pool handle
47 static ca_thread_pool_t g_threadPoolHandle = NULL;
48
49 // message handler main thread
50 static CAQueueingThread_t g_sendThread;
51 static CAQueueingThread_t g_receiveThread;
52
53 #else
54 #define CA_MAX_RT_ARRAY_SIZE    3
55 #endif  /* SINGLE_THREAD */
56
57 #define TAG "CA_MSG_HNDLR"
58
59 static CARetransmission_t g_retransmissionContext;
60
61 // handler field
62 static CARequestCallback g_requestHandler = NULL;
63 static CAResponseCallback g_responseHandler = NULL;
64 static CAErrorCallback g_errorHandler = NULL;
65
66 static void CAErrorHandler(const CAEndpoint_t *endpoint,
67                            const void *data, uint32_t dataLen,
68                            CAResult_t result);
69
70 static CAData_t* CAGenerateHandlerData(const CAEndpoint_t *endpoint, const void *data,
71                                        CADataType_t dataType);
72
73 static void CAProcessReceivedData(CAData_t *data);
74 static void CADataDestroyer(void *data, uint32_t size);
75 static void CALogPayloadInfo(CAInfo_t *info);
76 static bool CADropSecondRequest(const CAEndpoint_t *endpoint, uint16_t messageId);
77
78 static bool CAIsSelectedNetworkAvailable()
79 {
80     u_arraylist_t *list = CAGetSelectedNetworkList();
81     if (!list || list->length == 0)
82     {
83         OIC_LOG(ERROR, TAG, "No selected network");
84         return false;
85     }
86
87     return true;
88 }
89
90 static CAData_t* CAGenerateHandlerData(const CAEndpoint_t *endpoint, const void *data, CADataType_t dataType)
91 {
92     OIC_LOG(DEBUG, TAG, "CAGenerateHandlerData IN");
93     CAInfo_t *info = NULL;
94     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
95     if (!cadata)
96     {
97         OIC_LOG(ERROR, TAG, "memory allocation failed");
98         return NULL;
99     }
100
101     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
102     if (!ep)
103     {
104         OIC_LOG(ERROR, TAG, "endpoint clone failed");
105         OICFree(cadata);
106         return NULL;
107     }
108
109     OIC_LOG_V(DEBUG, TAG, "address : %s", ep->addr);
110     CAResult_t result;
111
112     if(CA_RESPONSE_DATA == dataType)
113     {
114         CAResponseInfo_t* resInfo = (CAResponseInfo_t*)OICCalloc(1, sizeof(CAResponseInfo_t));
115         if (!resInfo)
116         {
117             OIC_LOG(ERROR, TAG, "memory allocation failed");
118             OICFree(cadata);
119             CAFreeEndpoint(ep);
120             return NULL;
121         }
122
123         result = CAGetResponseInfoFromPDU(data, resInfo);
124         if (CA_STATUS_OK != result)
125         {
126             OIC_LOG(ERROR, TAG, "CAGetResponseInfoFromPDU Failed");
127             CAFreeEndpoint(ep);
128             CADestroyResponseInfoInternal(resInfo);
129             OICFree(cadata);
130             return NULL;
131         }
132         cadata->responseInfo = resInfo;
133         info = &resInfo->info;
134         OIC_LOG(DEBUG, TAG, "Response Info :");
135         CALogPayloadInfo(info);
136     }
137     else if (CA_REQUEST_DATA == dataType)
138     {
139         CARequestInfo_t* reqInfo = (CARequestInfo_t*)OICCalloc(1, sizeof(CARequestInfo_t));
140         if (!reqInfo)
141         {
142             OIC_LOG(ERROR, TAG, "memory allocation failed");
143             OICFree(cadata);
144             CAFreeEndpoint(ep);
145             return NULL;
146         }
147
148         result = CAGetRequestInfoFromPDU(data, reqInfo);
149         if (CA_STATUS_OK != result)
150         {
151             OIC_LOG(ERROR, TAG, "CAGetRequestInfoFromPDU failed");
152             CAFreeEndpoint(ep);
153             CADestroyRequestInfoInternal(reqInfo);
154             OICFree(cadata);
155             return NULL;
156         }
157
158         if (CADropSecondRequest(endpoint, reqInfo->info.messageId))
159         {
160             OIC_LOG(ERROR, TAG, "Second Request with same Token, Drop it");
161             CAFreeEndpoint(ep);
162             CADestroyRequestInfoInternal(reqInfo);
163             OICFree(cadata);
164             return NULL;
165         }
166
167         cadata->requestInfo = reqInfo;
168         info = &reqInfo->info;
169         OIC_LOG(DEBUG, TAG, "Request Info :");
170         CALogPayloadInfo(info);
171    }
172     else if (CA_ERROR_DATA == dataType)
173     {
174         CAErrorInfo_t *errorInfo = (CAErrorInfo_t *)OICCalloc(1, sizeof (CAErrorInfo_t));
175         if (!errorInfo)
176         {
177             OIC_LOG(ERROR, TAG, "Memory allocation failed!");
178             OICFree(cadata);
179             CAFreeEndpoint(ep);
180             return NULL;
181         }
182
183         CAResult_t result = CAGetErrorInfoFromPDU(data, errorInfo);
184         if (CA_STATUS_OK != result)
185         {
186             OIC_LOG(ERROR, TAG, "CAGetErrorInfoFromPDU failed");
187             CAFreeEndpoint(ep);
188             OICFree(errorInfo);
189             OICFree(cadata);
190             return NULL;
191         }
192
193         cadata->errorInfo = errorInfo;
194         info = &errorInfo->info;
195         OIC_LOG(DEBUG, TAG, "error Info :");
196         CALogPayloadInfo(info);
197     }
198
199     cadata->remoteEndpoint = ep;
200     cadata->dataType = dataType;
201
202     return cadata;
203
204     OIC_LOG(DEBUG, TAG, "CAGenerateHandlerData OUT");
205 }
206
207 static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uint32_t size)
208 {
209     OIC_LOG(DEBUG, TAG, "IN");
210     VERIFY_NON_NULL_VOID(endpoint, TAG, "endpoint");
211     VERIFY_NON_NULL_VOID(pdu, TAG, "pdu");
212
213     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
214     if (!ep)
215     {
216         OIC_LOG(ERROR, TAG, "clone failed");
217         return;
218     }
219
220     CAResponseInfo_t* resInfo = (CAResponseInfo_t*)OICCalloc(1, sizeof(CAResponseInfo_t));
221
222     if (!resInfo)
223     {
224         OIC_LOG(ERROR, TAG, "calloc failed");
225         CAFreeEndpoint(ep);
226         return;
227     }
228
229     resInfo->result = CA_RETRANSMIT_TIMEOUT;
230     resInfo->info.type = CAGetMessageTypeFromPduBinaryData(pdu, size);
231     resInfo->info.messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
232
233     CAResult_t res = CAGetTokenFromPDU((const coap_hdr_t *) pdu, &(resInfo->info));
234     if (CA_STATUS_OK != res)
235     {
236         OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list");
237         CADestroyResponseInfoInternal(resInfo);
238         CAFreeEndpoint(ep);
239         return;
240     }
241
242     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
243     if (NULL == cadata)
244     {
245         OIC_LOG(ERROR, TAG, "memory allocation failed !");
246         CAFreeEndpoint(ep);
247         CADestroyResponseInfoInternal(resInfo);
248         return;
249     }
250
251     cadata->type = SEND_TYPE_UNICAST;
252     cadata->remoteEndpoint = ep;
253     cadata->requestInfo = NULL;
254     cadata->responseInfo = resInfo;
255
256 #ifdef SINGLE_THREAD
257     CAProcessReceivedData(cadata);
258 #else
259     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
260 #endif
261
262     OIC_LOG(DEBUG, TAG, "OUT");
263 }
264
265 static void CADataDestroyer(void *data, uint32_t size)
266 {
267     OIC_LOG(DEBUG, TAG, "CADataDestroyer IN");
268     CAData_t *cadata = (CAData_t *) data;
269
270     if (NULL == cadata)
271     {
272         OIC_LOG(ERROR, TAG, "cadata is NULL");
273         return;
274     }
275
276     if (NULL != cadata->remoteEndpoint)
277     {
278         CAFreeEndpoint(cadata->remoteEndpoint);
279     }
280
281     if (NULL != cadata->requestInfo)
282     {
283         CADestroyRequestInfoInternal((CARequestInfo_t *) cadata->requestInfo);
284     }
285
286     if (NULL != cadata->responseInfo)
287     {
288         CADestroyResponseInfoInternal((CAResponseInfo_t *) cadata->responseInfo);
289     }
290
291     if (NULL != cadata->errorInfo)
292     {
293         CADestroyErrorInfoInternal(cadata->errorInfo);
294     }
295
296     OICFree(cadata->options);
297     OICFree(cadata);
298     OIC_LOG(DEBUG, TAG, "CADataDestroyer OUT");
299 }
300
301 static void CAProcessReceivedData(CAData_t *data)
302 {
303     OIC_LOG(DEBUG, TAG, "CAProcessReceivedData IN");
304     if (!data)
305     {
306         OIC_LOG(ERROR, TAG, "thread data error!!");
307         return;
308     }
309
310     // parse the data and call the callbacks.
311     // #1 parse the data
312     // #2 get endpoint
313     CAEndpoint_t *rep = (CAEndpoint_t *)(data->remoteEndpoint);
314     if (!rep)
315     {
316         OIC_LOG(ERROR, TAG, "remoteEndpoint error!!");
317         return;
318     }
319
320     if (data->requestInfo && g_requestHandler)
321     {
322         g_requestHandler(rep, data->requestInfo);
323     }
324     else if (data->responseInfo && g_responseHandler)
325     {
326         g_responseHandler(rep, data->responseInfo);
327     }
328     else if (data->errorInfo && g_errorHandler)
329     {
330         g_errorHandler(rep, data->errorInfo);
331     }
332
333
334 #ifdef SINGLE_THREAD
335     CADataDestroyer(data, sizeof(CAData_t));
336 #endif
337
338     OIC_LOG(DEBUG, TAG, "CAProcessReceivedData OUT");
339 }
340
341 #ifndef SINGLE_THREAD
342
343 static void CAReceiveThreadProcess(void *threadData)
344 {
345     OIC_LOG(DEBUG, TAG, "IN");
346 #ifndef SINGLE_HANDLE
347     CAData_t *data = (CAData_t *) threadData;
348     CAProcessReceivedData(data);
349 #endif
350     OIC_LOG(DEBUG, TAG, "OUT");
351 }
352 #endif
353
354 static void CAProcessSendData(const CAData_t *data)
355 {
356     OIC_LOG(DEBUG, TAG, "IN");
357     VERIFY_NON_NULL_VOID(data, TAG, "data");
358     VERIFY_NON_NULL_VOID(data->remoteEndpoint, TAG, "remoteEndpoint");
359
360     CAResult_t res = CA_STATUS_FAILED;
361
362     CASendDataType_t type = data->type;
363
364     coap_pdu_t *pdu = NULL;
365
366     if (SEND_TYPE_UNICAST == type)
367     {
368
369         OIC_LOG(DEBUG,TAG,"Unicast message");
370         if (NULL != data->requestInfo)
371         {
372             OIC_LOG(DEBUG, TAG, "requestInfo is available..");
373
374             pdu = CAGeneratePDU(data->requestInfo->method, &data->requestInfo->info);
375         }
376         else if (NULL != data->responseInfo)
377         {
378             OIC_LOG(DEBUG, TAG, "responseInfo is available..");
379
380             pdu = CAGeneratePDU(data->responseInfo->result, &data->responseInfo->info);
381         }
382         else
383         {
384             OIC_LOG(DEBUG, TAG, "request info, response info is empty");
385             return;
386         }
387
388         // interface controller function call.
389         if (NULL != pdu)
390         {
391             CALogPDUInfo(pdu);
392
393             res = CASendUnicastData(data->remoteEndpoint, pdu->hdr, pdu->length);
394             if (CA_STATUS_OK != res)
395             {
396                 OIC_LOG_V(ERROR, TAG, "send failed:%d", res);
397                 coap_delete_pdu(pdu);
398                 return;
399             }
400             // for retransmission
401             res = CARetransmissionSentData(&g_retransmissionContext, data->remoteEndpoint, pdu->hdr,
402                                            pdu->length);
403             if (CA_STATUS_OK != res)
404             {
405                 OIC_LOG_V(INFO, TAG, "retransmission is not enabled due to error, res : %d", res);
406                 coap_delete_pdu(pdu);
407                 return;
408             }
409
410             coap_delete_pdu(pdu);
411         }
412         else
413         {
414             OIC_LOG(ERROR,TAG,"Failed to generate unicast PDU");
415             return;
416         }
417     }
418     else if (SEND_TYPE_MULTICAST == type)
419     {
420         OIC_LOG(DEBUG,TAG,"Multicast message");
421         if (NULL != data->requestInfo)
422         {
423             OIC_LOG(DEBUG, TAG, "requestInfo is available..");
424             CAInfo_t *info = &data->requestInfo->info;
425
426             pdu = CAGeneratePDU(CA_GET, info);
427             if (NULL != pdu)
428             {
429                 CALogPDUInfo(pdu);
430
431                 res = CASendMulticastData(data->remoteEndpoint, pdu->hdr, pdu->length);
432                 if (CA_STATUS_OK != res)
433                 {
434                     OIC_LOG_V(ERROR, TAG, "send failed:%d", res);
435                     coap_delete_pdu(pdu);
436                     return;
437                 }
438
439                 coap_delete_pdu(pdu);
440             }
441             else
442             {
443                 OIC_LOG(ERROR,TAG,"Failed to generate multicast PDU");
444             }
445         }
446         else
447         {
448             OIC_LOG(ERROR, TAG, "request info is empty");
449         }
450     }
451
452     OIC_LOG(DEBUG, TAG, "OUT");
453 }
454
455 #ifndef SINGLE_THREAD
456 static void CASendThreadProcess(void *threadData)
457 {
458     CAData_t *data = (CAData_t *) threadData;
459     CAProcessSendData(data);
460 }
461
462 #endif
463
464 /*
465  * If a second message arrives with the same token and the other address
466  * family, drop it.  Typically, IPv6 beats IPv4, so the IPv4 message is dropped.
467  * This can be made more robust (for instance, another message could arrive
468  * in between), but it is good enough for now.
469  */
470 static bool CADropSecondRequest(const CAEndpoint_t *endpoint, uint16_t messageId)
471 {
472     if (!endpoint)
473     {
474         return true;
475     }
476     if (endpoint->adapter != CA_ADAPTER_IP)
477     {
478         return false;
479     }
480
481     bool ret = false;
482     CATransportFlags_t familyFlags = endpoint->flags & CA_IPFAMILY_MASK;
483
484     if (messageId == caglobals.ca.previousRequestMessageId)
485     {
486         if ((familyFlags ^ caglobals.ca.previousRequestFlags) == CA_IPFAMILY_MASK)
487         {
488             if (familyFlags & CA_IPV6)
489             {
490                 OIC_LOG(INFO, TAG, "IPv6 duplicate response ignored");
491             }
492             else
493             {
494                 OIC_LOG(INFO, TAG, "IPv4 duplicate response ignored");
495             }
496             ret = true;
497         }
498     }
499     caglobals.ca.previousRequestFlags = familyFlags;
500     caglobals.ca.previousRequestMessageId = messageId;
501     return ret;
502 }
503
504 static void CAReceivedPacketCallback(const CAEndpoint_t *remoteEndpoint, void *data, uint32_t dataLen)
505 {
506     OIC_LOG(DEBUG, TAG, "IN");
507     VERIFY_NON_NULL_VOID(remoteEndpoint, TAG, "remoteEndpoint");
508     VERIFY_NON_NULL_VOID(data, TAG, "data");
509
510     uint32_t code = CA_NOT_FOUND;
511     CAData_t *cadata = NULL;
512
513     coap_pdu_t *pdu = (coap_pdu_t *) CAParsePDU((const char *) data, dataLen, &code);
514     if (NULL == pdu)
515     {
516         OIC_LOG(ERROR, TAG, "Parse PDU failed");
517         return;
518     }
519
520     OIC_LOG_V(DEBUG, TAG, "code = %d", code);
521     if (CA_GET == code || CA_POST == code || CA_PUT == code || CA_DELETE == code)
522     {
523         cadata = CAGenerateHandlerData(remoteEndpoint, pdu, CA_REQUEST_DATA);
524         if (!cadata)
525         {
526             OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, CAGenerateHandlerData failed!");
527             coap_delete_pdu(pdu);
528             return;
529         }
530     }
531     else
532     {
533         cadata = CAGenerateHandlerData(remoteEndpoint, pdu, CA_RESPONSE_DATA);
534         if (!cadata)
535         {
536             OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, CAGenerateHandlerData failed!");
537             coap_delete_pdu(pdu);
538             return;
539         }
540
541         // for retransmission
542         void *retransmissionPdu = NULL;
543         CARetransmissionReceivedData(&g_retransmissionContext, cadata->remoteEndpoint, pdu->hdr,
544                                      pdu->length, &retransmissionPdu);
545
546         // get token from saved data in retransmission list
547         if (retransmissionPdu && CA_EMPTY == code)
548         {
549             CAInfo_t *info = &cadata->responseInfo->info;
550             CAResult_t res = CAGetTokenFromPDU((const coap_hdr_t *)retransmissionPdu,
551                                                info);
552             if (CA_STATUS_OK != res)
553             {
554                 OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list");
555                 OICFree(info->token);
556                 info->tokenLength = 0;
557             }
558         }
559         OICFree(retransmissionPdu);
560     }
561
562     cadata->type = SEND_TYPE_UNICAST;
563
564 #ifdef SINGLE_THREAD
565     CAProcessReceivedData(cadata);
566 #else
567     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
568 #endif
569
570     coap_delete_pdu(pdu);
571
572     OIC_LOG(DEBUG, TAG, "OUT");
573 }
574
575 static void CANetworkChangedCallback(const CAEndpoint_t *info, CANetworkStatus_t status)
576 {
577     OIC_LOG(DEBUG, TAG, "IN");
578
579     OIC_LOG(DEBUG, TAG, "OUT");
580 }
581
582 void CAHandleRequestResponseCallbacks()
583 {
584     OIC_LOG(DEBUG, TAG, "CAHandleRequestResponseCallbacks IN");
585 #ifdef SINGLE_THREAD
586     CAReadData();
587     CARetransmissionBaseRoutine((void *)&g_retransmissionContext);
588 #else
589 #ifdef SINGLE_HANDLE
590     // parse the data and call the callbacks.
591     // #1 parse the data
592     // #2 get endpoint
593
594     ca_mutex_lock(g_receiveThread.threadMutex);
595
596     u_queue_message_t *item = u_queue_get_element(g_receiveThread.dataQueue);
597
598     ca_mutex_unlock(g_receiveThread.threadMutex);
599
600     if (NULL == item)
601     {
602         return;
603     }
604
605     // get values
606     void *msg = item->msg;
607
608     if (NULL == msg)
609     {
610         return;
611     }
612
613     // get endpoint
614     CAData_t *td = (CAData_t *) msg;
615
616     if (td->requestInfo && g_requestHandler)
617     {
618         OIC_LOG_V(DEBUG, TAG, "request callback : %d", td->requestInfo->info.numOptions);
619         g_requestHandler(td->remoteEndpoint, td->requestInfo);
620     }
621     else if (td->responseInfo && g_responseHandler)
622     {
623         OIC_LOG_V(DEBUG, TAG, "response callback : %d", td->responseInfo->info.numOptions);
624         g_responseHandler(td->remoteEndpoint, td->responseInfo);
625     }
626     else if (td->errorInfo && g_errorHandler)
627     {
628         OIC_LOG_V(DEBUG, TAG, "error callback error: %d", td->errorInfo->result);
629         g_errorHandler(td->remoteEndpoint, td->errorInfo);
630     }
631
632     CADataDestroyer(msg, sizeof(CAData_t));
633     OICFree(item);
634
635 #endif /* SINGLE_HANDLE */
636 #endif
637     OIC_LOG(DEBUG, TAG, "CAHandleRequestResponseCallbacks OUT");
638 }
639
640 static CAData_t* CAPrepareSendData(const CAEndpoint_t *endpoint, const void *sendData,
641                                    CADataType_t dataType)
642 {
643     OIC_LOG(DEBUG, TAG, "CAPrepareSendData IN");
644     CAInfo_t *info = NULL;
645
646     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
647     if (!cadata)
648     {
649         OIC_LOG(ERROR, TAG, "memory allocation failed");
650         return NULL;
651     }
652
653     if(CA_REQUEST_DATA == dataType)
654     {
655         // clone request info
656         CARequestInfo_t *request = CACloneRequestInfo((CARequestInfo_t *)sendData);
657
658         if(!request)
659         {
660             OIC_LOG(ERROR, TAG, "CACloneRequestInfo failed");
661             OICFree(cadata);
662             return NULL;
663         }
664
665         cadata->type = request->isMulticast ? SEND_TYPE_MULTICAST : SEND_TYPE_UNICAST;
666         info = &request->info;
667         cadata->requestInfo =  request;
668     }
669     else if(CA_RESPONSE_DATA == dataType)
670     {
671         // clone response info
672         CAResponseInfo_t *response = CACloneResponseInfo((CAResponseInfo_t *)sendData);
673
674         if(!response)
675         {
676             OIC_LOG(ERROR, TAG, "CACloneResponseInfo failed");
677             OICFree(cadata);
678             return NULL;
679         }
680
681         cadata->type = SEND_TYPE_UNICAST;
682         info = &response->info;
683         cadata->responseInfo = response;
684     }
685
686     if (NULL != info->options && 0 < info->numOptions)
687     {
688         uint8_t numOptions = info->numOptions;
689         // copy data
690         CAHeaderOption_t *headerOption = (CAHeaderOption_t *) OICMalloc(sizeof(CAHeaderOption_t)
691                                                                         * numOptions);
692         if(!headerOption)
693         {
694             OIC_LOG(ERROR, TAG, "memory allocation failed");
695             CADataDestroyer(cadata, sizeof(CAData_t));
696             return NULL;
697         }
698
699         memcpy(headerOption, info->options, sizeof(CAHeaderOption_t) * numOptions);
700
701         cadata->options = headerOption;
702         cadata->numOptions = numOptions;
703     }
704
705     CAEndpoint_t* ep = CACloneEndpoint(endpoint);
706     if (!ep)
707     {
708         OIC_LOG(ERROR, TAG, "endpoint clone failed");
709         CADataDestroyer(cadata, sizeof(CAData_t));
710         return NULL;
711     }
712
713     cadata->remoteEndpoint = ep;
714     return cadata;
715
716 }
717
718 CAResult_t CADetachRequestMessage(const CAEndpoint_t *object, const CARequestInfo_t *request)
719 {
720     OIC_LOG(DEBUG, TAG, "IN");
721
722     VERIFY_NON_NULL(object, TAG, "object");
723     VERIFY_NON_NULL(request, TAG, "request");
724
725     if (false == CAIsSelectedNetworkAvailable())
726     {
727         return CA_STATUS_FAILED;
728     }
729
730 #ifdef ARDUINO
731     // If max retransmission queue is reached, then don't handle new request
732     if (CA_MAX_RT_ARRAY_SIZE == u_arraylist_length(g_retransmissionContext.dataList))
733     {
734         OIC_LOG(ERROR, TAG, "max RT queue size reached!");
735         return CA_SEND_FAILED;
736     }
737 #endif /* ARDUINO */
738
739     CAData_t *data = CAPrepareSendData(object, request, CA_REQUEST_DATA);
740     if(!data)
741     {
742         OIC_LOG(ERROR, TAG, "CAPrepareSendData failed");
743         return CA_MEMORY_ALLOC_FAILED;
744     }
745
746 #ifdef SINGLE_THREAD
747     CAProcessSendData(data);
748     CADataDestroyer(data, sizeof(CAData_t));
749 #else
750     CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
751 #endif
752
753     OIC_LOG(DEBUG, TAG, "OUT");
754     return CA_STATUS_OK;
755 }
756
757 CAResult_t CADetachResponseMessage(const CAEndpoint_t *object,
758                                    const CAResponseInfo_t *response)
759 {
760     OIC_LOG(DEBUG, TAG, "IN");
761     VERIFY_NON_NULL(object, TAG, "object");
762     VERIFY_NON_NULL(response, TAG, "response");
763
764     if (false == CAIsSelectedNetworkAvailable())
765     {
766         return CA_STATUS_FAILED;
767     }
768
769     CAData_t *data = CAPrepareSendData(object, response, CA_RESPONSE_DATA);
770     if(!data)
771     {
772         OIC_LOG(ERROR, TAG, "CAPrepareSendData failed");
773         return CA_MEMORY_ALLOC_FAILED;
774     }
775
776 #ifdef SINGLE_THREAD
777     CAProcessSendData(data);
778     CADataDestroyer(data, sizeof(CAData_t));
779 #else
780     CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
781 #endif
782
783     OIC_LOG(DEBUG, TAG, "OUT");
784     return CA_STATUS_OK;
785 }
786
787 CAResult_t CADetachMessageResourceUri(const CAURI_t resourceUri, const CAToken_t token,
788                                       uint8_t tokenLength, const CAHeaderOption_t *options,
789                                       uint8_t numOptions)
790 {
791     return CA_NOT_SUPPORTED;
792 }
793
794 void CASetInterfaceCallbacks(CARequestCallback ReqHandler, CAResponseCallback RespHandler,
795                              CAErrorCallback errorHandler)
796 {
797     OIC_LOG(DEBUG, TAG, "IN");
798     g_requestHandler = ReqHandler;
799     g_responseHandler = RespHandler;
800     g_errorHandler = errorHandler;
801     OIC_LOG(DEBUG, TAG, "OUT");
802 }
803
804 CAResult_t CAInitializeMessageHandler()
805 {
806     OIC_LOG(DEBUG, TAG, "IN");
807     CASetPacketReceivedCallback(CAReceivedPacketCallback);
808
809     CASetNetworkChangeCallback(CANetworkChangedCallback);
810     CASetErrorHandleCallback(CAErrorHandler);
811
812 #ifndef SINGLE_THREAD
813     // create thread pool
814     CAResult_t res = ca_thread_pool_init(MAX_THREAD_POOL_SIZE, &g_threadPoolHandle);
815
816     if (CA_STATUS_OK != res)
817     {
818         OIC_LOG(ERROR, TAG, "thread pool initialize error.");
819         return res;
820     }
821
822     // send thread initialize
823     if (CA_STATUS_OK != CAQueueingThreadInitialize(&g_sendThread, g_threadPoolHandle,
824                                                    CASendThreadProcess, CADataDestroyer))
825     {
826         OIC_LOG(ERROR, TAG, "Failed to Initialize send queue thread");
827         return CA_STATUS_FAILED;
828     }
829
830     // start send thread
831     res = CAQueueingThreadStart(&g_sendThread);
832
833     if (CA_STATUS_OK != res)
834     {
835         OIC_LOG(ERROR, TAG, "thread start error(send thread).");
836         ca_thread_pool_free(g_threadPoolHandle);
837         g_threadPoolHandle = NULL;
838         return res;
839     }
840
841     // receive thread initialize
842     if (CA_STATUS_OK != CAQueueingThreadInitialize(&g_receiveThread, g_threadPoolHandle,
843                                                    CAReceiveThreadProcess, CADataDestroyer))
844     {
845         OIC_LOG(ERROR, TAG, "Failed to Initialize receive queue thread");
846         return CA_STATUS_FAILED;
847     }
848
849 #ifndef SINGLE_HANDLE // This will be enabled when RI supports multi threading
850     // start receive thread
851     res = CAQueueingThreadStart(&gReceiveThread);
852
853     if (res != CA_STATUS_OK)
854     {
855         OIC_LOG(ERROR, TAG, "thread start error(receive thread).");
856         return res;
857     }
858 #endif /* SINGLE_HANDLE */
859
860     // retransmission initialize
861     CARetransmissionInitialize(&g_retransmissionContext, g_threadPoolHandle, CASendUnicastData,
862                                CATimeoutCallback, NULL);
863
864     // start retransmission
865     res = CARetransmissionStart(&g_retransmissionContext);
866
867     if (CA_STATUS_OK != res)
868     {
869         OIC_LOG(ERROR, TAG, "thread start error(retransmission thread).");
870         return res;
871     }
872
873     // initialize interface adapters by controller
874     CAInitializeAdapters(g_threadPoolHandle);
875 #else
876     // retransmission initialize
877     CARetransmissionInitialize(&g_retransmissionContext, NULL, CASendUnicastData,
878                                CATimeoutCallback, NULL);
879     CAInitializeAdapters();
880 #endif
881
882     OIC_LOG(DEBUG, TAG, "OUT");
883     return CA_STATUS_OK;
884 }
885
886 void CATerminateMessageHandler()
887 {
888     OIC_LOG(DEBUG, TAG, "IN");
889 #ifndef SINGLE_THREAD
890     CATransportAdapter_t connType;
891     u_arraylist_t *list = CAGetSelectedNetworkList();
892     uint32_t length = u_arraylist_length(list);
893
894     uint32_t i = 0;
895     for (i = 0; i < length; i++)
896     {
897         void* ptrType = u_arraylist_get(list, i);
898
899         if (NULL == ptrType)
900         {
901             continue;
902         }
903
904         connType = *(CATransportAdapter_t *)ptrType;
905         CAStopAdapter(connType);
906     }
907
908     // stop retransmission
909     if (NULL != g_retransmissionContext.threadMutex)
910     {
911         CARetransmissionStop(&g_retransmissionContext);
912     }
913
914     // stop thread
915     // delete thread data
916     if (NULL != g_sendThread.threadMutex)
917     {
918         CAQueueingThreadStop(&g_sendThread);
919     }
920
921     // stop thread
922     // delete thread data
923     if (NULL != g_receiveThread.threadMutex)
924     {
925 #ifndef SINGLE_HANDLE // This will be enabled when RI supports multi threading
926         CAQueueingThreadStop(&gReceiveThread);
927 #endif /* SINGLE_HANDLE */
928     }
929
930     // destroy thread pool
931     if (NULL != g_threadPoolHandle)
932     {
933         ca_thread_pool_free(g_threadPoolHandle);
934         g_threadPoolHandle = NULL;
935     }
936
937     CARetransmissionDestroy(&g_retransmissionContext);
938     CAQueueingThreadDestroy(&g_sendThread);
939     CAQueueingThreadDestroy(&g_receiveThread);
940
941     // terminate interface adapters by controller
942     CATerminateAdapters();
943 #else
944     // terminate interface adapters by controller
945     CATerminateAdapters();
946
947     // stop retransmission
948     CARetransmissionStop(&g_retransmissionContext);
949     CARetransmissionDestroy(&g_retransmissionContext);
950 #endif
951
952     OIC_LOG(DEBUG, TAG, "OUT");
953 }
954
955 void CALogPDUInfo(coap_pdu_t *pdu)
956 {
957     VERIFY_NON_NULL_VOID(pdu, TAG, "pdu");
958
959     OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
960
961     OIC_LOG_V(DEBUG, TAG, "PDU Maker - type : %d", pdu->hdr->type);
962
963     OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
964
965     OIC_LOG(DEBUG, TAG, "PDU Maker - token :");
966
967     OIC_LOG_BUFFER(DEBUG, TAG, pdu->hdr->token, pdu->hdr->token_length);
968 }
969
970 static void CALogPayloadInfo(CAInfo_t *info)
971 {
972     if(info)
973     {
974         if (!info->options)
975         {
976             for (uint32_t i = 0; i < info->numOptions; i++)
977             {
978                 OIC_LOG_V(DEBUG, TAG, "optionID: %d", info->options[i].optionID);
979
980                 OIC_LOG_V(DEBUG, TAG, "list: %s", info->options[i].optionData);
981             }
982         }
983
984         if (!info->payload)
985         {
986             OIC_LOG_V(DEBUG, TAG, "payload: %p(%u)", info->payload,
987                       info->payloadSize);
988         }
989
990         if (!info->token)
991         {
992             OIC_LOG(DEBUG, TAG, "token:");
993             OIC_LOG_BUFFER(DEBUG, TAG, (const uint8_t *) info->token,
994                            info->tokenLength);
995         }
996         OIC_LOG_V(DEBUG, TAG, "msgID: %d", info->messageId);
997     }
998     else
999     {
1000         OIC_LOG(DEBUG, TAG, "info is NULL, cannot output log data");
1001     }
1002 }
1003
1004 void CAErrorHandler(const CAEndpoint_t *endpoint,
1005                     const void *data, uint32_t dataLen,
1006                     CAResult_t result)
1007 {
1008     OIC_LOG(DEBUG, TAG, "IN");
1009
1010 #ifndef SINGLE_THREAD
1011
1012     VERIFY_NON_NULL_VOID(endpoint, TAG, "remoteEndpoint");
1013     VERIFY_NON_NULL_VOID(data, TAG, "data");
1014
1015     uint32_t code = CA_NOT_FOUND;
1016     //Do not free remoteEndpoint and data. Currently they will be freed in data thread
1017     //Get PDU data
1018     coap_pdu_t *pdu = (coap_pdu_t *)CAParsePDU((const char *)data, dataLen, &code);
1019     if (NULL == pdu)
1020     {
1021         OIC_LOG(ERROR, TAG, "Parse PDU failed");
1022         return;
1023     }
1024
1025     CAData_t *cadata = CAGenerateHandlerData(endpoint, pdu, CA_ERROR_DATA);
1026     if(!cadata)
1027     {
1028         OIC_LOG(ERROR, TAG, "CAErrorHandler, CAGenerateHandlerData failed!");
1029         coap_delete_pdu(pdu);
1030         return;
1031     }
1032
1033     cadata->errorInfo->result = result;
1034
1035     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
1036     coap_delete_pdu(pdu);
1037 #endif
1038
1039     OIC_LOG(DEBUG, TAG, "OUT");
1040     return;
1041 }