Added connectivity files for full code review
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / camessagehandler.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <stdint.h>
25
26 #include "cainterface.h"
27 #include "camessagehandler.h"
28 #include "caremotehandler.h"
29 #include "cainterfacecontroller.h"
30 #include "caprotocolmessage.h"
31 #include "uqueue.h"
32 #include "logger.h"
33 #include "config.h" /* for coap protocol */
34 #include "coap.h"
35 #include "uthreadpool.h" /* for thread pool */
36 #include "caqueueingthread.h"
37 #include "caretransmission.h"
38 #include "umutex.h"
39 #include "oic_malloc.h"
40 #include "caadapterutils.h"
41 #include "canetworkconfigurator.h"
42
/* Log tag used by every OIC_LOG / OIC_LOG_V call in this file. */
#define TAG PCF("CA")

/*
 * While defined, the code guarded by "#ifndef SINGLE_HANDLE" (the dedicated
 * receive thread) is compiled out; received data is instead drained by
 * CAHandleRequestResponseCallbacks().
 */
#define SINGLE_HANDLE

/*
 * Allocation guard: when arg is NULL, logs "Out of memory" and jumps to the
 * memory_error_exit label, which every function using this macro must define.
 * Wrapped in do/while(0) so that "CA_MEMORY_ALLOC_CHECK(p);" is exactly one
 * statement (safe inside an unbraced if/else), and the argument is
 * parenthesized so any pointer expression can be passed.
 */
#define CA_MEMORY_ALLOC_CHECK(arg) \
    do \
    { \
        if (NULL == (arg)) \
        { \
            OIC_LOG_V(ERROR, TAG, "Out of memory"); \
            goto memory_error_exit; \
        } \
    } while (0)

/* Size handed to u_thread_pool_init() when the thread pool is created. */
#define MAX_THREAD_POOL_SIZE    20
50
/* Delivery mode of a queued CAData_t; CASendThreadProcess branches on it. */
typedef enum
{
    SEND_TYPE_MULTICAST = 0,
    SEND_TYPE_UNICAST
} CASendDataType_t;
55
/*
 * Unit of work placed on the send and receive queues of this file.
 * Instances are allocated with OICCalloc by the producers and released,
 * together with everything they point to, by CADataDestroyer.
 */
typedef struct
{
    CASendDataType_t type;              // unicast or multicast; selects the branch in CASendThreadProcess
    CARemoteEndpoint_t *remoteEndpoint; // peer (or multicast target URI holder); released by CADataDestroyer
    CARequestInfo_t *requestInfo;       // request description, or NULL when the entry carries a response
    CAResponseInfo_t *responseInfo;     // response description, or NULL when the entry carries a request
    CAHeaderOption_t *options;          // option array copied by CADetachMessageResourceUri; NULL otherwise
    uint8_t numOptions;                 // number of elements in options
} CAData_t;
65
// thread pool handle; created in CAInitializeMessageHandler and freed in CATerminateMessageHandler
static u_thread_pool_t g_threadPoolHandle = NULL;

// message handler main thread
// g_sendThread is initialized with CASendThreadProcess, g_receiveThread with CAReceiveThreadProcess;
// both use CADataDestroyer to release queued CAData_t entries
static CAQueueingThread_t g_sendThread;
static CAQueueingThread_t g_receiveThread;

// retransmission state; initialized with CASendUnicastData as sender and CATimeoutCallback as timeout hook
static CARetransmission_t g_retransmissionContext;

// handler field
// upper-layer callbacks installed through CASetRequestResponseCallbacks; NULL until then
static CARequestCallback g_requestHandler = NULL;
static CAResponseCallback g_responseHandler = NULL;
78
79 static void CATimeoutCallback(const CARemoteEndpoint_t *endpoint, const void *pdu, uint32_t size)
80 {
81     CARemoteEndpoint_t* ep = CACloneRemoteEndpoint(endpoint);
82     if (ep == NULL)
83     {
84         OIC_LOG(DEBUG, TAG, "memory allocation failed !");
85         return;
86     }
87
88     CAResponseInfo_t* resInfo = (CAResponseInfo_t*) OICCalloc(1, sizeof(CAResponseInfo_t));
89
90     if (resInfo == NULL)
91     {
92         OIC_LOG(DEBUG, TAG, "memory allocation failed !");
93         CADestroyRemoteEndpointInternal(ep);
94         return;
95     }
96
97     CAMessageType_t type = CAGetMessageTypeFromPduBinaryData(pdu, size);
98     uint16_t messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
99
100     resInfo->result = CA_RETRANSMIT_TIMEOUT;
101     resInfo->info.type = type;
102     resInfo->info.messageId = messageId;
103
104     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
105     if (cadata == NULL)
106     {
107         OIC_LOG(DEBUG, TAG, "memory allocation failed !");
108         CADestroyRemoteEndpointInternal(ep);
109         OICFree(resInfo);
110         return;
111     }
112
113     cadata->type = SEND_TYPE_UNICAST;
114     cadata->remoteEndpoint = ep;
115     cadata->requestInfo = NULL;
116     cadata->responseInfo = resInfo;
117
118     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
119 }
120
121 static void CADataDestroyer(void *data, uint32_t size)
122 {
123     CAData_t *cadata = (CAData_t *) data;
124
125     if (cadata == NULL)
126     {
127         return;
128     }
129
130     if (cadata->remoteEndpoint != NULL)
131     {
132         CADestroyRemoteEndpointInternal((CARemoteEndpoint_t *)cadata->remoteEndpoint);
133     }
134
135     if (cadata->requestInfo != NULL)
136     {
137         CADestroyRequestInfoInternal((CARequestInfo_t *)cadata->requestInfo);
138     }
139
140     if (cadata->responseInfo != NULL)
141     {
142         CADestroyResponseInfoInternal((CAResponseInfo_t *)cadata->responseInfo);
143     }
144
145     if (cadata->options != NULL)
146     {
147         OICFree(cadata->options);
148     }
149
150     OICFree(cadata);
151 }
152
153 static void CAReceiveThreadProcess(void *threadData)
154 {
155     // Currently not supported
156     // This will be enabled when RI supports multi threading
157 #ifndef SINGLE_HANDLE
158     CAData_t *data = (CAData_t *) threadData;
159
160     if (data == NULL)
161     {
162         OIC_LOG(DEBUG, TAG, "thread data error!!");
163         return;
164     }
165
166     // parse the data and call the callbacks.
167     // #1 parse the data
168     // #2 get endpoint
169     CARemoteEndpoint_t *rep = (CARemoteEndpoint_t *)(data->remoteEndpoint);
170
171     if (rep == NULL)
172         return;
173
174     if (data->requestInfo != NULL)
175     {
176         if (g_requestHandler)
177         {
178             g_requestHandler(rep, data->requestInfo);
179         }
180     }
181
182     if (data->responseInfo != NULL)
183     {
184         if (g_responseHandler)
185         {
186             g_responseHandler(rep, data->responseInfo);
187         }
188     }
189 #endif
190 }
191
192 static void CASendThreadProcess(void *threadData)
193 {
194     CAData_t *data = (CAData_t *) threadData;
195
196     if (data == NULL)
197     {
198         OIC_LOG(ERROR, TAG, "thread data error!!");
199         return;
200     }
201
202     if (NULL == data->remoteEndpoint)
203     {
204         OIC_LOG(DEBUG, TAG, "remoteEndpoint is null");
205         return;
206     }
207
208     CAResult_t res = CA_STATUS_FAILED;
209
210     CASendDataType_t type = data->type;
211
212     if (type == SEND_TYPE_UNICAST)
213     {
214         coap_pdu_t *pdu = NULL;
215
216         if (data->requestInfo != NULL)
217         {
218             OIC_LOG_V(DEBUG, TAG, "requestInfo is available..");
219
220             pdu = (coap_pdu_t *) CAGeneratePdu(data->remoteEndpoint->resourceUri,
221                                                data->requestInfo->method,
222                                                data->requestInfo->info);
223         }
224         else if (data->responseInfo != NULL)
225         {
226             OIC_LOG_V(DEBUG, TAG, "responseInfo is available..");
227
228             pdu = (coap_pdu_t *) CAGeneratePdu(data->remoteEndpoint->resourceUri,
229                                                data->responseInfo->result,
230                                                data->responseInfo->info);
231         }
232         else
233         {
234             OIC_LOG(DEBUG, TAG, "request info, response info is empty");
235         }
236
237         // interface controller function call.
238         if (NULL != pdu)
239         {
240             OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
241
242             OIC_LOG_V(DEBUG, TAG, "PDU Maker - type : %d", pdu->hdr->type);
243
244             OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
245
246             OIC_LOG_V(DEBUG, TAG, "PDU Maker - id : %d", ntohs(pdu->hdr->id));
247
248             OIC_LOG_V(DEBUG, TAG, "PDU Maker - token : %s", pdu->hdr->token);
249
250             res = CASendUnicastData(data->remoteEndpoint, pdu->hdr, pdu->length);
251
252             // for retransmission
253             CARetransmissionSentData(&g_retransmissionContext, data->remoteEndpoint, pdu->hdr,
254                                      pdu->length);
255
256             coap_delete_pdu(pdu);
257         }
258     }
259     else if (type == SEND_TYPE_MULTICAST)
260     {
261         coap_pdu_t *pdu = NULL;
262         CAInfo_t info = {};
263
264         info.options = data->options;
265         info.numOptions = data->numOptions;
266         info.token = data->requestInfo->info.token;
267         info.type = data->requestInfo->info.type;
268
269         pdu = (coap_pdu_t *) CAGeneratePdu(data->remoteEndpoint->resourceUri, CA_GET, info);
270
271         if (NULL != pdu)
272         {
273             OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
274
275             OIC_LOG_V(DEBUG, TAG, "PDU Maker - type : %d", pdu->hdr->type);
276
277             OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
278
279             OIC_LOG_V(DEBUG, TAG, "PDU Maker - id : %d", ntohs(pdu->hdr->id));
280
281             OIC_LOG_V(DEBUG, TAG, "PDU Maker - token : %s", pdu->hdr->token);
282
283             res = CASendMulticastData(pdu->hdr, pdu->length);
284             coap_delete_pdu(pdu);
285         }
286     }
287
288     OIC_LOG_V(DEBUG, TAG, " Result :%d", res);
289 }
290
291 static void CAReceivedPacketCallback(CARemoteEndpoint_t *endpoint, void *data,
292                                      uint32_t dataLen)
293 {
294     OIC_LOG(DEBUG, TAG, "receivedPacketCallback in message handler!!");
295
296     if (NULL == data)
297     {
298         OIC_LOG(DEBUG, TAG, "received data is null");
299         return;
300     }
301
302     coap_pdu_t *pdu;
303     uint32_t code = CA_NOT_FOUND;
304
305     OIC_LOG_V(DEBUG, TAG, "data : %s", data);
306     pdu = (coap_pdu_t *) CAParsePDU((const char *) data, dataLen, &code);
307     OICFree(data);
308
309     if(NULL == pdu)
310     {
311         OIC_LOG(DEBUG, TAG, "pdu is null");
312         return;
313     }
314
315     char uri[CA_MAX_URI_LENGTH] = { 0, };
316     uint32_t bufLen = CA_MAX_URI_LENGTH;
317
318     if (code == CA_GET || code == CA_POST || code == CA_PUT || code == CA_DELETE)
319     {
320         CARequestInfo_t *ReqInfo;
321         ReqInfo = (CARequestInfo_t *) OICCalloc(1, sizeof(CARequestInfo_t));
322         if (ReqInfo == NULL)
323         {
324             OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
325             coap_delete_pdu(pdu);
326             CAAdapterFreeRemoteEndpoint(endpoint);
327             return;
328         }
329         CAGetRequestInfoFromPdu(pdu, ReqInfo, uri, bufLen);
330
331         if (NULL != ReqInfo->info.options)
332         {
333             uint32_t i;
334             for (i = 0; i < ReqInfo->info.numOptions; i++)
335             {
336                 OIC_LOG_V(DEBUG, TAG, "Request- optionID: %d", ReqInfo->info.options[i].optionID);
337
338                 OIC_LOG_V(DEBUG, TAG, "Request- list: %s", ReqInfo->info.options[i].optionData);
339             }
340         }
341
342         if (NULL != ReqInfo->info.payload)
343         {
344             OIC_LOG_V(DEBUG, TAG, "Request- payload: %s", ReqInfo->info.payload);
345         }
346         OIC_LOG_V(DEBUG, TAG, "Request- code: %d", ReqInfo->method);
347         if (NULL != ReqInfo->info.token)
348         {
349             OIC_LOG_V(DEBUG, TAG, "Request- token : %s", ReqInfo->info.token);
350         }
351
352         if (NULL != endpoint)
353         {
354             endpoint->resourceUri = (char *) OICCalloc(bufLen + 1, sizeof(char));
355             if (endpoint->resourceUri == NULL)
356             {
357                 OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
358                 OICFree(ReqInfo);
359                 coap_delete_pdu(pdu);
360                 CAAdapterFreeRemoteEndpoint(endpoint);
361                 return;
362             }
363             memcpy(endpoint->resourceUri, uri, bufLen);
364             OIC_LOG_V(DEBUG, TAG, "added resource URI : %s", endpoint->resourceUri);
365         }
366         // store the data at queue.
367         CAData_t *cadata = NULL;
368         cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
369         if (cadata == NULL)
370         {
371             OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
372             if (endpoint != NULL && endpoint->resourceUri != NULL)
373                 OICFree(endpoint->resourceUri);
374             OICFree(ReqInfo);
375             coap_delete_pdu(pdu);
376             CAAdapterFreeRemoteEndpoint(endpoint);
377             return;
378         }
379
380         cadata->type = SEND_TYPE_UNICAST;
381         cadata->remoteEndpoint = endpoint;
382         cadata->requestInfo = ReqInfo;
383         cadata->responseInfo = NULL;
384         CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
385     }
386     else
387     {
388         CAResponseInfo_t *ResInfo;
389         ResInfo = (CAResponseInfo_t *) OICCalloc(1, sizeof(CAResponseInfo_t));
390         if (ResInfo == NULL)
391         {
392             OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
393             coap_delete_pdu(pdu);
394             CAAdapterFreeRemoteEndpoint(endpoint);
395             return;
396         }
397         CAGetResponseInfoFromPdu(pdu, ResInfo, uri, bufLen);
398
399         if (NULL != ResInfo->info.options)
400         {
401             uint32_t i;
402             for (i = 0; i < ResInfo->info.numOptions; i++)
403             {
404                 OIC_LOG_V(DEBUG, TAG, "Response- optionID: %d", ResInfo->info.options[i].optionID);
405
406                 OIC_LOG_V(DEBUG, TAG, "Response- list: %s", ResInfo->info.options[i].optionData);
407             }
408             if (NULL != ResInfo->info.payload)
409             {
410                 OIC_LOG_V(DEBUG, TAG, "Response- payload: %s", ResInfo->info.payload);
411             }
412             OIC_LOG_V(DEBUG, TAG, "Response- code: %d", ResInfo->result);
413         }
414
415         if (NULL != endpoint)
416         {
417             endpoint->resourceUri = (char *) OICCalloc(bufLen + 1, sizeof(char));
418             if (endpoint->resourceUri == NULL)
419             {
420                 OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
421                 OICFree(ResInfo);
422                 coap_delete_pdu(pdu);
423                 CAAdapterFreeRemoteEndpoint(endpoint);
424                 return;
425             }
426             memcpy(endpoint->resourceUri, uri, bufLen);
427             OIC_LOG_V(DEBUG, TAG, "added resource URI : %s", endpoint->resourceUri);
428         }
429
430         // store the data at queue.
431         CAData_t *cadata = NULL;
432         cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
433         if (cadata == NULL)
434         {
435             OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
436             if (endpoint != NULL && endpoint->resourceUri != NULL)
437                 OICFree(endpoint->resourceUri);
438             OICFree(ResInfo);
439             coap_delete_pdu(pdu);
440             CAAdapterFreeRemoteEndpoint(endpoint);
441             return;
442         }
443
444         cadata->type = SEND_TYPE_UNICAST;
445         cadata->remoteEndpoint = endpoint;
446         cadata->requestInfo = NULL;
447         cadata->responseInfo = ResInfo;
448
449         CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
450
451         // for retransmission
452         CARetransmissionReceivedData(&g_retransmissionContext, endpoint, pdu->hdr, pdu->length);
453     }
454
455     if(pdu)
456     {
457         coap_delete_pdu(pdu);
458     }
459 }
460
461 static void CANetworkChangedCallback(CALocalConnectivity_t *info, CANetworkStatus_t status)
462 {
463     OIC_LOG(DEBUG, TAG, "networkChangeCallback in message handler!!");
464
465     OIC_LOG_V(DEBUG, TAG, "Changed Network Status: %d", status);
466 }
467
468 void CAHandleRequestResponseCallbacks()
469 {
470     OIC_LOG_V(DEBUG, TAG, "CAHandleRequestResponseCallbacks IN");
471
472     // parse the data and call the callbacks.
473     // #1 parse the data
474     // #2 get endpoint
475
476     u_mutex_lock(g_receiveThread.threadMutex);
477
478     u_queue_message_t *item = u_queue_get_element(g_receiveThread.dataQueue);
479
480     u_mutex_unlock(g_receiveThread.threadMutex);
481
482     if (item == NULL)
483         return;
484
485     // get values
486     void *msg = item->msg;
487
488     if (msg == NULL)
489         return;
490
491     // get endpoint
492     CAData_t *td = (CAData_t *) msg;
493     CARemoteEndpoint_t *rep = td->remoteEndpoint;
494
495     if (rep == NULL)
496         return;
497
498     if (td->requestInfo != NULL)
499     {
500         if (g_requestHandler)
501         {
502             g_requestHandler(rep, td->requestInfo);
503         }
504
505         OICFree(td->requestInfo->info.options);
506         OICFree(td->requestInfo->info.payload);
507         OICFree(td->requestInfo->info.token);
508         OICFree(td->requestInfo);
509     }
510
511     if (td->responseInfo != NULL)
512     {
513         if (g_responseHandler)
514         {
515             g_responseHandler(rep, td->responseInfo);
516         }
517
518         OICFree(td->responseInfo->info.options);
519         OICFree(td->responseInfo->info.payload);
520         OICFree(td->responseInfo->info.token);
521         OICFree(td->responseInfo);
522     }
523
524     if (NULL != rep->resourceUri)
525     {
526         OICFree(rep->resourceUri);
527     }
528
529     OICFree(rep);
530     OIC_LOG_V(DEBUG, TAG, "CAHandleRequestResponseCallbacks OUT");
531 }
532
533 CAResult_t CADetachRequestMessage(const CARemoteEndpoint_t *object,
534                                   const CARequestInfo_t *request)
535 {
536     OIC_LOG_V(DEBUG, TAG, "CADetachRequestMessage");
537
538     if (object == NULL || request == NULL)
539     {
540         return CA_STATUS_FAILED;
541     }
542
543     CARemoteEndpoint_t *remoteEndpoint = NULL;
544     CARequestInfo_t *requestInfo = NULL;
545
546     CAData_t *data = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
547     CA_MEMORY_ALLOC_CHECK(data);
548
549     // clone remote endpoint
550
551     remoteEndpoint = CACloneRemoteEndpoint(object);
552     CA_MEMORY_ALLOC_CHECK(remoteEndpoint);
553
554     // clone request info
555     requestInfo = CACloneRequestInfo(request);
556     CA_MEMORY_ALLOC_CHECK(requestInfo);
557
558     // save data
559     data->type = SEND_TYPE_UNICAST;
560     data->remoteEndpoint = remoteEndpoint;
561     data->requestInfo = requestInfo;
562     data->responseInfo = NULL;
563
564     // add thread
565     CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
566
567     return CA_STATUS_OK;
568
569     // memory error label.
570 memory_error_exit:
571
572     CADestroyRemoteEndpointInternal(remoteEndpoint);
573
574     CADestroyRequestInfoInternal(requestInfo);
575
576     if (data != NULL)
577     {
578         OICFree(data);
579     }
580
581     return CA_MEMORY_ALLOC_FAILED;
582 }
583
584 CAResult_t CADetachRequestToAllMessage(const CAGroupEndpoint_t *object,
585                                        const CARequestInfo_t *request)
586 {
587     // ToDo
588     OIC_LOG_V(DEBUG, TAG, "CADetachRequestToAllMessage");
589
590
591     if (object == NULL || request == NULL)
592     {
593         return CA_STATUS_FAILED;
594     }
595
596     CARemoteEndpoint_t *remoteEndpoint = NULL;
597     CARequestInfo_t *requestInfo = NULL;
598
599     CAData_t *data = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
600     CA_MEMORY_ALLOC_CHECK(data);
601
602     CAAddress_t addr;
603     memset(&addr, 0, sizeof(CAAddress_t));
604     remoteEndpoint = CACreateRemoteEndpointInternal(object->resourceUri, addr,
605                                          object->connectivityType);
606
607     // clone request info
608     requestInfo = CACloneRequestInfo(request);
609     CA_MEMORY_ALLOC_CHECK(requestInfo);
610
611     // save data
612     data->type = SEND_TYPE_MULTICAST;
613     data->remoteEndpoint = remoteEndpoint;
614     data->requestInfo = requestInfo;
615     data->responseInfo = NULL;
616
617     // add thread
618     CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
619
620     return CA_STATUS_OK;
621
622     // memory error label.
623 memory_error_exit:
624
625     CADestroyRemoteEndpointInternal(remoteEndpoint);
626
627     CADestroyRequestInfoInternal(requestInfo);
628
629     if (data != NULL)
630     {
631         OICFree(data);
632     }
633
634     return CA_MEMORY_ALLOC_FAILED;
635 }
636
637 CAResult_t CADetachResponseMessage(const CARemoteEndpoint_t *object,
638                                    const CAResponseInfo_t *response)
639 {
640     OIC_LOG_V(DEBUG, TAG, "CADetachResponseMessage");
641
642     if (object == NULL || response == NULL)
643     {
644         return CA_STATUS_FAILED;
645     }
646
647     CARemoteEndpoint_t *remoteEndpoint = NULL;
648     CAResponseInfo_t *responseInfo = NULL;
649
650     CAData_t *data = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
651     CA_MEMORY_ALLOC_CHECK(data);
652
653     // clone remote endpoint
654     remoteEndpoint = CACloneRemoteEndpoint(object);
655     CA_MEMORY_ALLOC_CHECK(remoteEndpoint);
656
657     // clone response info
658     responseInfo = CACloneResponseInfo(response);
659     CA_MEMORY_ALLOC_CHECK(responseInfo);
660
661     // save data
662     data->type = SEND_TYPE_UNICAST;
663     data->remoteEndpoint = remoteEndpoint;
664     data->requestInfo = NULL;
665     data->responseInfo = responseInfo;
666
667     // add thread
668     CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
669
670     return CA_STATUS_OK;
671
672     // memory error label.
673 memory_error_exit:
674
675     CADestroyRemoteEndpointInternal(remoteEndpoint);
676
677     CADestroyResponseInfoInternal(responseInfo);
678
679     if (data != NULL)
680     {
681         OICFree(data);
682     }
683
684     return CA_MEMORY_ALLOC_FAILED;
685 }
686
687 CAResult_t CADetachMessageResourceUri(const CAURI_t resourceUri, const CAToken_t token,
688                                       const CAHeaderOption_t *options,const uint8_t numOptions)
689 {
690     if (resourceUri == NULL)
691     {
692         return CA_STATUS_FAILED;
693     }
694     CARequestInfo_t *ReqInfo = NULL;
695     CAToken_t tempToken = NULL;
696     CARemoteEndpoint_t *remoteEndpoint = NULL;
697
698     CAData_t *data = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
699     CA_MEMORY_ALLOC_CHECK(data);
700
701     CAAddress_t addr = {};
702     remoteEndpoint = CACreateRemoteEndpointInternal(resourceUri, addr,
703                                          CA_ETHERNET | CA_WIFI | CA_EDR | CA_LE);
704
705     // create request info
706     ReqInfo = (CARequestInfo_t *) OICCalloc(1, sizeof(CARequestInfo_t));
707     CA_MEMORY_ALLOC_CHECK(ReqInfo);
708
709     // copy token value
710     if (token != NULL)
711     {
712         int32_t len = strlen(token);
713         tempToken = (char *) OICCalloc((len + 1), sizeof(char));
714         CA_MEMORY_ALLOC_CHECK(tempToken);
715         strncpy(tempToken, token, len);
716     }
717
718     // save request info data
719     ReqInfo->method = CA_GET;
720     ReqInfo->info.token = tempToken;
721     ReqInfo->info.type = CA_MSG_NONCONFIRM;
722     // save data
723     data->type = SEND_TYPE_MULTICAST;
724     data->remoteEndpoint = remoteEndpoint;
725     data->requestInfo = ReqInfo;
726
727     data->responseInfo = NULL;
728     data->options = NULL;
729     data->numOptions = 0;
730
731     if (options != NULL && numOptions > 0)
732     {
733         // copy data
734         CAHeaderOption_t *temp = (CAHeaderOption_t *) OICCalloc(numOptions,
735                                      sizeof(CAHeaderOption_t));
736         CA_MEMORY_ALLOC_CHECK(temp);
737
738         memcpy(temp, options, sizeof(CAHeaderOption_t) * numOptions);
739
740         data->options = temp;
741         data->numOptions = numOptions;
742     }
743
744     // add thread
745     CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
746
747     return CA_STATUS_OK;
748
749     // memory error label.
750 memory_error_exit:
751
752     CADestroyRemoteEndpointInternal(remoteEndpoint);
753
754     if (tempToken != NULL)
755     {
756         OICFree(tempToken);
757     }
758
759     if (ReqInfo != NULL)
760     {
761         OICFree(ReqInfo);
762     }
763
764     if (data != NULL)
765     {
766         OICFree(data);
767     }
768
769     return CA_MEMORY_ALLOC_FAILED;
770 }
771
772 void CASetRequestResponseCallbacks(CARequestCallback ReqHandler,
773                                    CAResponseCallback RespHandler)
774 {
775     OIC_LOG_V(DEBUG, TAG, "set request, response handler callback.");
776
777     g_requestHandler = ReqHandler;
778     g_responseHandler = RespHandler;
779 }
780
781 CAResult_t CAInitializeMessageHandler()
782 {
783     OIC_LOG(DEBUG, TAG, "CAInitializeMessageHandler - Entry");
784     CASetPacketReceivedCallback(CAReceivedPacketCallback);
785
786     CASetNetworkChangeCallback(CANetworkChangedCallback);
787
788     // create thread pool
789     CAResult_t res;
790     res = u_thread_pool_init(MAX_THREAD_POOL_SIZE, &g_threadPoolHandle);
791
792     if (res != CA_STATUS_OK)
793     {
794         OIC_LOG_V(ERROR, TAG, "thread pool initialize error.");
795         return res;
796     }
797
798     // send thread initialize
799     CAQueueingThreadInitialize(&g_sendThread, g_threadPoolHandle, CASendThreadProcess,
800             CADataDestroyer);
801
802     // start send thread
803     res = CAQueueingThreadStart(&g_sendThread);
804
805     if (res != CA_STATUS_OK)
806     {
807         OIC_LOG_V(ERROR, TAG, "thread start error(send thread).");
808         u_thread_pool_free(g_threadPoolHandle);
809         g_threadPoolHandle = NULL;
810         return res;
811     }
812
813     // receive thread initialize
814     CAQueueingThreadInitialize(&g_receiveThread, g_threadPoolHandle, CAReceiveThreadProcess,
815             CADataDestroyer);
816
817 #ifndef SINGLE_HANDLE // This will be enabled when RI supports multi threading
818     // start receive thread
819     res = CAQueueingThreadStart(&gReceiveThread);
820
821     if (res != CA_STATUS_OK)
822     {
823         OIC_LOG_V(DEBUG, TAG, "thread start error(receive thread).");
824         return res;
825     }
826 #endif
827
828     // retransmission initialize
829     CARetransmissionInitialize(&g_retransmissionContext, g_threadPoolHandle, CASendUnicastData,
830             CATimeoutCallback, NULL);
831
832     // start retransmission
833     res = CARetransmissionStart(&g_retransmissionContext);
834
835     if (res != CA_STATUS_OK)
836     {
837         OIC_LOG_V(ERROR, TAG, "thread start error(retransmission thread).");
838         return res;
839     }
840
841     // initialize interface adapters by controller
842     CAInitializeAdapters(g_threadPoolHandle);
843
844     return CA_STATUS_OK;
845 }
846
847 void CATerminateMessageHandler()
848 {
849     uint8_t i = 0;
850     CAConnectivityType_t connType;
851     u_arraylist_t *list = CAGetSelectedNetworkList();
852     uint32_t length = u_arraylist_length(list);
853
854     for (i = 0; i < length; i++)
855     {
856         void* ptrType = u_arraylist_get(list, i);
857
858         if (ptrType == NULL)
859         {
860             continue;
861         }
862
863         connType = *(CAConnectivityType_t *) ptrType;
864         CAStopAdapter(connType);
865     }
866
867     // stop retransmission
868     if (g_retransmissionContext.threadMutex != NULL)
869     {
870         CARetransmissionStop(&g_retransmissionContext);
871     }
872
873     // stop thread
874     // delete thread data
875     if (g_sendThread.threadMutex != NULL)
876     {
877         CAQueueingThreadStop(&g_sendThread);
878     }
879
880     // stop thread
881     // delete thread data
882     if (g_receiveThread.threadMutex != NULL)
883     {
884 #ifndef SINGLE_HANDLE // This will be enabled when RI supports multi threading
885         CAQueueingThreadStop(&gReceiveThread);
886 #endif
887     }
888
889     // destroy thread pool
890     if (g_threadPoolHandle != NULL)
891     {
892         u_thread_pool_free(g_threadPoolHandle);
893         g_threadPoolHandle = NULL;
894     }
895
896     CARetransmissionDestroy(&g_retransmissionContext);
897     CAQueueingThreadDestroy(&g_sendThread);
898     CAQueueingThreadDestroy(&g_receiveThread);
899
900     // terminate interface adapters by controller
901     CATerminateAdapters();
902
903     OIC_LOG_V(DEBUG, TAG, "message handler termination is complete!");
904 }
905