Fixed prevent and klock work issues reported.
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / camessagehandler.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <stdint.h>
25
26 #include "cainterface.h"
27 #include "camessagehandler.h"
28 #include "caremotehandler.h"
29 #include "cainterfacecontroller.h"
30 #include "caprotocolmessage.h"
31 #include "uqueue.h"
32 #include "logger.h"
33 #include "config.h" /* for coap protocol */
34 #include "coap.h"
35 #include "uthreadpool.h" /* for thread pool */
36 #include "caqueueingthread.h"
37 #include "caretransmission.h"
38 #include "umutex.h"
39 #include "oic_malloc.h"
40
41 #define TAG PCF("CA")
42 #define SINGLE_HANDLE
43
44 #define MEMORY_ALLOC_CHECK(arg) { if (arg == NULL) {OIC_LOG_V(DEBUG, TAG, "memory error"); \
45 goto memory_error_exit;} }
46
47 #define MAX_THREAD_POOL_SIZE    20
48
49 typedef enum
50 {
51     SEND_TYPE_MULTICAST = 0, SEND_TYPE_UNICAST
52 } CASendDataType_t;
53
54 typedef struct
55 {
56     CASendDataType_t type;
57     CARemoteEndpoint_t *remoteEndpoint;
58     CARequestInfo_t *requestInfo;
59     CAResponseInfo_t *responseInfo;
60     CAHeaderOption_t *options;
61     uint8_t numOptions;
62 } CAData_t;
63
64 // thread pool handle
65 static u_thread_pool_t gThreadPoolHandle = NULL;
66
67 // message handler main thread
68 static CAQueueingThread_t gSendThread;
69 static CAQueueingThread_t gReceiveThread;
70
71 static CARetransmission_t gRetransmissionContext;
72
73 // handler field
74 static CARequestCallback gRequestHandler = NULL;
75 static CAResponseCallback gResponseHandler = NULL;
76
77 static void CATimeoutCallback(const CARemoteEndpoint_t *endpoint, void *pdu, uint32_t size)
78 {
79     CARemoteEndpoint_t* ep = CACloneRemoteEndpoint(endpoint);
80     if (ep == NULL)
81     {
82         OIC_LOG(DEBUG, TAG, "memory allocation failed !");
83         return;
84     }
85
86     CAResponseInfo_t* resInfo = (CAResponseInfo_t*) OICMalloc(sizeof(CAResponseInfo_t));
87
88     if (resInfo == NULL)
89     {
90         OIC_LOG(DEBUG, TAG, "memory allocation failed !");
91         CADestroyRemoteEndpointInternal(ep);
92         return;
93     }
94     memset(resInfo, 0, sizeof(CAResponseInfo_t));
95
96     CAMessageType_t type = CAGetMessageTypeFromPduBinaryData(pdu, size);
97     uint16_t messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
98
99     resInfo->result = CA_RETRANSMIT_TIMEOUT;
100     resInfo->info.type = type;
101     resInfo->info.messageId = messageId;
102
103     CAData_t *cadata = (CAData_t *) OICMalloc(sizeof(CAData_t));
104     if (cadata == NULL)
105     {
106         OIC_LOG(DEBUG, TAG, "memory allocation failed !");
107         CADestroyRemoteEndpointInternal(ep);
108         OICFree(resInfo);
109         return;
110     }
111     memset(cadata, 0, sizeof(CAData_t));
112
113     cadata->type = SEND_TYPE_UNICAST;
114     cadata->remoteEndpoint = ep;
115     cadata->requestInfo = NULL;
116     cadata->responseInfo = resInfo;
117
118     CAQueueingThreadAddData(&gReceiveThread, cadata, sizeof(CAData_t));
119 }
120
121 static void CADataDestroyer(void *data, uint32_t size)
122 {
123     CAData_t *cadata = (CAData_t *) data;
124
125     if (cadata == NULL)
126     {
127         return;
128     }
129
130     if (cadata->remoteEndpoint != NULL)
131     {
132         CADestroyRemoteEndpointInternal((CARemoteEndpoint_t *)cadata->remoteEndpoint);
133     }
134
135     if (cadata->requestInfo != NULL)
136     {
137         CADestroyRequestInfoInternal((CARequestInfo_t *)cadata->requestInfo);
138     }
139
140     if (cadata->responseInfo != NULL)
141     {
142         CADestroyResponseInfoInternal((CAResponseInfo_t *)cadata->responseInfo);
143     }
144
145     if (cadata->options != NULL)
146     {
147         OICFree(cadata->options);
148     }
149
150     OICFree(cadata);
151 }
152
153 static void CAReceiveThreadProcess(void *threadData)
154 {
155     // Currently not supported
156         // This will be enabled when RI supports multi threading
157 #ifndef SINGLE_HANDLE
158     CAData_t *data = (CAData_t *) threadData;
159
160     if (data == NULL)
161     {
162         OIC_LOG(DEBUG, TAG, "thread data error!!");
163         return;
164     }
165
166     // parse the data and call the callbacks.
167     // #1 parse the data
168     // #2 get endpoint
169     CARemoteEndpoint_t *rep = (CARemoteEndpoint_t *)(data->remoteEndpoint);
170
171     if (rep == NULL)
172         return;
173
174     if (data->requestInfo != NULL)
175     {
176         if (gRequestHandler)
177         {
178             gRequestHandler(rep, data->requestInfo);
179         }
180     }
181
182     if (data->responseInfo != NULL)
183     {
184         if (gResponseHandler)
185         {
186             gResponseHandler(rep, data->responseInfo);
187         }
188     }
189 #endif
190 }
191
192 static void CASendThreadProcess(void *threadData)
193 {
194     CAData_t *data = (CAData_t *) threadData;
195
196     if (data == NULL)
197     {
198         OIC_LOG(DEBUG, TAG, "thread data error!!");
199         return;
200     }
201
202     if (NULL == data->remoteEndpoint)
203     {
204         OIC_LOG(DEBUG, TAG, "remoteEndpoint is null");
205         return;
206     }
207
208     CAResult_t res = CA_STATUS_FAILED;
209
210     CASendDataType_t type = data->type;
211
212     if (type == SEND_TYPE_UNICAST)
213     {
214         coap_pdu_t *pdu = NULL;
215
216         if (data->requestInfo != NULL)
217         {
218             OIC_LOG_V(DEBUG, TAG, "requestInfo is available..");
219
220             pdu = (coap_pdu_t *) CAGeneratePdu(data->remoteEndpoint->resourceUri,
221                                                data->requestInfo->method,
222                                                data->requestInfo->info);
223         }
224         else if (data->responseInfo != NULL)
225         {
226             OIC_LOG_V(DEBUG, TAG, "responseInfo is available..");
227
228             pdu = (coap_pdu_t *) CAGeneratePdu(data->remoteEndpoint->resourceUri,
229                                                data->responseInfo->result,
230                                                data->responseInfo->info);
231         }
232         else
233         {
234             OIC_LOG(DEBUG, TAG, "request info, response info is empty");
235         }
236
237         // interface controller function call.
238         if (NULL != pdu)
239         {
240             OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
241
242             OIC_LOG_V(DEBUG, TAG, "PDU Maker - type : %d", pdu->hdr->type);
243
244             OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
245
246             OIC_LOG_V(DEBUG, TAG, "PDU Maker - id : %d", ntohs(pdu->hdr->id));
247
248             OIC_LOG_V(DEBUG, TAG, "PDU Maker - token : %s", pdu->hdr->token);
249
250             OIC_LOG_V(DEBUG, TAG, "PDU Maker - buffer data : %s", pdu->hdr);
251
252             res = CASendUnicastData(data->remoteEndpoint, pdu->hdr, pdu->length);
253
254             // for retransmission
255             CARetransmissionSentData(&gRetransmissionContext, data->remoteEndpoint, pdu->hdr,
256                                      pdu->length);
257
258             coap_delete_pdu(pdu);
259         }
260     }
261     else if (type == SEND_TYPE_MULTICAST)
262     {
263         OIC_LOG(DEBUG, TAG, "both requestInfo & responseInfo is not available");
264
265         coap_pdu_t *pdu = NULL;
266         CAInfo_t info;
267         memset(&info, 0, sizeof(CAInfo_t));
268
269         info.options = data->options;
270         info.numOptions = data->numOptions;
271         info.token = data->requestInfo->info.token;
272         info.type = data->requestInfo->info.type;
273
274         pdu = (coap_pdu_t *) CAGeneratePdu(data->remoteEndpoint->resourceUri, CA_GET, info);
275
276         if (NULL != pdu)
277         {
278             OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
279
280             OIC_LOG_V(DEBUG, TAG, "PDU Maker - type : %d", pdu->hdr->type);
281
282             OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
283
284             OIC_LOG_V(DEBUG, TAG, "PDU Maker - id : %d", ntohs(pdu->hdr->id));
285
286             OIC_LOG_V(DEBUG, TAG, "PDU Maker - token : %s", pdu->hdr->token);
287
288             OIC_LOG_V(DEBUG, TAG, "PDU Maker - buffer data : %s", pdu->hdr);
289
290             res = CASendMulticastData(pdu->hdr, pdu->length);
291             coap_delete_pdu(pdu);
292         }
293     }
294
295     OIC_LOG_V(DEBUG, TAG, " Result :%d", res);
296 }
297
298 static void CAReceivedPacketCallback(CARemoteEndpoint_t *endpoint, void *data,
299                                      uint32_t dataLen)
300 {
301     OIC_LOG(DEBUG, TAG, "receivedPacketCallback in message handler!!");
302
303     if (NULL == data)
304     {
305         OIC_LOG(DEBUG, TAG, "received data is null");
306         return;
307     }
308
309     coap_pdu_t *pdu;
310     uint32_t code = CA_NOT_FOUND;
311
312     OIC_LOG_V(DEBUG, TAG, "data : %s", data);
313     pdu = (coap_pdu_t *) CAParsePDU((const char *) data, dataLen, &code);
314     OICFree(data);
315
316     if(NULL == pdu)
317     {
318         OIC_LOG(DEBUG, TAG, "pdu is null");
319         return;
320     }
321
322     char uri[CA_MAX_URI_LENGTH] =
323     { 0, };
324
325     if (code == CA_GET || code == CA_POST || code == CA_PUT || code == CA_DELETE)
326     {
327         CARequestInfo_t *ReqInfo;
328         ReqInfo = (CARequestInfo_t *) OICMalloc(sizeof(CARequestInfo_t));
329         if (ReqInfo == NULL)
330         {
331             OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
332             coap_delete_pdu(pdu);
333             return;
334         }
335         memset(ReqInfo, 0, sizeof(CARequestInfo_t));
336         CAGetRequestInfoFromPdu(pdu, ReqInfo, uri);
337
338         if (NULL != ReqInfo->info.options)
339         {
340             uint32_t i;
341             for (i = 0; i < ReqInfo->info.numOptions; i++)
342             {
343                 OIC_LOG_V(DEBUG, TAG, "Request- optionID: %d", ReqInfo->info.options[i].optionID);
344
345                 OIC_LOG_V(DEBUG, TAG, "Request- list: %s", ReqInfo->info.options[i].optionData);
346             }
347         }
348
349         if (NULL != ReqInfo->info.payload)
350         {
351             OIC_LOG_V(DEBUG, TAG, "Request- payload: %s", ReqInfo->info.payload);
352         }
353         OIC_LOG_V(DEBUG, TAG, "Request- code: %d", ReqInfo->method);
354         OIC_LOG_V(DEBUG, TAG, "Request- token : %s", ReqInfo->info.token);
355         OIC_LOG_V(DEBUG, TAG, "Request- msgID: %d", ReqInfo->info.messageId);
356
357         if (NULL != endpoint)
358         {
359             endpoint->resourceUri = (char *) OICMalloc(strlen(uri) + 1);
360             if (endpoint->resourceUri == NULL)
361             {
362                 OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
363                 OICFree(ReqInfo);
364                 coap_delete_pdu(pdu);
365                 return;
366             }
367             memset(endpoint->resourceUri, 0, strlen(uri) + 1);
368             memcpy(endpoint->resourceUri, uri, strlen(uri));
369             OIC_LOG_V(DEBUG, TAG, "added resource URI : %s", endpoint->resourceUri);
370         }
371         // store the data at queue.
372         CAData_t *cadata = NULL;
373         cadata = (CAData_t *) OICMalloc(sizeof(CAData_t));
374         if (cadata == NULL)
375         {
376             OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
377             if (endpoint != NULL && endpoint->resourceUri != NULL)
378                 OICFree(endpoint->resourceUri);
379             OICFree(ReqInfo);
380             coap_delete_pdu(pdu);
381             return;
382         }
383         memset(cadata, 0, sizeof(CAData_t));
384
385         cadata->type = SEND_TYPE_UNICAST;
386         cadata->remoteEndpoint = endpoint;
387         cadata->requestInfo = ReqInfo;
388         cadata->responseInfo = NULL;
389         CAQueueingThreadAddData(&gReceiveThread, cadata, sizeof(CAData_t));
390     }
391     else
392     {
393         CAResponseInfo_t *ResInfo;
394         ResInfo = (CAResponseInfo_t *) OICMalloc(sizeof(CAResponseInfo_t));
395         if (ResInfo == NULL)
396         {
397             OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
398             coap_delete_pdu(pdu);
399             return;
400         }
401         memset(ResInfo, 0, sizeof(CAResponseInfo_t));
402         CAGetResponseInfoFromPdu(pdu, ResInfo, uri);
403
404         if (NULL != ResInfo->info.options)
405         {
406             uint32_t i;
407             for (i = 0; i < ResInfo->info.numOptions; i++)
408             {
409                 OIC_LOG_V(DEBUG, TAG, "Response- optionID: %d", ResInfo->info.options[i].optionID);
410
411                 OIC_LOG_V(DEBUG, TAG, "Response- list: %s", ResInfo->info.options[i].optionData);
412             }
413             if (NULL != ResInfo->info.payload)
414             {
415                 OIC_LOG_V(DEBUG, TAG, "Response- payload: %s", ResInfo->info.payload);
416             } OIC_LOG_V(DEBUG, TAG, "Response- code: %d", ResInfo->result);
417         }
418
419         if (NULL != endpoint)
420         {
421             endpoint->resourceUri = (char *) OICMalloc(strlen(uri) + 1);
422             if (endpoint->resourceUri == NULL)
423             {
424                 OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
425                 OICFree(ResInfo);
426                 coap_delete_pdu(pdu);
427                 return;
428             }
429             memset(endpoint->resourceUri, 0, strlen(uri) + 1);
430             memcpy(endpoint->resourceUri, uri, strlen(uri));
431             OIC_LOG_V(DEBUG, TAG, "added resource URI : %s", endpoint->resourceUri);
432         }
433
434         // store the data at queue.
435         CAData_t *cadata = NULL;
436         cadata = (CAData_t *) OICMalloc(sizeof(CAData_t));
437         if (cadata == NULL)
438         {
439             OIC_LOG(DEBUG, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
440             if (endpoint != NULL && endpoint->resourceUri != NULL)
441                 OICFree(endpoint->resourceUri);
442             OICFree(ResInfo);
443             coap_delete_pdu(pdu);
444             return;
445         }
446         memset(cadata, 0, sizeof(CAData_t));
447
448         cadata->type = SEND_TYPE_UNICAST;
449         cadata->remoteEndpoint = endpoint;
450         cadata->requestInfo = NULL;
451         cadata->responseInfo = ResInfo;
452
453         CAQueueingThreadAddData(&gReceiveThread, cadata, sizeof(CAData_t));
454
455         // for retransmission
456         CARetransmissionReceivedData(&gRetransmissionContext, endpoint, pdu->hdr, pdu->length);
457     }
458
459     if(pdu)
460     {
461         coap_delete_pdu(pdu);
462     }
463 }
464
465 static void CANetworkChangedCallback(CALocalConnectivity_t *info, CANetworkStatus_t status)
466 {
467     OIC_LOG(DEBUG, TAG, "networkChangeCallback in message handler!!");
468
469     OIC_LOG_V(DEBUG, TAG, "Changed Network Status: %d", status);
470 }
471
472 void CAHandleRequestResponseCallbacks()
473 {
474     OIC_LOG_V(DEBUG, TAG, "CAHandleRequestResponseCallbacks IN");
475
476 #ifdef SINGLE_HANDLE
477     // parse the data and call the callbacks.
478     // #1 parse the data
479     // #2 get endpoint
480
481     u_mutex_lock(gReceiveThread.threadMutex);
482
483     u_queue_message_t *item = u_queue_get_element(gReceiveThread.dataQueue);
484
485     u_mutex_unlock(gReceiveThread.threadMutex);
486
487     if (item == NULL)
488         return;
489
490     // get values
491     void *msg = item->msg;
492
493     if (msg == NULL)
494         return;
495
496     // get endpoint
497     CAData_t *td = (CAData_t *) msg;
498     CARemoteEndpoint_t *rep = td->remoteEndpoint;
499
500     if (rep == NULL)
501         return;
502
503     if (td->requestInfo != NULL)
504     {
505         if (gRequestHandler)
506         {
507             gRequestHandler(rep, td->requestInfo);
508         }
509
510         if (NULL != td->requestInfo->info.options)
511         {
512             OICFree(td->requestInfo->info.options);
513         }
514
515         if (NULL != td->requestInfo->info.payload)
516         {
517             OICFree(td->requestInfo->info.payload);
518         }
519
520         if (NULL != td->requestInfo->info.token)
521         {
522             OICFree(td->requestInfo->info.token);
523         }
524         OICFree(td->requestInfo);
525     }
526
527     if (td->responseInfo != NULL)
528     {
529         if (gResponseHandler)
530         {
531             gResponseHandler(rep, td->responseInfo);
532         }
533
534         if (NULL != td->responseInfo->info.options)
535         {
536             OICFree(td->responseInfo->info.options);
537         }
538
539         if (NULL != td->responseInfo->info.payload)
540         {
541             OICFree(td->responseInfo->info.payload);
542         }
543
544         if (NULL != td->responseInfo->info.token)
545         {
546             OICFree(td->responseInfo->info.token);
547         }
548         OICFree(td->responseInfo);
549     }
550
551     if (NULL != rep->resourceUri)
552     {
553         OICFree(rep->resourceUri);
554     }
555
556     OICFree(rep);
557
558
559 #endif
560     OIC_LOG_V(DEBUG, TAG, "CAHandleRequestResponseCallbacks OUT");
561 }
562
563 CAResult_t CADetachRequestMessage(const CARemoteEndpoint_t *object,
564                                   const CARequestInfo_t *request)
565 {
566     OIC_LOG_V(DEBUG, TAG, "CADetachRequestMessage");
567
568     if (object == NULL || request == NULL)
569     {
570         return CA_STATUS_FAILED;
571     }
572
573     CARemoteEndpoint_t *remoteEndpoint = NULL;
574     CARequestInfo_t *requestInfo = NULL;
575
576     CAData_t *data = (CAData_t *) OICMalloc(sizeof(CAData_t));
577     MEMORY_ALLOC_CHECK(data);
578
579     // initialize
580     memset(data, 0, sizeof(CAData_t));
581
582     // clone remote endpoint
583
584     remoteEndpoint = CACloneRemoteEndpoint(object);
585     MEMORY_ALLOC_CHECK(remoteEndpoint);
586
587     // clone request info
588     requestInfo = CACloneRequestInfo(request);
589     MEMORY_ALLOC_CHECK(requestInfo);
590
591     // save data
592     data->type = SEND_TYPE_UNICAST;
593     data->remoteEndpoint = remoteEndpoint;
594     data->requestInfo = requestInfo;
595     data->responseInfo = NULL;
596
597     // add thread
598     CAQueueingThreadAddData(&gSendThread, data, sizeof(CAData_t));
599
600     return CA_STATUS_OK;
601
602     // memory error label.
603 memory_error_exit:
604
605     CADestroyRemoteEndpointInternal(remoteEndpoint);
606
607     CADestroyRequestInfoInternal(requestInfo);
608
609     if (data != NULL)
610     {
611         OICFree(data);
612     }
613
614     return CA_MEMORY_ALLOC_FAILED;
615 }
616
617 CAResult_t CADetachRequestToAllMessage(const CAGroupEndpoint_t *object,
618                                        const CARequestInfo_t *request)
619 {
620     // ToDo
621     OIC_LOG_V(DEBUG, TAG, "CADetachRequestToAllMessage");
622
623
624     if (object == NULL || request == NULL)
625     {
626         return CA_STATUS_FAILED;
627     }
628
629     CARemoteEndpoint_t *remoteEndpoint = NULL;
630     CARequestInfo_t *requestInfo = NULL;
631
632     CAData_t *data = (CAData_t *) OICMalloc(sizeof(CAData_t));
633     MEMORY_ALLOC_CHECK(data);
634
635     // initialize
636     memset(data, 0, sizeof(CAData_t));
637
638     CAAddress_t addr;
639     memset(&addr, 0, sizeof(CAAddress_t));
640     remoteEndpoint = CACreateRemoteEndpointInternal(object->resourceUri, addr,
641                                          object->connectivityType);
642
643     // clone request info
644     requestInfo = CACloneRequestInfo(request);
645     MEMORY_ALLOC_CHECK(requestInfo);
646
647     // save data
648     data->type = SEND_TYPE_MULTICAST;
649     data->remoteEndpoint = remoteEndpoint;
650     data->requestInfo = requestInfo;
651     data->responseInfo = NULL;
652
653     // add thread
654     CAQueueingThreadAddData(&gSendThread, data, sizeof(CAData_t));
655
656     return CA_STATUS_OK;
657
658     // memory error label.
659 memory_error_exit:
660
661     CADestroyRemoteEndpointInternal(remoteEndpoint);
662
663     CADestroyRequestInfoInternal(requestInfo);
664
665     if (data != NULL)
666     {
667         OICFree(data);
668     }
669
670     return CA_MEMORY_ALLOC_FAILED;
671 }
672
673 CAResult_t CADetachResponseMessage(const CARemoteEndpoint_t *object,
674                                    const CAResponseInfo_t *response)
675 {
676     OIC_LOG_V(DEBUG, TAG, "CADetachResponseMessage");
677
678     if (object == NULL || response == NULL)
679     {
680         return CA_STATUS_FAILED;
681     }
682
683     CARemoteEndpoint_t *remoteEndpoint = NULL;
684     CAResponseInfo_t *responseInfo = NULL;
685
686     CAData_t *data = (CAData_t *) OICMalloc(sizeof(CAData_t));
687     MEMORY_ALLOC_CHECK(data);
688
689     // initialize
690     memset(data, 0, sizeof(CAData_t));
691
692     // clone remote endpoint
693     remoteEndpoint = CACloneRemoteEndpoint(object);
694     MEMORY_ALLOC_CHECK(remoteEndpoint);
695
696     // clone response info
697     responseInfo = CACloneResponseInfo(response);
698     MEMORY_ALLOC_CHECK(responseInfo);
699
700     // save data
701     data->type = SEND_TYPE_UNICAST;
702     data->remoteEndpoint = remoteEndpoint;
703     data->requestInfo = NULL;
704     data->responseInfo = responseInfo;
705
706     // add thread
707     CAQueueingThreadAddData(&gSendThread, data, sizeof(CAData_t));
708
709     return CA_STATUS_OK;
710
711     // memory error label.
712 memory_error_exit:
713
714     CADestroyRemoteEndpointInternal(remoteEndpoint);
715
716     CADestroyResponseInfoInternal(responseInfo);
717
718     if (data != NULL)
719     {
720         OICFree(data);
721     }
722
723     return CA_MEMORY_ALLOC_FAILED;
724 }
725
726 CAResult_t CADetachMessageResourceUri(const CAURI_t resourceUri, const CAToken_t token,
727                                       const CAHeaderOption_t *options, uint8_t numOptions)
728 {
729     if (resourceUri == NULL)
730     {
731         return CA_STATUS_FAILED;
732     }
733     CARequestInfo_t *ReqInfo = NULL;
734     CAToken_t tempToken = NULL;
735     CARemoteEndpoint_t *remoteEndpoint = NULL;
736
737     CAData_t *data = (CAData_t *) OICMalloc(sizeof(CAData_t));
738     MEMORY_ALLOC_CHECK(data);
739
740     // initialize
741     memset(data, 0, sizeof(CAData_t));
742
743     CAAddress_t addr;
744     memset(&addr, 0, sizeof(CAAddress_t));
745     remoteEndpoint = CACreateRemoteEndpointInternal(resourceUri, addr,
746                                          CA_ETHERNET | CA_WIFI | CA_EDR | CA_LE);
747
748     // create request info
749     ReqInfo = (CARequestInfo_t *) OICMalloc(sizeof(CARequestInfo_t));
750     MEMORY_ALLOC_CHECK(ReqInfo);
751     memset(ReqInfo, 0, sizeof(CARequestInfo_t));
752
753     // copy token value
754     if (token != NULL)
755     {
756         int32_t len = strlen(token);
757         tempToken = (char *) OICMalloc(sizeof(char) * (len + 1));
758         MEMORY_ALLOC_CHECK(tempToken);
759         memset(tempToken, 0, sizeof(char) * (len + 1));
760         strncpy(tempToken, token, len);
761     }
762
763     // save request info data
764     ReqInfo->method = CA_GET;
765     ReqInfo->info.token = tempToken;
766     ReqInfo->info.type = CA_MSG_NONCONFIRM;
767     // save data
768     data->type = SEND_TYPE_MULTICAST;
769     data->remoteEndpoint = remoteEndpoint;
770     data->requestInfo = ReqInfo;
771
772     data->responseInfo = NULL;
773     data->options = NULL;
774     data->numOptions = 0;
775
776     if (options != NULL && numOptions > 0)
777     {
778         // copy data
779         CAHeaderOption_t *temp = (CAHeaderOption_t *) OICMalloc(
780                                      sizeof(CAHeaderOption_t) * numOptions);
781
782         MEMORY_ALLOC_CHECK(temp);
783
784         memset(temp, 0, sizeof(CAHeaderOption_t) * numOptions);
785         memcpy(temp, options, sizeof(CAHeaderOption_t) * numOptions);
786
787         data->options = temp;
788         data->numOptions = numOptions;
789     }
790
791     // add thread
792     CAQueueingThreadAddData(&gSendThread, data, sizeof(CAData_t));
793
794     return CA_STATUS_OK;
795
796     // memory error label.
797 memory_error_exit:
798
799     CADestroyRemoteEndpointInternal(remoteEndpoint);
800
801     if (tempToken != NULL)
802     {
803         OICFree(tempToken);
804     }
805
806     if (ReqInfo != NULL)
807     {
808         OICFree(ReqInfo);
809     }
810
811     if (data != NULL)
812     {
813         OICFree(data);
814     }
815
816     return CA_MEMORY_ALLOC_FAILED;
817 }
818
819 void CASetRequestResponseCallbacks(CARequestCallback ReqHandler,
820                                    CAResponseCallback RespHandler)
821 {
822     OIC_LOG_V(DEBUG, TAG, "set request, response handler callback.");
823
824     gRequestHandler = ReqHandler;
825     gResponseHandler = RespHandler;
826 }
827
828 CAResult_t CAInitializeMessageHandler()
829 {
830     OIC_LOG(DEBUG, TAG, "CAInitializeMessageHandler - Entry");
831     CASetPacketReceivedCallback(CAReceivedPacketCallback);
832
833     CASetNetworkChangeCallback(CANetworkChangedCallback);
834
835     // create thread pool
836     CAResult_t res;
837     res = u_thread_pool_init(MAX_THREAD_POOL_SIZE, &gThreadPoolHandle);
838
839     if (res != CA_STATUS_OK)
840     {
841         OIC_LOG_V(DEBUG, TAG, "thread pool initialize error.");
842         return res;
843     }
844
845     // send thread initialize
846     CAQueueingThreadInitialize(&gSendThread, gThreadPoolHandle, CASendThreadProcess,
847             CADataDestroyer);
848
849     // start send thread
850     res = CAQueueingThreadStart(&gSendThread);
851
852     if (res != CA_STATUS_OK)
853     {
854         OIC_LOG_V(DEBUG, TAG, "thread start error(send thread).");
855         return res;
856     }
857
858     // receive thread initialize
859     CAQueueingThreadInitialize(&gReceiveThread, gThreadPoolHandle, CAReceiveThreadProcess,
860             CADataDestroyer);
861
862 #ifndef SINGLE_HANDLE// This will be enabled when RI supports multi threading
863     // start receive thread
864     res = CAQueueingThreadStart(&gReceiveThread);
865
866     if (res != CA_STATUS_OK)
867     {
868         OIC_LOG_V(DEBUG, TAG, "thread start error(receive thread).");
869         return res;
870     }
871 #endif
872
873     // retransmission initialize
874     CARetransmissionInitialize(&gRetransmissionContext, gThreadPoolHandle, CASendUnicastData,
875             CATimeoutCallback, NULL);
876
877     // start retransmission
878     res = CARetransmissionStart(&gRetransmissionContext);
879
880     if (res != CA_STATUS_OK)
881     {
882         OIC_LOG_V(DEBUG, TAG, "thread start error(retransmission thread).");
883         return res;
884     }
885
886     // initialize interface adapters by controller
887     CAInitializeAdapters(gThreadPoolHandle);
888
889     return CA_STATUS_OK;
890 }
891
892 void CATerminateMessageHandler()
893 {
894     // terminate interface adapters by controller
895     CATerminateAdapters();
896
897     // stop retransmission
898     if(gRetransmissionContext.threadMutex != NULL) {
899         CARetransmissionStop(&gRetransmissionContext);
900         CARetransmissionDestroy(&gRetransmissionContext);
901     }
902
903     // stop thread
904     // delete thread data
905     if(gSendThread.threadMutex != NULL) {
906         CAQueueingThreadStop(&gSendThread);
907         CAQueueingThreadDestroy(&gSendThread);
908     }
909
910     // stop thread
911     // delete thread data
912     if(gReceiveThread.threadMutex != NULL) {
913         #ifndef SINGLE_HANDLE// This will be enabled when RI supports multi threading
914         // CAQueueingThreadStop(&gReceiveThread);
915         #endif  
916         CAQueueingThreadDestroy(&gReceiveThread);
917     }
918
919     // destroy thread pool
920     if(gThreadPoolHandle != NULL) {
921         u_thread_pool_free(gThreadPoolHandle);
922         gThreadPoolHandle = NULL;
923     }
924
925     OIC_LOG_V(DEBUG, TAG, "message handler termination is complete!");
926 }
927