7e9de748a65a8da7ec6e93207bbbd2615e3b961f
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / camessagehandler.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <stdint.h>
25
26 #include "cainterface.h"
27 #include "camessagehandler.h"
28 #include "caremotehandler.h"
29 #include "cainterfacecontroller.h"
30 #include "caprotocolmessage.h"
31 #include "uqueue.h"
32 #include "logger.h"
33 #include "config.h" /* for coap protocol */
34 #include "coap.h"
35 #include "uthreadpool.h" /* for thread pool */
36 #include "caqueueingthread.h"
37 #include "caretransmission.h"
38 #include "umutex.h"
39 #include "oic_malloc.h"
40 #include "caadapterutils.h"
41 #include "canetworkconfigurator.h"
42
43 #define TAG PCF("CA")
44 #define SINGLE_HANDLE
45
46 #define CA_MEMORY_ALLOC_CHECK(arg) { if (arg == NULL) {OIC_LOG(ERROR, TAG, "Out of memory"); \
47 goto memory_error_exit;} }
48
49 #define MAX_THREAD_POOL_SIZE    20
50
51 typedef enum
52 {
53     SEND_TYPE_MULTICAST = 0, SEND_TYPE_UNICAST
54 } CASendDataType_t;
55
56 typedef struct
57 {
58     CASendDataType_t type;
59     CARemoteEndpoint_t *remoteEndpoint;
60     CARequestInfo_t *requestInfo;
61     CAResponseInfo_t *responseInfo;
62     CAHeaderOption_t *options;
63     uint8_t numOptions;
64 } CAData_t;
65
66 // thread pool handle
67 static u_thread_pool_t g_threadPoolHandle = NULL;
68
69 // message handler main thread
70 static CAQueueingThread_t g_sendThread;
71 static CAQueueingThread_t g_receiveThread;
72
73 static CARetransmission_t g_retransmissionContext;
74
75 // handler field
76 static CARequestCallback g_requestHandler = NULL;
77 static CAResponseCallback g_responseHandler = NULL;
78
79 static void CATimeoutCallback(const CARemoteEndpoint_t *endpoint, const void *pdu, uint32_t size)
80 {
81     CARemoteEndpoint_t* ep = CACloneRemoteEndpoint(endpoint);
82     if (ep == NULL)
83     {
84         OIC_LOG(DEBUG, TAG, "memory allocation failed !");
85         return;
86     }
87
88     CAResponseInfo_t* resInfo = (CAResponseInfo_t*) OICCalloc(1, sizeof(CAResponseInfo_t));
89
90     if (resInfo == NULL)
91     {
92         OIC_LOG(DEBUG, TAG, "memory allocation failed !");
93         CADestroyRemoteEndpointInternal(ep);
94         return;
95     }
96
97     CAMessageType_t type = CAGetMessageTypeFromPduBinaryData(pdu, size);
98     uint16_t messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
99
100     resInfo->result = CA_RETRANSMIT_TIMEOUT;
101     resInfo->info.type = type;
102     resInfo->info.messageId = messageId;
103
104     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
105     if (cadata == NULL)
106     {
107         OIC_LOG(DEBUG, TAG, "memory allocation failed !");
108         CADestroyRemoteEndpointInternal(ep);
109         OICFree(resInfo);
110         return;
111     }
112
113     cadata->type = SEND_TYPE_UNICAST;
114     cadata->remoteEndpoint = ep;
115     cadata->requestInfo = NULL;
116     cadata->responseInfo = resInfo;
117
118     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
119 }
120
121 static void CADataDestroyer(void *data, uint32_t size)
122 {
123     CAData_t *cadata = (CAData_t *) data;
124
125     if (cadata == NULL)
126     {
127         return;
128     }
129
130     if (cadata->remoteEndpoint != NULL)
131     {
132         CADestroyRemoteEndpointInternal((CARemoteEndpoint_t *)cadata->remoteEndpoint);
133     }
134
135     if (cadata->requestInfo != NULL)
136     {
137         CADestroyRequestInfoInternal((CARequestInfo_t *)cadata->requestInfo);
138     }
139
140     if (cadata->responseInfo != NULL)
141     {
142         CADestroyResponseInfoInternal((CAResponseInfo_t *)cadata->responseInfo);
143     }
144
145     if (cadata->options != NULL)
146     {
147         OICFree(cadata->options);
148     }
149
150     OICFree(cadata);
151 }
152
153 static void CAReceiveThreadProcess(void *threadData)
154 {
155     // Currently not supported
156     // This will be enabled when RI supports multi threading
157 #ifndef SINGLE_HANDLE
158     CAData_t *data = (CAData_t *) threadData;
159
160     if (data == NULL)
161     {
162         OIC_LOG(ERROR, TAG, "thread data error!!");
163         return;
164     }
165
166     // parse the data and call the callbacks.
167     // #1 parse the data
168     // #2 get endpoint
169     CARemoteEndpoint_t *rep = (CARemoteEndpoint_t *)(data->remoteEndpoint);
170
171     if (rep == NULL)
172         return;
173
174     if (data->requestInfo != NULL)
175     {
176         if (g_requestHandler)
177         {
178             g_requestHandler(rep, data->requestInfo);
179         }
180     }
181
182     if (data->responseInfo != NULL)
183     {
184         if (g_responseHandler)
185         {
186             g_responseHandler(rep, data->responseInfo);
187         }
188     }
189 #endif
190 }
191
192 static void CASendThreadProcess(void *threadData)
193 {
194     CAData_t *data = (CAData_t *) threadData;
195
196     if (data == NULL)
197     {
198         OIC_LOG(ERROR, TAG, "thread data error!!");
199         return;
200     }
201
202     if (NULL == data->remoteEndpoint)
203     {
204         OIC_LOG(DEBUG, TAG, "remoteEndpoint is null");
205         return;
206     }
207
208     CAResult_t res = CA_STATUS_FAILED;
209
210     CASendDataType_t type = data->type;
211
212     if (type == SEND_TYPE_UNICAST)
213     {
214         coap_pdu_t *pdu = NULL;
215
216         if (data->requestInfo != NULL)
217         {
218             OIC_LOG(DEBUG, TAG, "requestInfo is available..");
219
220             pdu = (coap_pdu_t *) CAGeneratePdu(data->remoteEndpoint->resourceUri,
221                                                data->requestInfo->method,
222                                                data->requestInfo->info);
223         }
224         else if (data->responseInfo != NULL)
225         {
226             OIC_LOG(DEBUG, TAG, "responseInfo is available..");
227
228             pdu = (coap_pdu_t *) CAGeneratePdu(data->remoteEndpoint->resourceUri,
229                                                data->responseInfo->result,
230                                                data->responseInfo->info);
231         }
232         else
233         {
234             OIC_LOG(DEBUG, TAG, "request info, response info is empty");
235         }
236
237         // interface controller function call.
238         if (NULL != pdu)
239         {
240             OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
241
242             OIC_LOG_V(DEBUG, TAG, "PDU Maker - type : %d", pdu->hdr->type);
243
244             OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
245
246             OIC_LOG_V(DEBUG, TAG, "PDU Maker - id : %d", ntohs(pdu->hdr->id));
247
248             OIC_LOG(DEBUG, TAG, "PDU Maker - token :");
249
250             OIC_LOG_BUFFER(DEBUG, TAG, pdu->hdr->token, pdu->hdr->token_length);
251
252             res = CASendUnicastData(data->remoteEndpoint, pdu->hdr, pdu->length);
253
254             // for retransmission
255             CARetransmissionSentData(&g_retransmissionContext, data->remoteEndpoint, pdu->hdr,
256                                      pdu->length);
257
258             coap_delete_pdu(pdu);
259         }
260     }
261     else if (type == SEND_TYPE_MULTICAST)
262     {
263         coap_pdu_t *pdu = NULL;
264         CAInfo_t info = {};
265
266         info.options = data->options;
267         info.numOptions = data->numOptions;
268         info.token = data->requestInfo->info.token;
269         info.type = data->requestInfo->info.type;
270
271         pdu = (coap_pdu_t *) CAGeneratePdu(data->remoteEndpoint->resourceUri, CA_GET, info);
272
273         if (NULL != pdu)
274         {
275             OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
276
277             OIC_LOG_V(DEBUG, TAG, "PDU Maker - type : %d", pdu->hdr->type);
278
279             OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
280
281             OIC_LOG_V(DEBUG, TAG, "PDU Maker - id : %d", ntohs(pdu->hdr->id));
282
283             OIC_LOG(DEBUG, TAG, "PDU Maker - token");
284
285             OIC_LOG_BUFFER(DEBUG, TAG, pdu->hdr->token, pdu->hdr->token_length);
286
287             res = CASendMulticastData(pdu->hdr, pdu->length);
288             coap_delete_pdu(pdu);
289         }
290     }
291
292     OIC_LOG_V(DEBUG, TAG, " Result :%d", res);
293 }
294
295 static void CAReceivedPacketCallback(CARemoteEndpoint_t *endpoint, void *data,
296                                      uint32_t dataLen)
297 {
298     OIC_LOG(DEBUG, TAG, "receivedPacketCallback in message handler!!");
299
300     if (NULL == data)
301     {
302         OIC_LOG(DEBUG, TAG, "received data is null");
303         return;
304     }
305
306     coap_pdu_t *pdu;
307     uint32_t code = CA_NOT_FOUND;
308
309     OIC_LOG_V(DEBUG, TAG, "data : %s", data);
310     pdu = (coap_pdu_t *) CAParsePDU((const char *) data, dataLen, &code);
311     OICFree(data);
312
313     if(NULL == pdu)
314     {
315         OIC_LOG(DEBUG, TAG, "pdu is null");
316         return;
317     }
318
319     char uri[CA_MAX_URI_LENGTH] = { 0, };
320     uint32_t bufLen = CA_MAX_URI_LENGTH;
321
322     if (code == CA_GET || code == CA_POST || code == CA_PUT || code == CA_DELETE)
323     {
324         CARequestInfo_t *ReqInfo;
325         ReqInfo = (CARequestInfo_t *) OICCalloc(1, sizeof(CARequestInfo_t));
326         if (ReqInfo == NULL)
327         {
328             OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
329             coap_delete_pdu(pdu);
330             CAAdapterFreeRemoteEndpoint(endpoint);
331             return;
332         }
333         CAGetRequestInfoFromPdu(pdu, ReqInfo, uri, bufLen);
334
335         if (NULL != ReqInfo->info.options)
336         {
337             uint32_t i;
338             for (i = 0; i < ReqInfo->info.numOptions; i++)
339             {
340                 OIC_LOG_V(DEBUG, TAG, "Request- optionID: %d", ReqInfo->info.options[i].optionID);
341
342                 OIC_LOG_V(DEBUG, TAG, "Request- list: %s", ReqInfo->info.options[i].optionData);
343             }
344         }
345
346         if (NULL != ReqInfo->info.payload)
347         {
348             OIC_LOG_V(DEBUG, TAG, "Request- payload: %s", ReqInfo->info.payload);
349         }
350         OIC_LOG_V(DEBUG, TAG, "Request- code: %d", ReqInfo->method);
351         if (NULL != ReqInfo->info.token)
352         {
353             OIC_LOG(DEBUG, TAG, "Request- token:");
354             OIC_LOG_BUFFER(DEBUG, TAG, (const uint8_t *) ReqInfo->info.token, CA_MAX_TOKEN_LEN);
355         }
356
357         if (NULL != endpoint)
358         {
359             endpoint->resourceUri = (char *) OICCalloc(bufLen + 1, sizeof(char));
360             if (endpoint->resourceUri == NULL)
361             {
362                 OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
363                 OICFree(ReqInfo);
364                 coap_delete_pdu(pdu);
365                 CAAdapterFreeRemoteEndpoint(endpoint);
366                 return;
367             }
368             memcpy(endpoint->resourceUri, uri, bufLen);
369             OIC_LOG_V(DEBUG, TAG, "added resource URI : %s", endpoint->resourceUri);
370         }
371         // store the data at queue.
372         CAData_t *cadata = NULL;
373         cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
374         if (cadata == NULL)
375         {
376             OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
377             if (endpoint != NULL && endpoint->resourceUri != NULL)
378                 OICFree(endpoint->resourceUri);
379             OICFree(ReqInfo);
380             coap_delete_pdu(pdu);
381             CAAdapterFreeRemoteEndpoint(endpoint);
382             return;
383         }
384
385         cadata->type = SEND_TYPE_UNICAST;
386         cadata->remoteEndpoint = endpoint;
387         cadata->requestInfo = ReqInfo;
388         cadata->responseInfo = NULL;
389         CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
390     }
391     else
392     {
393         CAResponseInfo_t *ResInfo;
394         ResInfo = (CAResponseInfo_t *) OICCalloc(1, sizeof(CAResponseInfo_t));
395         if (ResInfo == NULL)
396         {
397             OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
398             coap_delete_pdu(pdu);
399             CAAdapterFreeRemoteEndpoint(endpoint);
400             return;
401         }
402         CAGetResponseInfoFromPdu(pdu, ResInfo, uri, bufLen);
403
404         if (NULL != ResInfo->info.options)
405         {
406             uint32_t i;
407             for (i = 0; i < ResInfo->info.numOptions; i++)
408             {
409                 OIC_LOG_V(DEBUG, TAG, "Response- optionID: %d", ResInfo->info.options[i].optionID);
410
411                 OIC_LOG_V(DEBUG, TAG, "Response- list: %s", ResInfo->info.options[i].optionData);
412             }
413             if (NULL != ResInfo->info.payload)
414             {
415                 OIC_LOG_V(DEBUG, TAG, "Response- payload: %s", ResInfo->info.payload);
416             }
417             OIC_LOG_V(DEBUG, TAG, "Response- code: %d", ResInfo->result);
418         }
419
420         if (NULL != endpoint)
421         {
422             endpoint->resourceUri = (char *) OICCalloc(bufLen + 1, sizeof(char));
423             if (endpoint->resourceUri == NULL)
424             {
425                 OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
426                 OICFree(ResInfo);
427                 coap_delete_pdu(pdu);
428                 CAAdapterFreeRemoteEndpoint(endpoint);
429                 return;
430             }
431             memcpy(endpoint->resourceUri, uri, bufLen);
432             OIC_LOG_V(DEBUG, TAG, "added resource URI : %s", endpoint->resourceUri);
433         }
434
435         // store the data at queue.
436         CAData_t *cadata = NULL;
437         cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
438         if (cadata == NULL)
439         {
440             OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
441             if (endpoint != NULL && endpoint->resourceUri != NULL)
442                 OICFree(endpoint->resourceUri);
443             OICFree(ResInfo);
444             coap_delete_pdu(pdu);
445             CAAdapterFreeRemoteEndpoint(endpoint);
446             return;
447         }
448
449         cadata->type = SEND_TYPE_UNICAST;
450         cadata->remoteEndpoint = endpoint;
451         cadata->requestInfo = NULL;
452         cadata->responseInfo = ResInfo;
453
454         CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
455
456         // for retransmission
457         CARetransmissionReceivedData(&g_retransmissionContext, endpoint, pdu->hdr, pdu->length);
458     }
459
460     if(pdu)
461     {
462         coap_delete_pdu(pdu);
463     }
464 }
465
466 static void CANetworkChangedCallback(CALocalConnectivity_t *info, CANetworkStatus_t status)
467 {
468     OIC_LOG(DEBUG, TAG, "networkChangeCallback in message handler!!");
469
470     OIC_LOG_V(DEBUG, TAG, "Changed Network Status: %d", status);
471 }
472
473 void CAHandleRequestResponseCallbacks()
474 {
475     OIC_LOG(DEBUG, TAG, "CAHandleRequestResponseCallbacks IN");
476
477     // parse the data and call the callbacks.
478     // #1 parse the data
479     // #2 get endpoint
480
481     u_mutex_lock(g_receiveThread.threadMutex);
482
483     u_queue_message_t *item = u_queue_get_element(g_receiveThread.dataQueue);
484
485     u_mutex_unlock(g_receiveThread.threadMutex);
486
487     if (item == NULL)
488         return;
489
490     // get values
491     void *msg = item->msg;
492
493     if (msg == NULL)
494         return;
495
496     // get endpoint
497     CAData_t *td = (CAData_t *) msg;
498     CARemoteEndpoint_t *rep = td->remoteEndpoint;
499
500     if (rep == NULL)
501         return;
502
503     if (td->requestInfo != NULL)
504     {
505         if (g_requestHandler)
506         {
507             g_requestHandler(rep, td->requestInfo);
508         }
509
510         OICFree(td->requestInfo->info.options);
511         OICFree(td->requestInfo->info.payload);
512         OICFree(td->requestInfo->info.token);
513         OICFree(td->requestInfo);
514     }
515
516     if (td->responseInfo != NULL)
517     {
518         if (g_responseHandler)
519         {
520             g_responseHandler(rep, td->responseInfo);
521         }
522
523         OICFree(td->responseInfo->info.options);
524         OICFree(td->responseInfo->info.payload);
525         OICFree(td->responseInfo->info.token);
526         OICFree(td->responseInfo);
527     }
528
529     if (NULL != rep->resourceUri)
530     {
531         OICFree(rep->resourceUri);
532     }
533
534     OICFree(rep);
535     OIC_LOG(DEBUG, TAG, "CAHandleRequestResponseCallbacks OUT");
536 }
537
538 CAResult_t CADetachRequestMessage(const CARemoteEndpoint_t *object,
539                                   const CARequestInfo_t *request)
540 {
541     OIC_LOG(DEBUG, TAG, "CADetachRequestMessage");
542
543     if (object == NULL || request == NULL)
544     {
545         return CA_STATUS_FAILED;
546     }
547
548     CARemoteEndpoint_t *remoteEndpoint = NULL;
549     CARequestInfo_t *requestInfo = NULL;
550
551     CAData_t *data = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
552     CA_MEMORY_ALLOC_CHECK(data);
553
554     // clone remote endpoint
555
556     remoteEndpoint = CACloneRemoteEndpoint(object);
557     CA_MEMORY_ALLOC_CHECK(remoteEndpoint);
558
559     // clone request info
560     requestInfo = CACloneRequestInfo(request);
561     CA_MEMORY_ALLOC_CHECK(requestInfo);
562
563     // save data
564     data->type = SEND_TYPE_UNICAST;
565     data->remoteEndpoint = remoteEndpoint;
566     data->requestInfo = requestInfo;
567     data->responseInfo = NULL;
568
569     // add thread
570     CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
571
572     return CA_STATUS_OK;
573
574     // memory error label.
575 memory_error_exit:
576
577     CADestroyRemoteEndpointInternal(remoteEndpoint);
578
579     CADestroyRequestInfoInternal(requestInfo);
580
581     if (data != NULL)
582     {
583         OICFree(data);
584     }
585
586     return CA_MEMORY_ALLOC_FAILED;
587 }
588
589 CAResult_t CADetachRequestToAllMessage(const CAGroupEndpoint_t *object,
590                                        const CARequestInfo_t *request)
591 {
592     // ToDo
593     OIC_LOG(DEBUG, TAG, "CADetachRequestToAllMessage");
594
595
596     if (object == NULL || request == NULL)
597     {
598         return CA_STATUS_FAILED;
599     }
600
601     CARemoteEndpoint_t *remoteEndpoint = NULL;
602     CARequestInfo_t *requestInfo = NULL;
603
604     CAData_t *data = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
605     CA_MEMORY_ALLOC_CHECK(data);
606
607     CAAddress_t addr;
608     memset(&addr, 0, sizeof(CAAddress_t));
609     remoteEndpoint = CACreateRemoteEndpointInternal(object->resourceUri, addr,
610                                          object->connectivityType);
611
612     // clone request info
613     requestInfo = CACloneRequestInfo(request);
614     CA_MEMORY_ALLOC_CHECK(requestInfo);
615
616     // save data
617     data->type = SEND_TYPE_MULTICAST;
618     data->remoteEndpoint = remoteEndpoint;
619     data->requestInfo = requestInfo;
620     data->responseInfo = NULL;
621
622     // add thread
623     CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
624
625     return CA_STATUS_OK;
626
627     // memory error label.
628 memory_error_exit:
629
630     CADestroyRemoteEndpointInternal(remoteEndpoint);
631
632     CADestroyRequestInfoInternal(requestInfo);
633
634     if (data != NULL)
635     {
636         OICFree(data);
637     }
638
639     return CA_MEMORY_ALLOC_FAILED;
640 }
641
642 CAResult_t CADetachResponseMessage(const CARemoteEndpoint_t *object,
643                                    const CAResponseInfo_t *response)
644 {
645     OIC_LOG(DEBUG, TAG, "CADetachResponseMessage");
646
647     if (object == NULL || response == NULL)
648     {
649         return CA_STATUS_FAILED;
650     }
651
652     CARemoteEndpoint_t *remoteEndpoint = NULL;
653     CAResponseInfo_t *responseInfo = NULL;
654
655     CAData_t *data = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
656     CA_MEMORY_ALLOC_CHECK(data);
657
658     // clone remote endpoint
659     remoteEndpoint = CACloneRemoteEndpoint(object);
660     CA_MEMORY_ALLOC_CHECK(remoteEndpoint);
661
662     // clone response info
663     responseInfo = CACloneResponseInfo(response);
664     CA_MEMORY_ALLOC_CHECK(responseInfo);
665
666     // save data
667     data->type = SEND_TYPE_UNICAST;
668     data->remoteEndpoint = remoteEndpoint;
669     data->requestInfo = NULL;
670     data->responseInfo = responseInfo;
671
672     // add thread
673     CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
674
675     return CA_STATUS_OK;
676
677     // memory error label.
678 memory_error_exit:
679
680     CADestroyRemoteEndpointInternal(remoteEndpoint);
681
682     CADestroyResponseInfoInternal(responseInfo);
683
684     if (data != NULL)
685     {
686         OICFree(data);
687     }
688
689     return CA_MEMORY_ALLOC_FAILED;
690 }
691
692 CAResult_t CADetachMessageResourceUri(const CAURI_t resourceUri, const CAToken_t token,
693                                       const CAHeaderOption_t *options,const uint8_t numOptions)
694 {
695     if (resourceUri == NULL)
696     {
697         return CA_STATUS_FAILED;
698     }
699     CARequestInfo_t *ReqInfo = NULL;
700     CAToken_t tempToken = NULL;
701     CARemoteEndpoint_t *remoteEndpoint = NULL;
702
703     CAData_t *data = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
704     CA_MEMORY_ALLOC_CHECK(data);
705
706     CAAddress_t addr = {};
707     remoteEndpoint = CACreateRemoteEndpointInternal(resourceUri, addr,
708                                          CA_ETHERNET | CA_WIFI | CA_EDR | CA_LE);
709
710     // create request info
711     ReqInfo = (CARequestInfo_t *) OICCalloc(1, sizeof(CARequestInfo_t));
712     CA_MEMORY_ALLOC_CHECK(ReqInfo);
713
714     // copy token value
715     if (token != NULL)
716     {
717         int32_t len = strlen(token);
718         tempToken = (char *) OICCalloc((len + 1), sizeof(char));
719         CA_MEMORY_ALLOC_CHECK(tempToken);
720         strncpy(tempToken, token, len);
721     }
722
723     // save request info data
724     ReqInfo->method = CA_GET;
725     ReqInfo->info.token = tempToken;
726     ReqInfo->info.type = CA_MSG_NONCONFIRM;
727     // save data
728     data->type = SEND_TYPE_MULTICAST;
729     data->remoteEndpoint = remoteEndpoint;
730     data->requestInfo = ReqInfo;
731
732     data->responseInfo = NULL;
733     data->options = NULL;
734     data->numOptions = 0;
735
736     if (options != NULL && numOptions > 0)
737     {
738         // copy data
739         CAHeaderOption_t *temp = (CAHeaderOption_t *) OICCalloc(numOptions,
740                                      sizeof(CAHeaderOption_t));
741         CA_MEMORY_ALLOC_CHECK(temp);
742
743         memcpy(temp, options, sizeof(CAHeaderOption_t) * numOptions);
744
745         data->options = temp;
746         data->numOptions = numOptions;
747     }
748
749     // add thread
750     CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
751
752     return CA_STATUS_OK;
753
754     // memory error label.
755 memory_error_exit:
756
757     CADestroyRemoteEndpointInternal(remoteEndpoint);
758
759     if (tempToken != NULL)
760     {
761         OICFree(tempToken);
762     }
763
764     if (ReqInfo != NULL)
765     {
766         OICFree(ReqInfo);
767     }
768
769     if (data != NULL)
770     {
771         OICFree(data);
772     }
773
774     return CA_MEMORY_ALLOC_FAILED;
775 }
776
777 void CASetRequestResponseCallbacks(CARequestCallback ReqHandler,
778                                    CAResponseCallback RespHandler)
779 {
780     OIC_LOG(DEBUG, TAG, "set request, response handler callback.");
781
782     g_requestHandler = ReqHandler;
783     g_responseHandler = RespHandler;
784 }
785
786 CAResult_t CAInitializeMessageHandler()
787 {
788     OIC_LOG(DEBUG, TAG, "CAInitializeMessageHandler - Entry");
789     CASetPacketReceivedCallback(CAReceivedPacketCallback);
790
791     CASetNetworkChangeCallback(CANetworkChangedCallback);
792
793     // create thread pool
794     CAResult_t res;
795     res = u_thread_pool_init(MAX_THREAD_POOL_SIZE, &g_threadPoolHandle);
796
797     if (res != CA_STATUS_OK)
798     {
799         OIC_LOG(ERROR, TAG, "thread pool initialize error.");
800         return res;
801     }
802
803     // send thread initialize
804     CAQueueingThreadInitialize(&g_sendThread, g_threadPoolHandle, CASendThreadProcess,
805             CADataDestroyer);
806
807     // start send thread
808     res = CAQueueingThreadStart(&g_sendThread);
809
810     if (res != CA_STATUS_OK)
811     {
812         OIC_LOG(ERROR, TAG, "thread start error(send thread).");
813         u_thread_pool_free(g_threadPoolHandle);
814         g_threadPoolHandle = NULL;
815         return res;
816     }
817
818     // receive thread initialize
819     CAQueueingThreadInitialize(&g_receiveThread, g_threadPoolHandle, CAReceiveThreadProcess,
820             CADataDestroyer);
821
822 #ifndef SINGLE_HANDLE // This will be enabled when RI supports multi threading
823     // start receive thread
824     res = CAQueueingThreadStart(&gReceiveThread);
825
826     if (res != CA_STATUS_OK)
827     {
828         OIC_LOG(ERROR, TAG, "thread start error(receive thread).");
829         return res;
830     }
831 #endif
832
833     // retransmission initialize
834     CARetransmissionInitialize(&g_retransmissionContext, g_threadPoolHandle, CASendUnicastData,
835             CATimeoutCallback, NULL);
836
837     // start retransmission
838     res = CARetransmissionStart(&g_retransmissionContext);
839
840     if (res != CA_STATUS_OK)
841     {
842         OIC_LOG(ERROR, TAG, "thread start error(retransmission thread).");
843         return res;
844     }
845
846     // initialize interface adapters by controller
847     CAInitializeAdapters(g_threadPoolHandle);
848
849     return CA_STATUS_OK;
850 }
851
852 void CATerminateMessageHandler()
853 {
854     uint8_t i = 0;
855     CAConnectivityType_t connType;
856     u_arraylist_t *list = CAGetSelectedNetworkList();
857     uint32_t length = u_arraylist_length(list);
858
859     for (i = 0; i < length; i++)
860     {
861         void* ptrType = u_arraylist_get(list, i);
862
863         if (ptrType == NULL)
864         {
865             continue;
866         }
867
868         connType = *(CAConnectivityType_t *) ptrType;
869         CAStopAdapter(connType);
870     }
871
872     // stop retransmission
873     if (g_retransmissionContext.threadMutex != NULL)
874     {
875         CARetransmissionStop(&g_retransmissionContext);
876     }
877
878     // stop thread
879     // delete thread data
880     if (g_sendThread.threadMutex != NULL)
881     {
882         CAQueueingThreadStop(&g_sendThread);
883     }
884
885     // stop thread
886     // delete thread data
887     if (g_receiveThread.threadMutex != NULL)
888     {
889 #ifndef SINGLE_HANDLE // This will be enabled when RI supports multi threading
890         CAQueueingThreadStop(&gReceiveThread);
891 #endif
892     }
893
894     // destroy thread pool
895     if (g_threadPoolHandle != NULL)
896     {
897         u_thread_pool_free(g_threadPoolHandle);
898         g_threadPoolHandle = NULL;
899     }
900
901     CARetransmissionDestroy(&g_retransmissionContext);
902     CAQueueingThreadDestroy(&g_sendThread);
903     CAQueueingThreadDestroy(&g_receiveThread);
904
905     // terminate interface adapters by controller
906     CATerminateAdapters();
907
908     OIC_LOG(DEBUG, TAG, "message handler termination is complete!");
909 }
910