[IOT-209][CA Block] CASendRequestToAll from client changes NON-CON messages as CON...
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / camessagehandler.c
/******************************************************************
 *
 * Copyright 2014 Samsung Electronics All Rights Reserved.
 *
 *
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <stdint.h>
25
26 #include "cainterface.h"
27 #include "camessagehandler.h"
28 #include "caremotehandler.h"
29 #include "cainterfacecontroller.h"
30 #include "caprotocolmessage.h"
31 #include "uqueue.h"
32 #include "logger.h"
33 #include "config.h" /* for coap protocol */
34 #include "coap.h"
35 #include "uthreadpool.h" /* for thread pool */
36 #include "caqueueingthread.h"
37 #include "caretransmission.h"
38 #include "umutex.h"
39 #include "oic_malloc.h"
40
#define TAG PCF("CA")

/**
 * Logs "memory error" and jumps to the enclosing function's
 * memory_error_exit label when arg is NULL.
 *
 * The expansion is wrapped in do { } while (0) so that the macro followed by
 * a semicolon is exactly one statement (usable in an unbraced if/else), and
 * arg is parenthesized so that any pointer expression may be passed.
 */
#define MEMORY_ALLOC_CHECK(arg) \
    do \
    { \
        if (NULL == (arg)) \
        { \
            OIC_LOG_V(DEBUG, TAG, "memory error"); \
            goto memory_error_exit; \
        } \
    } while (0)

/** Size argument handed to u_thread_pool_init() when the thread pool is created. */
#define MAX_THREAD_POOL_SIZE    10
47
/** Selects which send path CASendThreadProcess() takes for a queued item. */
typedef enum
{
    SEND_TYPE_MULTICAST = 0,    /**< sent through CASendMulticastData() */
    SEND_TYPE_UNICAST           /**< sent through CASendUnicastData() */
} CASendDataType_t;
52
53 typedef struct
54 {
55     CASendDataType_t type;
56     CARemoteEndpoint_t *remoteEndpoint;
57     CARequestInfo_t *requestInfo;
58     CAResponseInfo_t *responseInfo;
59     CAHeaderOption_t *options;
60     uint8_t numOptions;
61 } CAData_t;
62
63 // thread pool handle
64 static u_thread_pool_t gThreadPoolHandle = NULL;
65
66 // message handler main thread
67 static CAQueueingThread_t gSendThread;
68 static CAQueueingThread_t gReceiveThread;
69
70 static CARetransmission_t gRetransmissionContext;
71
72 // handler field
73 static CARequestCallback gRequestHandler = NULL;
74 static CAResponseCallback gResponseHandler = NULL;
75
76 static void CATimeoutCallback(const CARemoteEndpoint_t *endpoint, void *pdu, uint32_t size)
77 {
78     CARemoteEndpoint_t* ep = CACloneRemoteEndpoint(endpoint);
79     if (ep == NULL)
80     {
81         OIC_LOG(DEBUG, TAG, "memory allocation failed !");
82         return;
83     }
84
85     CAResponseInfo_t* resInfo = (CAResponseInfo_t*) OICMalloc(sizeof(CAResponseInfo_t));
86
87     if (resInfo == NULL)
88     {
89         OIC_LOG(DEBUG, TAG, "memory allocation failed !");
90         CADestroyRemoteEndpointInternal(ep);
91         return;
92     }
93     memset(resInfo, 0, sizeof(CAResponseInfo_t));
94
95     CAMessageType_t type = CAGetMessageTypeFromPduBinaryData(pdu, size);
96     uint16_t messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
97
98     resInfo->result = CA_RETRANSMIT_TIMEOUT;
99     resInfo->info.type = type;
100     resInfo->info.messageId = messageId;
101
102     CAData_t *cadata = (CAData_t *) OICMalloc(sizeof(CAData_t));
103     if (cadata == NULL)
104     {
105         OIC_LOG(DEBUG, TAG, "memory allocation failed !");
106         CADestroyRemoteEndpointInternal(ep);
107         OICFree(resInfo);
108         return;
109     }
110     memset(cadata, 0, sizeof(CAData_t));
111
112     cadata->type = SEND_TYPE_UNICAST;
113     cadata->remoteEndpoint = ep;
114     cadata->requestInfo = NULL;
115     cadata->responseInfo = resInfo;
116
117     CAQueueingThreadAddData(&gReceiveThread, cadata, sizeof(CAData_t));
118 }
119
/**
 * Task function registered for the receive queueing thread.
 *
 * TODO: not supported yet. The logic of CAHandleRequestResponseCallbacks is
 * planned to move into this function.
 */
static void CAReceiveThreadProcess(void *threadData)
{
    (void) threadData;
}
126
127 static void CASendThreadProcess(void *threadData)
128 {
129     CAData_t *data = (CAData_t *) threadData;
130
131     if (data == NULL)
132     {
133         OIC_LOG(DEBUG, TAG, "thread data error!!");
134         return;
135     }
136
137     if (NULL == data->remoteEndpoint)
138     {
139         OIC_LOG(DEBUG, TAG, "remoteEndpoint is null");
140         return;
141     }
142
143     CAResult_t res = CA_STATUS_FAILED;
144
145     CASendDataType_t type = data->type;
146
147     if (type == SEND_TYPE_UNICAST)
148     {
149         coap_pdu_t *pdu = NULL;
150
151         if (data->requestInfo != NULL)
152         {
153             OIC_LOG_V(DEBUG, TAG, "requestInfo is available..");
154
155             pdu = (coap_pdu_t *) CAGeneratePdu(data->remoteEndpoint->resourceUri,
156                                                data->requestInfo->method, data->requestInfo->info);
157         }
158         else if (data->responseInfo != NULL)
159         {
160             OIC_LOG_V(DEBUG, TAG, "responseInfo is available..");
161
162             pdu = (coap_pdu_t *) CAGeneratePdu(data->remoteEndpoint->resourceUri,
163                                                data->responseInfo->result,
164                                                data->responseInfo->info);
165         }
166         else
167         {
168             OIC_LOG(DEBUG, TAG, "request info, response info is empty");
169         }
170
171         // interface controller function call.
172         if (NULL != pdu)
173         {
174             OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
175
176             OIC_LOG_V(DEBUG, TAG, "PDU Maker - type : %d", pdu->hdr->type);
177
178             OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
179
180             OIC_LOG_V(DEBUG, TAG, "PDU Maker - id : %d", ntohs(pdu->hdr->id));
181
182             OIC_LOG_V(DEBUG, TAG, "PDU Maker - token : %s", pdu->hdr->token);
183
184             OIC_LOG_V(DEBUG, TAG, "PDU Maker - buffer data : %s", pdu->hdr);
185
186             res = CASendUnicastData(data->remoteEndpoint, pdu->hdr, pdu->length);
187
188             // for retransmission
189             CARetransmissionSentData(&gRetransmissionContext, data->remoteEndpoint, pdu->hdr,
190                                      pdu->length);
191
192             //For Unicast , data will be deleted by adapters
193             CADestroyRemoteEndpointInternal(data->remoteEndpoint);
194         }
195         coap_delete_pdu(pdu);
196     }
197     else if (type == SEND_TYPE_MULTICAST)
198     {
199         OIC_LOG(DEBUG, TAG, "both requestInfo & responseInfo is not available");
200
201         coap_pdu_t *pdu = NULL;
202         CAInfo_t info;
203         memset(&info, 0, sizeof(CAInfo_t));
204
205         info.options = data->options;
206         info.numOptions = data->numOptions;
207         info.token = data->requestInfo->info.token;
208         info.type = data->requestInfo->info.type;
209
210         pdu = (coap_pdu_t *) CAGeneratePdu(data->remoteEndpoint->resourceUri, CA_GET, info);
211
212         if (NULL != pdu)
213         {
214             OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
215
216             OIC_LOG_V(DEBUG, TAG, "PDU Maker - type : %d", pdu->hdr->type);
217
218             OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
219
220             OIC_LOG_V(DEBUG, TAG, "PDU Maker - id : %d", ntohs(pdu->hdr->id));
221
222             OIC_LOG_V(DEBUG, TAG, "PDU Maker - token : %s", pdu->hdr->token);
223
224             OIC_LOG_V(DEBUG, TAG, "PDU Maker - buffer data : %s", pdu->hdr);
225
226             res = CASendMulticastData(pdu->hdr, pdu->length);
227         }
228         coap_delete_pdu(pdu);
229     }
230
231     OIC_LOG_V(DEBUG, TAG, " Result :%d", res);
232 }
233
234 static void CAReceivedPacketCallback(CARemoteEndpoint_t *endpoint, void *data,
235                                      uint32_t dataLen)
236 {
237     OIC_LOG(DEBUG, TAG, "receivedPacketCallback in message handler!!");
238
239     if (NULL == data)
240     {
241         OIC_LOG(DEBUG, TAG, "received data is null");
242         return;
243     }
244
245     coap_pdu_t *pdu;
246     uint32_t code = CA_NOT_FOUND;
247
248     OIC_LOG_V(DEBUG, TAG, "data : %s", data);
249     pdu = (coap_pdu_t *) CAParsePDU((const char *) data, dataLen, &code);
250
251     OICFree(data);
252
253     char uri[CA_MAX_URI_LENGTH] =
254     { 0, };
255
256     if (code == CA_GET || code == CA_POST || code == CA_PUT || code == CA_DELETE)
257     {
258         CARequestInfo_t *ReqInfo;
259         ReqInfo = (CARequestInfo_t *) OICMalloc(sizeof(CARequestInfo_t));
260         if (ReqInfo == NULL)
261         {
262             OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
263             coap_delete_pdu(pdu);
264             return;
265         }
266         memset(ReqInfo, 0, sizeof(CARequestInfo_t));
267         CAGetRequestInfoFromPdu(pdu, ReqInfo, uri);
268
269         if (NULL != ReqInfo->info.options)
270         {
271             uint32_t i;
272             for (i = 0; i < ReqInfo->info.numOptions; i++)
273             {
274                 OIC_LOG_V(DEBUG, TAG, "Request- optionID: %d", ReqInfo->info.options[i].optionID);
275
276                 OIC_LOG_V(DEBUG, TAG, "Request- list: %s", ReqInfo->info.options[i].optionData);
277             }
278         }
279
280         if (NULL != ReqInfo->info.payload)
281         {
282             OIC_LOG_V(DEBUG, TAG, "Request- payload: %s", ReqInfo->info.payload);
283         }
284         OIC_LOG_V(DEBUG, TAG, "Request- code: %d", ReqInfo->method);
285         OIC_LOG_V(DEBUG, TAG, "Request- token : %s", ReqInfo->info.token);
286
287         if (NULL != endpoint)
288         {
289             endpoint->resourceUri = (char *) OICMalloc(strlen(uri) + 1);
290             if (endpoint->resourceUri == NULL)
291             {
292                 OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
293                 OICFree(ReqInfo);
294                 coap_delete_pdu(pdu);
295                 return;
296             }
297             memset(endpoint->resourceUri, 0, strlen(uri) + 1);
298             memcpy(endpoint->resourceUri, uri, strlen(uri));
299             OIC_LOG_V(DEBUG, TAG, "added resource URI : %s", endpoint->resourceUri);
300         }
301         // store the data at queue.
302         CAData_t *cadata = NULL;
303         cadata = (CAData_t *) OICMalloc(sizeof(CAData_t));
304         if (cadata == NULL)
305         {
306             OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
307             if (endpoint->resourceUri != NULL)
308                 OICFree(endpoint->resourceUri);
309             OICFree(ReqInfo);
310             coap_delete_pdu(pdu);
311             return;
312         }
313         memset(cadata, 0, sizeof(CAData_t));
314
315         cadata->type = SEND_TYPE_UNICAST;
316         cadata->remoteEndpoint = endpoint;
317         cadata->requestInfo = ReqInfo;
318         cadata->responseInfo = NULL;
319         CAQueueingThreadAddData(&gReceiveThread, cadata, sizeof(CAData_t));
320     }
321     else
322     {
323         CAResponseInfo_t *ResInfo;
324         ResInfo = (CAResponseInfo_t *) OICMalloc(sizeof(CAResponseInfo_t));
325         if (ResInfo == NULL)
326         {
327             OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
328             coap_delete_pdu(pdu);
329             return;
330         }
331         memset(ResInfo, 0, sizeof(CAResponseInfo_t));
332         CAGetResponseInfoFromPdu(pdu, ResInfo, uri);
333
334         if (NULL != ResInfo->info.options)
335         {
336             uint32_t i;
337             for (i = 0; i < ResInfo->info.numOptions; i++)
338             {
339                 OIC_LOG_V(DEBUG, TAG, "Response- optionID: %d", ResInfo->info.options[i].optionID);
340
341                 OIC_LOG_V(DEBUG, TAG, "Response- list: %s", ResInfo->info.options[i].optionData);
342             }
343             if (NULL != ResInfo->info.payload)
344             {
345                 OIC_LOG_V(DEBUG, TAG, "Response- payload: %s", ResInfo->info.payload);
346             } OIC_LOG_V(DEBUG, TAG, "Response- code: %d", ResInfo->result);
347         }
348
349         if (NULL != endpoint)
350         {
351             endpoint->resourceUri = (char *) OICMalloc(strlen(uri) + 1);
352             if (endpoint->resourceUri == NULL)
353             {
354                 OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
355                 OICFree(ResInfo);
356                 coap_delete_pdu(pdu);
357                 return;
358             }
359             memset(endpoint->resourceUri, 0, strlen(uri) + 1);
360             memcpy(endpoint->resourceUri, uri, strlen(uri));
361             OIC_LOG_V(DEBUG, TAG, "added resource URI : %s", endpoint->resourceUri);
362         }
363
364         // store the data at queue.
365         CAData_t *cadata = NULL;
366         cadata = (CAData_t *) OICMalloc(sizeof(CAData_t));
367         if (cadata == NULL)
368         {
369             OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
370             if (endpoint->resourceUri != NULL)
371                 OICFree(endpoint->resourceUri);
372             OICFree(ResInfo);
373             coap_delete_pdu(pdu);
374             return;
375         }
376         memset(cadata, 0, sizeof(CAData_t));
377
378         cadata->type = SEND_TYPE_UNICAST;
379         cadata->remoteEndpoint = endpoint;
380         cadata->requestInfo = NULL;
381         cadata->responseInfo = ResInfo;
382
383         CAQueueingThreadAddData(&gReceiveThread, cadata, sizeof(CAData_t));
384
385         // for retransmission
386         CARetransmissionReceivedData(&gRetransmissionContext, endpoint, pdu->hdr, pdu->length);
387     }
388     OICFree(pdu);
389 }
390
391 static void CANetworkChangedCallback(CALocalConnectivity_t *info, CANetworkStatus_t status)
392 {
393     OIC_LOG(DEBUG, TAG, "networkChangeCallback in message handler!!");
394
395     OIC_LOG_V(DEBUG, TAG, "Changed Network Status: %d", status);
396 }
397
398 void CAHandleRequestResponseCallbacks()
399 {
400     OIC_LOG_V(DEBUG, TAG, "CAHandleRequestResponseCallbacks");
401
402     // parse the data and call the callbacks.
403     // #1 parse the data
404     // #2 get endpoint
405
406     u_mutex_lock(gReceiveThread.threadMutex);
407
408     u_queue_message_t *item = u_queue_get_element(gReceiveThread.dataQueue);
409
410     u_mutex_unlock(gReceiveThread.threadMutex);
411
412     if (item == NULL)
413         return;
414
415     // get values
416     void *msg = item->msg;
417
418     if (msg == NULL)
419         return;
420
421     // get endpoint
422     CAData_t *td = (CAData_t *) msg;
423     CARemoteEndpoint_t *rep = td->remoteEndpoint;
424
425     if (rep == NULL)
426         return;
427
428     if (td->requestInfo != NULL)
429     {
430         if (gRequestHandler)
431         {
432             gRequestHandler(rep, td->requestInfo);
433         }
434
435         if (NULL != td->requestInfo->info.options)
436         {
437             OICFree(td->requestInfo->info.options);
438         }
439
440         if (NULL != td->requestInfo->info.payload)
441         {
442             OICFree(td->requestInfo->info.payload);
443         }
444
445         if (NULL != td->requestInfo->info.token)
446         {
447             OICFree(td->requestInfo->info.token);
448         }
449         OICFree(td->requestInfo);
450     }
451
452     if (td->responseInfo != NULL)
453     {
454         if (gResponseHandler)
455         {
456             gResponseHandler(rep, td->responseInfo);
457         }
458
459         if (NULL != td->responseInfo->info.options)
460         {
461             OICFree(td->responseInfo->info.options);
462         }
463
464         if (NULL != td->responseInfo->info.payload)
465         {
466             OICFree(td->responseInfo->info.payload);
467         }
468
469         if (NULL != td->responseInfo->info.token)
470         {
471             OICFree(td->responseInfo->info.token);
472         }
473         OICFree(td->responseInfo);
474     }
475
476     if (NULL != rep->resourceUri)
477     {
478         OICFree(rep->resourceUri);
479     }
480     OICFree(rep);
481 }
482
483 CAResult_t CADetachRequestMessage(const CARemoteEndpoint_t *object,
484                                   const CARequestInfo_t *request)
485 {
486     OIC_LOG_V(DEBUG, TAG, "CADetachRequestMessage");
487
488     if (object == NULL || request == NULL)
489     {
490         return CA_STATUS_FAILED;
491     }
492
493     CAData_t *data = (CAData_t *) OICMalloc(sizeof(CAData_t));
494     MEMORY_ALLOC_CHECK(data);
495
496     // initialize
497     memset(data, 0, sizeof(CAData_t));
498
499     // clone remote endpoint
500     CARemoteEndpoint_t *remoteEndpoint = CACloneRemoteEndpoint(object);
501     MEMORY_ALLOC_CHECK(remoteEndpoint);
502
503     // clone request info
504     CARequestInfo_t *requestInfo = CACloneRequestInfo(request);
505     MEMORY_ALLOC_CHECK(requestInfo);
506
507     // save data
508     data->type = SEND_TYPE_UNICAST;
509     data->remoteEndpoint = remoteEndpoint;
510     data->requestInfo = requestInfo;
511     data->responseInfo = NULL;
512
513     // add thread
514     CAQueueingThreadAddData(&gSendThread, data, sizeof(CAData_t));
515
516     return CA_STATUS_OK;
517
518     // memory error label.
519 memory_error_exit:
520
521     CADestroyRemoteEndpointInternal(remoteEndpoint);
522
523     CADestroyRequestInfoInternal(requestInfo);
524
525     if (data != NULL)
526     {
527         OICFree(data);
528     }
529
530     return CA_MEMORY_ALLOC_FAILED;
531 }
532
533 CAResult_t CADetachRequestToAllMessage(const CAGroupEndpoint_t *object,
534                                        const CARequestInfo_t *request)
535 {
536     // ToDo
537     OIC_LOG_V(DEBUG, TAG, "CADetachRequestToAllMessage");
538
539
540     if (object == NULL || request == NULL)
541     {
542         return CA_STATUS_FAILED;
543     }
544
545     CAData_t *data = (CAData_t *) OICMalloc(sizeof(CAData_t));
546     MEMORY_ALLOC_CHECK(data);
547
548     // initialize
549     memset(data, 0, sizeof(CAData_t));
550
551     CAAddress_t addr;
552     memset(&addr, 0, sizeof(CAAddress_t));
553     CARemoteEndpoint_t *remoteEndpoint = CACreateRemoteEndpointInternal(object->resourceUri, addr,
554                                          object->connectivityType);
555
556     // clone request info
557     CARequestInfo_t *requestInfo = CACloneRequestInfo(request);
558     MEMORY_ALLOC_CHECK(requestInfo);
559
560     // save data
561     data->type = SEND_TYPE_MULTICAST;
562     data->remoteEndpoint = remoteEndpoint;
563     data->requestInfo = requestInfo;
564     data->responseInfo = NULL;
565
566     // add thread
567     CAQueueingThreadAddData(&gSendThread, data, sizeof(CAData_t));
568
569     return CA_STATUS_OK;
570
571     // memory error label.
572 memory_error_exit:
573
574     CADestroyRemoteEndpointInternal(remoteEndpoint);
575
576     CADestroyRequestInfoInternal(requestInfo);
577
578     if (data != NULL)
579     {
580         OICFree(data);
581     }
582
583     return CA_MEMORY_ALLOC_FAILED;
584 }
585
586 CAResult_t CADetachResponseMessage(const CARemoteEndpoint_t *object,
587                                    const CAResponseInfo_t *response)
588 {
589     OIC_LOG_V(DEBUG, TAG, "CADetachResponseMessage");
590
591     if (object == NULL || response == NULL)
592     {
593         return CA_STATUS_FAILED;
594     }
595
596     CAData_t *data = (CAData_t *) OICMalloc(sizeof(CAData_t));
597     MEMORY_ALLOC_CHECK(data);
598
599     // initialize
600     memset(data, 0, sizeof(CAData_t));
601
602     // clone remote endpoint
603     CARemoteEndpoint_t *remoteEndpoint = CACloneRemoteEndpoint(object);
604     MEMORY_ALLOC_CHECK(remoteEndpoint);
605
606     // clone response info
607     CAResponseInfo_t *responseInfo = CACloneResponseInfo(response);
608     MEMORY_ALLOC_CHECK(responseInfo);
609
610     // save data
611     data->type = SEND_TYPE_UNICAST;
612     data->remoteEndpoint = remoteEndpoint;
613     data->requestInfo = NULL;
614     data->responseInfo = responseInfo;
615
616     // add thread
617     CAQueueingThreadAddData(&gSendThread, data, sizeof(CAData_t));
618
619     return CA_STATUS_OK;
620
621     // memory error label.
622 memory_error_exit:
623
624     CADestroyRemoteEndpointInternal(remoteEndpoint);
625
626     CADestroyResponseInfoInternal(responseInfo);
627
628     if (data != NULL)
629     {
630         OICFree(data);
631     }
632
633     return CA_MEMORY_ALLOC_FAILED;
634 }
635
636 CAResult_t CADetachMessageResourceUri(const CAURI_t resourceUri, const CAToken_t token,
637                                       const CAHeaderOption_t *options, uint8_t numOptions)
638 {
639     if (resourceUri == NULL)
640     {
641         return CA_STATUS_FAILED;
642     }
643     CAData_t *data = (CAData_t *) OICMalloc(sizeof(CAData_t));
644     MEMORY_ALLOC_CHECK(data);
645
646     // initialize
647     memset(data, 0, sizeof(CAData_t));
648
649     CAAddress_t addr;
650     memset(&addr, 0, sizeof(CAAddress_t));
651     CARemoteEndpoint_t *remoteEndpoint = CACreateRemoteEndpointInternal(resourceUri, addr,
652                                          CA_ETHERNET | CA_WIFI | CA_EDR | CA_LE);
653
654     // save data
655     data->type = SEND_TYPE_MULTICAST;
656     data->remoteEndpoint = remoteEndpoint;
657     CARequestInfo_t *ReqInfo;
658     ReqInfo = (CARequestInfo_t *) OICMalloc(sizeof(CARequestInfo_t));
659     if (ReqInfo == NULL)
660     {
661         OIC_LOG(DEBUG, TAG, "CADetachMessageResourceUri, Memory allocation failed !");
662         OICFree(data);
663         return CA_MEMORY_ALLOC_FAILED;
664     }
665     memset(ReqInfo, 0, sizeof(CARequestInfo_t));
666     ReqInfo->method = CA_GET;
667     ReqInfo->info.token = token;
668     ReqInfo->info.type = CA_MSG_NONCONFIRM;
669     data->requestInfo = ReqInfo;
670
671     data->responseInfo = NULL;
672     data->options = NULL;
673     data->numOptions = 0;
674
675     if (options != NULL && numOptions > 0)
676     {
677         // copy data
678         CAHeaderOption_t *temp = (CAHeaderOption_t *) OICMalloc(
679                                      sizeof(CAHeaderOption_t) * numOptions);
680
681         MEMORY_ALLOC_CHECK(temp);
682
683         memset(temp, 0, sizeof(CAHeaderOption_t) * numOptions);
684         memcpy(temp, options, sizeof(CAHeaderOption_t) * numOptions);
685
686         data->options = temp;
687         data->numOptions = numOptions;
688     }
689
690     // add thread
691     CAQueueingThreadAddData(&gSendThread, data, sizeof(CAData_t));
692
693     return CA_STATUS_OK;
694
695     // memory error label.
696 memory_error_exit:
697
698     CADestroyRemoteEndpointInternal(remoteEndpoint);
699
700     if (data != NULL)
701     {
702         OICFree(data);
703     }
704
705     return CA_MEMORY_ALLOC_FAILED;
706 }
707
708 void CASetRequestResponseCallbacks(CARequestCallback ReqHandler,
709                                    CAResponseCallback RespHandler)
710 {
711     OIC_LOG_V(DEBUG, TAG, "set request, response handler callback.");
712
713     gRequestHandler = ReqHandler;
714     gResponseHandler = RespHandler;
715 }
716
717 CAResult_t CAInitializeMessageHandler()
718 {
719     OIC_LOG(DEBUG, TAG, "CAInitializeMessageHandler - Entry");
720     CASetPacketReceivedCallback(CAReceivedPacketCallback);
721
722     CASetNetworkChangeCallback(CANetworkChangedCallback);
723
724     // create thread pool
725     CAResult_t res;
726     res = u_thread_pool_init(MAX_THREAD_POOL_SIZE, &gThreadPoolHandle);
727
728     if (res != CA_STATUS_OK)
729     {
730         OIC_LOG_V(DEBUG, TAG, "thread pool initialize error.");
731         return res;
732     }
733
734     // send thread initialize
735     CAQueueingThreadInitialize(&gSendThread, gThreadPoolHandle, CASendThreadProcess);
736
737     // start send thread
738     res = CAQueueingThreadStart(&gSendThread);
739
740     if (res != CA_STATUS_OK)
741     {
742         OIC_LOG_V(DEBUG, TAG, "thread start error(send thread).");
743         return res;
744     }
745
746     // receive thread initialize
747     CAQueueingThreadInitialize(&gReceiveThread, gThreadPoolHandle, CAReceiveThreadProcess);
748
749     // start receive thread
750     // TODO
751     // currently not support.
752 //    res = CAThreadStart(gReceiveThread);
753 //
754 //    if (res != CA_STATUS_OK)
755 //    {
756 //        OIC_LOG_V(DEBUG, TAG, "thread start error(receive thread).");
757 //        return res;
758 //    }
759
760     // retransmission initialize
761     CARetransmissionInitialize(&gRetransmissionContext, gThreadPoolHandle, CASendUnicastData,
762             CATimeoutCallback, NULL);
763
764     // start retransmission
765     res = CARetransmissionStart(&gRetransmissionContext);
766
767     if (res != CA_STATUS_OK)
768     {
769         OIC_LOG_V(DEBUG, TAG, "thread start error(retransmission thread).");
770         return res;
771     }
772
773     // initialize interface adapters by controller
774     CAInitializeAdapters(gThreadPoolHandle);
775
776     return CA_STATUS_OK;
777 }
778
779 void CATerminateMessageHandler()
780 {
781     // terminate interface adapters by controller
782     CATerminateAdapters();
783
784     // stop retransmission
785     CARetransmissionStop(&gRetransmissionContext);
786     CARetransmissionDestroy(&gRetransmissionContext);
787
788     // stop thread
789     // delete thread data
790     CAQueueingThreadStop(&gSendThread);
791     CAQueueingThreadDestroy(&gSendThread);
792
793     // stop thread
794     // delete thread data
795     CAQueueingThreadDestroy(&gReceiveThread);
796
797     // destroy thread pool
798     u_thread_pool_free(gThreadPoolHandle);
799
800     OIC_LOG_V(DEBUG, TAG, "message handler termination is complete!");
801 }
802