Support HighQoS and Cancel Observe in CA and RI
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / camessagehandler.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <stdint.h>
25
26 #include "cainterface.h"
27 #include "camessagehandler.h"
28 #include "caremotehandler.h"
29 #include "cainterfacecontroller.h"
30 #include "caprotocolmessage.h"
31 #include "caretransmission.h"
32 #include "uqueue.h"
33 #include "logger.h"
34 #include "config.h" /* for coap protocol */
35 #include "uthreadpool.h" /* for thread pool */
36 #include "caqueueingthread.h"
37 #include "umutex.h"
38 #include "oic_malloc.h"
39 #include "canetworkconfigurator.h"
40
41 #define TAG PCF("CA")
42 #define SINGLE_HANDLE
43
44 #define MAX_THREAD_POOL_SIZE    20
45
46 typedef enum
47 {
48     SEND_TYPE_MULTICAST = 0, SEND_TYPE_UNICAST
49 } CASendDataType_t;
50
51 typedef struct
52 {
53     CASendDataType_t type;
54     CARemoteEndpoint_t *remoteEndpoint;
55     CARequestInfo_t *requestInfo;
56     CAResponseInfo_t *responseInfo;
57     CAHeaderOption_t *options;
58     uint8_t numOptions;
59 } CAData_t;
60
61 // thread pool handle
62 static u_thread_pool_t g_threadPoolHandle = NULL;
63
64 // message handler main thread
65 static CAQueueingThread_t g_sendThread;
66 static CAQueueingThread_t g_receiveThread;
67
68 static CARetransmission_t g_retransmissionContext;
69
70 // handler field
71 static CARequestCallback g_requestHandler = NULL;
72 static CAResponseCallback g_responseHandler = NULL;
73
74 static void CATimeoutCallback(const CARemoteEndpoint_t *endpoint, const void *pdu, uint32_t size)
75 {
76     OIC_LOG(DEBUG, TAG, "IN");
77     VERIFY_NON_NULL_VOID(endpoint, TAG, "endpoint");
78     VERIFY_NON_NULL_VOID(pdu, TAG, "pdu");
79
80     CARemoteEndpoint_t* ep = CACloneRemoteEndpoint(endpoint);
81     if (NULL == ep)
82     {
83         OIC_LOG(ERROR, TAG, "clone failed");
84         return;
85     }
86
87     CAResponseInfo_t* resInfo = (CAResponseInfo_t*) OICCalloc(1, sizeof(CAResponseInfo_t));
88
89     if (NULL == resInfo)
90     {
91         OIC_LOG(ERROR, TAG, "calloc failed");
92         CADestroyRemoteEndpointInternal(ep);
93         return;
94     }
95
96     resInfo->result = CA_RETRANSMIT_TIMEOUT;
97     resInfo->info.type = CAGetMessageTypeFromPduBinaryData(pdu, size);
98     resInfo->info.messageId = CAGetMessageIdFromPduBinaryData(pdu, size);
99     CAResult_t res = CAGetTokenFromPDU((const coap_hdr_t *) pdu, &(resInfo->info));
100     if (CA_STATUS_OK != res)
101     {
102         OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list");
103         OICFree(resInfo->info.token);
104         OICFree(resInfo);
105         CADestroyRemoteEndpointInternal(ep);
106         return;
107     }
108
109     CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
110     if (NULL == cadata)
111     {
112         OIC_LOG(ERROR, TAG, "memory allocation failed !");
113         CADestroyRemoteEndpointInternal(ep);
114         OICFree(resInfo);
115         return;
116     }
117
118     cadata->type = SEND_TYPE_UNICAST;
119     cadata->remoteEndpoint = ep;
120     cadata->requestInfo = NULL;
121     cadata->responseInfo = resInfo;
122
123     CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
124     OIC_LOG(DEBUG, TAG, "OUT");
125 }
126
127 static void CADataDestroyer(void *data, uint32_t size)
128 {
129     OIC_LOG(DEBUG, TAG, "IN");
130     CAData_t *cadata = (CAData_t *) data;
131
132     if (NULL == cadata)
133     {
134         OIC_LOG(ERROR, TAG, "cadata is NULL");
135         return;
136     }
137
138     if (NULL != cadata->remoteEndpoint)
139     {
140         CADestroyRemoteEndpointInternal((CARemoteEndpoint_t *) cadata->remoteEndpoint);
141     }
142
143     if (NULL != cadata->requestInfo)
144     {
145         CADestroyRequestInfoInternal((CARequestInfo_t *) cadata->requestInfo);
146     }
147
148     if (NULL != cadata->responseInfo)
149     {
150         CADestroyResponseInfoInternal((CAResponseInfo_t *) cadata->responseInfo);
151     }
152
153     OICFree(cadata->options);
154     OICFree(cadata);
155     OIC_LOG(DEBUG, TAG, "OUT");
156 }
157
158 static void CAReceiveThreadProcess(void *threadData)
159 {
160     OIC_LOG(DEBUG, TAG, "IN");
161     // Currently not supported
162     // This will be enabled when RI supports multi threading
163 #ifndef SINGLE_HANDLE
164     CAData_t *data = (CAData_t *) threadData;
165
166     if (NULL == data)
167     {
168         OIC_LOG(ERROR, TAG, "thread data error!!");
169         return;
170     }
171
172     // parse the data and call the callbacks.
173     // #1 parse the data
174     // #2 get endpoint
175     CARemoteEndpoint_t *rep = (CARemoteEndpoint_t *)(data->remoteEndpoint);
176
177     if (NULL == rep)
178     {
179         OIC_LOG(ERROR, TAG, "remoteEndpoint error!!");
180         return;
181     }
182
183     if (NULL != data->requestInfo)
184     {
185         if (g_requestHandler)
186         {
187             g_requestHandler(rep, data->requestInfo);
188         }
189     }
190
191     if (NULL != data->responseInfo)
192     {
193         if (g_responseHandler)
194         {
195             g_responseHandler(rep, data->responseInfo);
196         }
197     }
198 #endif
199     OIC_LOG(DEBUG, TAG, "OUT");
200 }
201
202 static void CASendThreadProcess(void *threadData)
203 {
204     OIC_LOG(DEBUG, TAG, "IN");
205     CAData_t *data = (CAData_t *) threadData;
206
207     VERIFY_NON_NULL_VOID(data, TAG, "data");
208     VERIFY_NON_NULL_VOID(data->remoteEndpoint, TAG, "remoteEndpoint");
209
210     CAResult_t res = CA_STATUS_FAILED;
211
212     CASendDataType_t type = data->type;
213
214     if (SEND_TYPE_UNICAST == type)
215     {
216         coap_pdu_t *pdu = NULL;
217
218         if (NULL != data->requestInfo)
219         {
220             OIC_LOG(DEBUG, TAG, "requestInfo is available..");
221
222             pdu = (coap_pdu_t *) CAGeneratePDU(data->remoteEndpoint->resourceUri,
223                                                data->requestInfo->method,
224                                                data->requestInfo->info);
225         }
226         else if (NULL != data->responseInfo)
227         {
228             OIC_LOG(DEBUG, TAG, "responseInfo is available..");
229
230             pdu = (coap_pdu_t *) CAGeneratePDU(data->remoteEndpoint->resourceUri,
231                                                data->responseInfo->result,
232                                                data->responseInfo->info);
233         }
234         else
235         {
236             OIC_LOG(DEBUG, TAG, "request info, response info is empty");
237         }
238
239         // interface controller function call.
240         if (NULL != pdu)
241         {
242             CALogPDUInfo(pdu);
243
244             res = CASendUnicastData(data->remoteEndpoint, pdu->hdr, pdu->length);
245             if (CA_STATUS_OK != res)
246             {
247                 OIC_LOG_V(ERROR, TAG, "send failed:%d", res);
248                 coap_delete_pdu(pdu);
249                 return;
250             }
251             // for retransmission
252             res = CARetransmissionSentData(&g_retransmissionContext, data->remoteEndpoint, pdu->hdr,
253                                            pdu->length);
254             if (CA_STATUS_OK != res)
255             {
256                 OIC_LOG_V(INFO, TAG, "retransmission will be not working: %d", res);
257                 coap_delete_pdu(pdu);
258                 return;
259             }
260
261             coap_delete_pdu(pdu);
262         }
263     }
264     else if (SEND_TYPE_MULTICAST == type)
265     {
266         OIC_LOG(DEBUG, TAG, "both requestInfo & responseInfo is not available");
267
268         CAInfo_t info = { };
269
270         info.options = data->options;
271         info.numOptions = data->numOptions;
272         info.token = data->requestInfo->info.token;
273         info.tokenLength = data->requestInfo->info.tokenLength;
274         info.type = data->requestInfo->info.type;
275         info.messageId = data->requestInfo->info.messageId;
276         info.payload = data->requestInfo->info.payload;
277
278         coap_pdu_t *pdu = (coap_pdu_t *) CAGeneratePDU(data->remoteEndpoint->resourceUri, CA_GET,
279                                                        info);
280
281         if (NULL != pdu)
282         {
283             CALogPDUInfo(pdu);
284
285             res = CASendMulticastData(pdu->hdr, pdu->length);
286             if(CA_STATUS_OK != res)
287             {
288                 OIC_LOG_V(ERROR, TAG, "send failed:%d", res);
289                 coap_delete_pdu(pdu);
290                 return;
291             }
292
293             coap_delete_pdu(pdu);
294         }
295     }
296
297     OIC_LOG(DEBUG, TAG, "OUT");
298 }
299
300 static void CAReceivedPacketCallback(CARemoteEndpoint_t *endpoint, void *data, uint32_t dataLen)
301 {
302     OIC_LOG(DEBUG, TAG, "IN");
303     VERIFY_NON_NULL_VOID(endpoint, TAG, "endpoint");
304     VERIFY_NON_NULL_VOID(data, TAG, "data");
305
306     uint32_t code = CA_NOT_FOUND;
307     coap_pdu_t *pdu = (coap_pdu_t *) CAParsePDU((const char *) data, dataLen, &code);
308     OICFree(data);
309
310     if (NULL == pdu)
311     {
312         OIC_LOG(ERROR, TAG, "Parse PDU failed");
313         CAAdapterFreeRemoteEndpoint(endpoint);
314         return;
315     }
316
317     char uri[CA_MAX_URI_LENGTH] = { 0, };
318     uint32_t bufLen = sizeof(uri);
319
320     if (CA_GET == code || CA_POST == code || CA_PUT == code || CA_DELETE == code)
321     {
322         CARequestInfo_t *ReqInfo = (CARequestInfo_t *) OICCalloc(1, sizeof(CARequestInfo_t));
323         if (NULL == ReqInfo)
324         {
325             OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, Memory allocation failed!");
326             coap_delete_pdu(pdu);
327             CAAdapterFreeRemoteEndpoint(endpoint);
328             return;
329         }
330
331         CAResult_t res = CAGetRequestInfoFromPDU(pdu, ReqInfo, uri, bufLen);
332         if (CA_STATUS_OK != res)
333         {
334             OIC_LOG_V(ERROR, TAG, "CAGetRequestInfoFromPDU failed : %d", res);
335             OICFree(ReqInfo);
336             coap_delete_pdu(pdu);
337             CAAdapterFreeRemoteEndpoint(endpoint);
338             return;
339         }
340
341         if (NULL != ReqInfo->info.options)
342         {
343             uint32_t i;
344             for (i = 0; i < ReqInfo->info.numOptions; i++)
345             {
346                 OIC_LOG_V(DEBUG, TAG, "Request- optionID: %d", ReqInfo->info.options[i].optionID);
347
348                 OIC_LOG_V(DEBUG, TAG, "Request- list: %s", ReqInfo->info.options[i].optionData);
349             }
350         }
351
352         if (NULL != ReqInfo->info.payload)
353         {
354             OIC_LOG_V(DEBUG, TAG, "Request- payload: %s", ReqInfo->info.payload);
355         }
356         OIC_LOG_V(DEBUG, TAG, "Request- code: %d", ReqInfo->method);
357         if (NULL != ReqInfo->info.token)
358         {
359             OIC_LOG(DEBUG, TAG, "Request- token:");
360             OIC_LOG_BUFFER(DEBUG, TAG, (const uint8_t *) ReqInfo->info.token,
361                            ReqInfo->info.tokenLength);
362         }
363
364         OIC_LOG_V(DEBUG, TAG, "Request- code: %d", ReqInfo->method);
365         OIC_LOG(DEBUG, TAG, "Request- token");
366         OIC_LOG_BUFFER(DEBUG, TAG, (const uint8_t *) ReqInfo->info.token, CA_MAX_TOKEN_LEN);
367         OIC_LOG_V(DEBUG, TAG, "Request- msgID : %d", ReqInfo->info.messageId);
368         if (NULL != endpoint)
369         {
370             endpoint->resourceUri = (char *) OICMalloc(bufLen + 1);
371             if (NULL == endpoint->resourceUri)
372             {
373                 OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, Memory allocation failed!");
374                 OICFree(ReqInfo);
375                 coap_delete_pdu(pdu);
376                 CAAdapterFreeRemoteEndpoint(endpoint);
377                 return;
378             }
379             memcpy(endpoint->resourceUri, uri, bufLen);
380             endpoint->resourceUri[bufLen] = '\0';
381             OIC_LOG_V(DEBUG, TAG, "URI : %s", endpoint->resourceUri);
382         }
383         // store the data at queue.
384         CAData_t *cadata = NULL;
385         cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
386         if (NULL == cadata)
387         {
388             OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
389             if (NULL != endpoint && NULL != endpoint->resourceUri)
390             {
391                 OICFree(endpoint->resourceUri);
392             }
393
394             OICFree(ReqInfo);
395             coap_delete_pdu(pdu);
396             CAAdapterFreeRemoteEndpoint(endpoint);
397             return;
398         }
399
400         cadata->type = SEND_TYPE_UNICAST;
401         cadata->remoteEndpoint = endpoint;
402         cadata->requestInfo = ReqInfo;
403         cadata->responseInfo = NULL;
404         CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
405     }
406     else
407     {
408         CAResponseInfo_t *ResInfo = (CAResponseInfo_t *) OICCalloc(1, sizeof(CAResponseInfo_t));
409         if (NULL == ResInfo)
410         {
411             OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, Memory allocation failed!");
412             coap_delete_pdu(pdu);
413             CAAdapterFreeRemoteEndpoint(endpoint);
414             return;
415         }
416
417         CAResult_t res = CAGetResponseInfoFromPDU(pdu, ResInfo, uri, bufLen);
418         if (CA_STATUS_OK != res)
419         {
420             OIC_LOG_V(ERROR, TAG, "CAGetResponseInfoFromPDU failed : %d", res);
421             OICFree(ResInfo);
422             coap_delete_pdu(pdu);
423             CAAdapterFreeRemoteEndpoint(endpoint);
424             return;
425         }
426
427         if (NULL != ResInfo->info.options)
428         {
429             uint32_t i;
430             for (i = 0; i < ResInfo->info.numOptions; i++)
431             {
432                 OIC_LOG_V(DEBUG, TAG, "Response- optionID: %d", ResInfo->info.options[i].optionID);
433
434                 OIC_LOG_V(DEBUG, TAG, "Response- list: %s", ResInfo->info.options[i].optionData);
435             }
436         }
437
438         if (NULL != ResInfo->info.payload)
439         {
440             OIC_LOG_V(DEBUG, TAG, "Response- payload: %s", ResInfo->info.payload);
441         }
442         OIC_LOG_V(DEBUG, TAG, "Response- code: %d", ResInfo->result);
443         OIC_LOG_V(DEBUG, TAG, "Response- token : %s", ResInfo->info.token);
444         OIC_LOG_V(DEBUG, TAG, "Response- msgID: %d", ResInfo->info.messageId);
445
446         if (NULL != endpoint)
447         {
448             endpoint->resourceUri = (char *) OICMalloc(bufLen + 1);
449             if (NULL == endpoint->resourceUri)
450             {
451                 OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
452                 OICFree(ResInfo);
453                 coap_delete_pdu(pdu);
454                 CAAdapterFreeRemoteEndpoint(endpoint);
455                 return;
456             }
457             memcpy(endpoint->resourceUri, uri, bufLen);
458             endpoint->resourceUri[bufLen] = '\0';
459             OIC_LOG_V(DEBUG, TAG, "URI : %s", endpoint->resourceUri);
460         }
461
462         // store the data at queue.
463         CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
464         if (NULL == cadata)
465         {
466             OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, Memory allocation failed !");
467             if (NULL != endpoint && NULL != endpoint->resourceUri)
468             {
469                 OICFree(endpoint->resourceUri);
470             }
471             OICFree(ResInfo);
472             coap_delete_pdu(pdu);
473             CAAdapterFreeRemoteEndpoint(endpoint);
474             return;
475         }
476
477         cadata->type = SEND_TYPE_UNICAST;
478         cadata->remoteEndpoint = endpoint;
479         cadata->requestInfo = NULL;
480
481         // for retransmission
482         void *retransmissionPdu = NULL;
483         CARetransmissionReceivedData(&g_retransmissionContext, endpoint, pdu->hdr, pdu->length,
484                                      &retransmissionPdu);
485
486         // get token from saved data in retransmission list
487         if (retransmissionPdu && CA_EMPTY == code)
488         {
489             CAResult_t res = CAGetTokenFromPDU((const coap_hdr_t *)retransmissionPdu,
490                                                &(ResInfo->info));
491             if (CA_STATUS_OK != res)
492             {
493                 OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list");
494                 OICFree(ResInfo->info.token);
495             }
496         }
497         OICFree(retransmissionPdu);
498         cadata->responseInfo = ResInfo;
499
500         CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
501     }
502
503     if (pdu)
504     {
505         coap_delete_pdu(pdu);
506     }
507     OIC_LOG(DEBUG, TAG, "OUT");
508 }
509
510 static void CANetworkChangedCallback(CALocalConnectivity_t *info, CANetworkStatus_t status)
511 {
512     OIC_LOG(DEBUG, TAG, "IN");
513
514     OIC_LOG(DEBUG, TAG, "OUT");
515 }
516
517 void CAHandleRequestResponseCallbacks()
518 {
519     OIC_LOG(DEBUG, TAG, "CAHandleRequestResponseCallbacks IN");
520
521 #ifdef SINGLE_HANDLE
522     // parse the data and call the callbacks.
523     // #1 parse the data
524     // #2 get endpoint
525
526     u_mutex_lock(g_receiveThread.threadMutex);
527
528     u_queue_message_t *item = u_queue_get_element(g_receiveThread.dataQueue);
529
530     u_mutex_unlock(g_receiveThread.threadMutex);
531
532     if (NULL == item)
533     {
534         return;
535     }
536
537     // get values
538     void *msg = item->msg;
539
540     if (NULL == msg)
541     {
542         return;
543     }
544
545     // get endpoint
546     CAData_t *td = (CAData_t *) msg;
547     CARemoteEndpoint_t *rep = td->remoteEndpoint;
548
549     if (NULL == rep)
550     {
551         return;
552     }
553
554     if (NULL != td->requestInfo)
555     {
556         if (g_requestHandler)
557         {
558             OIC_LOG_V(DEBUG, TAG, "callback will be sent : %d", td->requestInfo->info.numOptions);
559             g_requestHandler(rep, td->requestInfo);
560         }
561     }
562
563     if (NULL != td->responseInfo)
564     {
565         if (g_responseHandler)
566         {
567             g_responseHandler(rep, td->responseInfo);
568         }
569
570     }
571     CADataDestroyer(msg, sizeof(CAData_t));
572
573 #endif
574     OIC_LOG(DEBUG, TAG, "CAHandleRequestResponseCallbacks OUT");
575 }
576
577 CAResult_t CADetachRequestMessage(const CARemoteEndpoint_t *object, const CARequestInfo_t *request)
578 {
579     OIC_LOG(DEBUG, TAG, "IN");
580
581     VERIFY_NON_NULL(object, TAG, "object");
582     VERIFY_NON_NULL(request, TAG, "request");
583
584     CARemoteEndpoint_t *remoteEndpoint = NULL;
585     CARequestInfo_t *requestInfo = NULL;
586     CAData_t *data = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
587     CA_MEMORY_ALLOC_CHECK(data);
588
589     // clone remote endpoint
590     remoteEndpoint = CACloneRemoteEndpoint(object);
591     CA_MEMORY_ALLOC_CHECK(remoteEndpoint);
592
593     // clone request info
594     requestInfo = CACloneRequestInfo(request);
595     CA_MEMORY_ALLOC_CHECK(requestInfo);
596
597     // save data
598     data->type = SEND_TYPE_UNICAST;
599     data->remoteEndpoint = remoteEndpoint;
600     data->requestInfo = requestInfo;
601     data->responseInfo = NULL;
602
603     // add thread
604     CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
605     OIC_LOG(DEBUG, TAG, "OUT");
606     return CA_STATUS_OK;
607
608 // memory error label.
609 memory_error_exit:
610     CADestroyRemoteEndpointInternal(remoteEndpoint);
611     CADestroyRequestInfoInternal(requestInfo);
612
613     OICFree(data);
614     OIC_LOG(DEBUG, TAG, "OUT");
615     return CA_MEMORY_ALLOC_FAILED;
616 }
617
618 CAResult_t CADetachRequestToAllMessage(const CAGroupEndpoint_t *object,
619                                        const CARequestInfo_t *request)
620 {
621     OIC_LOG(DEBUG, TAG, "IN");
622
623     if (NULL == object || NULL == request || NULL == object->resourceUri)
624     {
625         return CA_STATUS_INVALID_PARAM;
626     }
627
628     if ((request->method < CA_GET) || (request->method > CA_DELETE))
629     {
630         OIC_LOG(ERROR, TAG, "Invalid method type!");
631
632         return CA_STATUS_INVALID_PARAM;
633     }
634
635     CARemoteEndpoint_t *remoteEndpoint = NULL;
636     CARequestInfo_t *requestInfo = NULL;
637
638     // allocate & initialize
639     CAData_t *data = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
640     CA_MEMORY_ALLOC_CHECK(data);
641
642     CAAddress_t addr = {};
643     remoteEndpoint = CACreateRemoteEndpointInternal(object->resourceUri, addr,
644                                                                         object->connectivityType);
645
646     // clone request info
647     requestInfo = CACloneRequestInfo(request);
648     CA_MEMORY_ALLOC_CHECK(requestInfo);
649
650     // save data
651     data->type = SEND_TYPE_MULTICAST;
652     data->remoteEndpoint = remoteEndpoint;
653     data->requestInfo = requestInfo;
654     data->responseInfo = NULL;
655
656     // add thread
657     CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
658
659     OIC_LOG(DEBUG, TAG, "OUT");
660     return CA_STATUS_OK;
661
662 // memory error label.
663 memory_error_exit:
664
665     CADestroyRequestInfoInternal(requestInfo);
666     CADestroyRemoteEndpointInternal(remoteEndpoint);
667     OICFree(data);
668     OIC_LOG(DEBUG, TAG, "OUT");
669     return CA_MEMORY_ALLOC_FAILED;
670 }
671
672 CAResult_t CADetachResponseMessage(const CARemoteEndpoint_t *object,
673                                    const CAResponseInfo_t *response)
674 {
675     OIC_LOG(DEBUG, TAG, "IN");
676     VERIFY_NON_NULL(object, TAG, "object");
677     VERIFY_NON_NULL(response, TAG, "response");
678
679     CARemoteEndpoint_t *remoteEndpoint = NULL;
680     CAResponseInfo_t *responseInfo = NULL;
681
682     // allocate & initialize
683     CAData_t *data = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
684     CA_MEMORY_ALLOC_CHECK(data);
685
686     // clone remote endpoint
687     remoteEndpoint = CACloneRemoteEndpoint(object);
688     CA_MEMORY_ALLOC_CHECK(remoteEndpoint);
689
690     // clone response info
691     responseInfo = CACloneResponseInfo(response);
692     CA_MEMORY_ALLOC_CHECK(responseInfo);
693
694     // save data
695     data->type = SEND_TYPE_UNICAST;
696     data->remoteEndpoint = remoteEndpoint;
697     data->requestInfo = NULL;
698     data->responseInfo = responseInfo;
699
700     // add thread
701     CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
702
703     OIC_LOG(DEBUG, TAG, "OUT");
704     return CA_STATUS_OK;
705
706 // memory error label.
707 memory_error_exit:
708     CADestroyRemoteEndpointInternal(remoteEndpoint);
709     CADestroyResponseInfoInternal(responseInfo);
710     OICFree(data);
711     OIC_LOG(DEBUG, TAG, "OUT");
712
713     return CA_MEMORY_ALLOC_FAILED;
714 }
715
716 CAResult_t CADetachMessageResourceUri(const CAURI_t resourceUri, const CAToken_t token,
717                                       uint8_t tokenLength, const CAHeaderOption_t *options,
718                                       uint8_t numOptions)
719 {
720     OIC_LOG(DEBUG, TAG, "IN");
721     VERIFY_NON_NULL(resourceUri, TAG, "resourceUri is NULL");
722     VERIFY_NON_NULL(token, TAG, "Token is NULL");
723
724     CARemoteEndpoint_t *remoteEndpoint = NULL;
725     CARequestInfo_t *reqInfo = NULL;
726     char *tempToken = NULL;
727
728     // allocate & initialize
729     CAData_t *data = (CAData_t *) OICCalloc(1, sizeof(CAData_t));
730     CA_MEMORY_ALLOC_CHECK(data);
731
732     CAAddress_t addr = {};
733     remoteEndpoint = CACreateRemoteEndpointInternal(resourceUri, addr,
734                                                     CA_ETHERNET | CA_WIFI | CA_EDR | CA_LE);
735
736     // create request info
737     reqInfo = (CARequestInfo_t *) OICCalloc(1, sizeof(CARequestInfo_t));
738     CA_MEMORY_ALLOC_CHECK(reqInfo);
739
740     if (tokenLength)
741     {
742         // copy token value
743         tempToken = (char *) OICMalloc(tokenLength);
744         CA_MEMORY_ALLOC_CHECK(tempToken);
745         memcpy(tempToken, token, tokenLength);
746     }
747
748     // save request info data
749     reqInfo->method = CA_GET;
750     reqInfo->info.type = CA_MSG_NONCONFIRM;
751
752     reqInfo->info.token = tempToken;
753     reqInfo->info.tokenLength = tokenLength;
754
755     // save data
756     data->type = SEND_TYPE_MULTICAST;
757     data->remoteEndpoint = remoteEndpoint;
758     data->requestInfo = reqInfo;
759
760     data->responseInfo = NULL;
761     data->options = NULL;
762     data->numOptions = 0;
763     if (NULL != options && 0 < numOptions)
764     {
765         // copy data
766         CAHeaderOption_t *headerOption = (CAHeaderOption_t *) OICMalloc(sizeof(CAHeaderOption_t)
767                                                                         * numOptions);
768         CA_MEMORY_ALLOC_CHECK(headerOption);
769
770         memcpy(headerOption, options, sizeof(CAHeaderOption_t) * numOptions);
771
772         data->options = headerOption;
773         data->numOptions = numOptions;
774     }
775
776     // add thread
777     CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
778
779     OIC_LOG(DEBUG, TAG, "OUT");
780     return CA_STATUS_OK;
781
782 // memory error label.
783 memory_error_exit:
784
785     CADestroyRemoteEndpointInternal(remoteEndpoint);
786
787     OICFree(tempToken);
788     OICFree(reqInfo);
789     OICFree(data);
790     OIC_LOG(DEBUG, TAG, "OUT");
791     return CA_MEMORY_ALLOC_FAILED;
792 }
793
794 void CASetRequestResponseCallbacks(CARequestCallback ReqHandler, CAResponseCallback RespHandler)
795 {
796     OIC_LOG(DEBUG, TAG, "IN");
797     g_requestHandler = ReqHandler;
798     g_responseHandler = RespHandler;
799     OIC_LOG(DEBUG, TAG, "OUT");
800 }
801
802 CAResult_t CAInitializeMessageHandler()
803 {
804     OIC_LOG(DEBUG, TAG, "IN");
805     CASetPacketReceivedCallback(CAReceivedPacketCallback);
806
807     CASetNetworkChangeCallback(CANetworkChangedCallback);
808
809     // create thread pool
810     CAResult_t res = u_thread_pool_init(MAX_THREAD_POOL_SIZE, &g_threadPoolHandle);
811
812     if (res != CA_STATUS_OK)
813     {
814         OIC_LOG(ERROR, TAG, "thread pool initialize error.");
815         return res;
816     }
817
818     // send thread initialize
819     if (CA_STATUS_OK != CAQueueingThreadInitialize(&g_sendThread, g_threadPoolHandle,
820                                                    CASendThreadProcess, CADataDestroyer))
821     {
822         OIC_LOG(ERROR, TAG, "Failed to Initialize send queue thread");
823         return CA_STATUS_FAILED;
824     }
825
826     // start send thread
827     res = CAQueueingThreadStart(&g_sendThread);
828
829     if (res != CA_STATUS_OK)
830     {
831         OIC_LOG(ERROR, TAG, "thread start error(send thread).");
832         u_thread_pool_free(g_threadPoolHandle);
833         g_threadPoolHandle = NULL;
834         return res;
835     }
836
837     // receive thread initialize
838     if (CA_STATUS_OK != CAQueueingThreadInitialize(&g_receiveThread, g_threadPoolHandle,
839                                                    CAReceiveThreadProcess, CADataDestroyer))
840     {
841         OIC_LOG(ERROR, TAG, "Failed to Initialize receive queue thread");
842         return CA_STATUS_FAILED;
843     }
844
845 #ifndef SINGLE_HANDLE // This will be enabled when RI supports multi threading
846     // start receive thread
847     res = CAQueueingThreadStart(&gReceiveThread);
848
849     if (res != CA_STATUS_OK)
850     {
851         OIC_LOG(ERROR, TAG, "thread start error(receive thread).");
852         return res;
853     }
854 #endif
855
856     // retransmission initialize
857     CARetransmissionInitialize(&g_retransmissionContext, g_threadPoolHandle, CASendUnicastData,
858                                CATimeoutCallback, NULL);
859
860     // start retransmission
861     res = CARetransmissionStart(&g_retransmissionContext);
862
863     if (res != CA_STATUS_OK)
864     {
865         OIC_LOG(ERROR, TAG, "thread start error(retransmission thread).");
866         return res;
867     }
868
869     // initialize interface adapters by controller
870     CAInitializeAdapters(g_threadPoolHandle);
871     OIC_LOG(DEBUG, TAG, "OUT");
872     return CA_STATUS_OK;
873 }
874
875 void CATerminateMessageHandler()
876 {
877     OIC_LOG(DEBUG, TAG, "IN");
878     CAConnectivityType_t connType;
879     u_arraylist_t *list = CAGetSelectedNetworkList();
880     uint32_t length = u_arraylist_length(list);
881
882     uint32_t i = 0;
883     for (i = 0; i < length; i++)
884     {
885         void* ptrType = u_arraylist_get(list, i);
886
887         if (NULL == ptrType)
888         {
889             continue;
890         }
891
892         connType = *(CAConnectivityType_t *) ptrType;
893         CAStopAdapter(connType);
894     }
895
896     // stop retransmission
897     if (NULL != g_retransmissionContext.threadMutex)
898     {
899         CARetransmissionStop(&g_retransmissionContext);
900     }
901
902     // stop thread
903     // delete thread data
904     if (NULL != g_sendThread.threadMutex)
905     {
906         CAQueueingThreadStop(&g_sendThread);
907     }
908
909     // stop thread
910     // delete thread data
911     if (NULL != g_receiveThread.threadMutex)
912     {
913 #ifndef SINGLE_HANDLE // This will be enabled when RI supports multi threading
914         CAQueueingThreadStop(&gReceiveThread);
915 #endif
916     }
917
918     // destroy thread pool
919     if (NULL != g_threadPoolHandle)
920     {
921         u_thread_pool_free(g_threadPoolHandle);
922         g_threadPoolHandle = NULL;
923     }
924
925     CARetransmissionDestroy(&g_retransmissionContext);
926     CAQueueingThreadDestroy(&g_sendThread);
927     CAQueueingThreadDestroy(&g_receiveThread);
928
929     // terminate interface adapters by controller
930     CATerminateAdapters();
931
932     OIC_LOG(DEBUG, TAG, "OUT");
933 }
934
935 void CALogPDUInfo(coap_pdu_t *pdu)
936 {
937     VERIFY_NON_NULL_VOID(pdu, TAG, "pdu");
938
939     OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
940
941     OIC_LOG_V(DEBUG, TAG, "PDU Maker - type : %d", pdu->hdr->type);
942
943     OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
944
945     OIC_LOG_V(DEBUG, TAG, "PDU Maker - id : %d", ntohs(pdu->hdr->id));
946
947     OIC_LOG(DEBUG, TAG, "PDU Maker - token :");
948
949     OIC_LOG_BUFFER(DEBUG, TAG, pdu->hdr->token, pdu->hdr->token_length);
950 }