Modifying version number for building on tizen 3.0
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / camessagehandler.c
1 /******************************************************************
2  *
3  * Copyright 2014 Samsung Electronics All Rights Reserved.
4  *
5  *
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  ******************************************************************/
20
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <stdint.h>
25
26 #include "cainterface.h"
27 #include "camessagehandler.h"
28 #include "caremotehandler.h"
29 #include "cainterfacecontroller.h"
30 #include "uqueue.h"
31 #include "logger.h"
32 #include "config.h" /* for coap protocol */
33 #include "coap.h"
34 #include "uthreadpool.h" /* for thread pool */
35 #include "umutex.h"
36 #include "oic_malloc.h"
37
38 #define TAG PCF("CA")
39
40 #define MEMORY_ALLOCK_CHECK(arg) { if (arg == NULL) {OIC_LOG_V(DEBUG, TAG, "memory error"); goto memory_error_exit;} }
41 #define MAX_ACTION_NUM   300
42
43 #define MAX_THREAD_POOL_SIZE    10
44
45 #ifndef TRUE
46 #define TRUE    1
47 #endif
48
49 #ifndef FALSE
50 #define FALSE   0
51 #endif
52
53 typedef struct
54 {
55     int32_t actionId;
56     CARemoteEndpoint_t* remoteEndpoint;
57     CARequestInfo_t* requestInfo;
58     CAResponseInfo_t* responseInfo;
59 } CAData_t;
60
61 typedef void (*CAThreadTask)(CAData_t* data);
62
63 typedef struct
64 {
65     u_mutex threadMutex;
66     u_cond threadCond;
67     CAThreadTask threadTask;
68     int32_t isStop;
69     u_queue_t* dataQueue;
70 } CAThread_t;
71
72 // thread pool handle
73 static u_thread_pool_t gThreadPoolHandle = NULL;
74
75 // message handler main thread
76 static CAThread_t gSendThread;
77
78 // message handler callback
79 static int32_t gCurrentActionId = 0;
80 static CAMessageHandlerCallback gHandlerCallback = NULL;
81
82 // handler field
83 static CARequestCallback gRequestHandler = NULL;
84 static CAResponseCallback gResponseHandler = NULL;
85
86 static u_queue_t* gMessageQueue = NULL;
87 static u_mutex gMessageQueueMutex = NULL;
88
89 static void CAAddReceiveData(CAData_t* data)
90 {
91     OIC_LOG_V(DEBUG, TAG, "CAAddReceiveData");
92
93     // create thread data
94     u_queue_message_t* message = (u_queue_message_t*) OICMalloc(sizeof(u_queue_message_t));
95
96     if (message == NULL)
97     {
98         OIC_LOG_V(DEBUG, TAG, "memory error!!");
99         return;
100     }
101     memset(message, 0, sizeof(u_queue_message_t));
102
103     message->msg = data;
104     message->size = sizeof(CAData_t);
105
106     // mutex lock
107     u_mutex_lock(gMessageQueueMutex);
108
109     // add thread data into list
110     u_queue_add_element(gMessageQueue, message);
111
112     // mutex unlock
113     u_mutex_unlock(gMessageQueueMutex);
114 }
115
116 static void CAAddSendData(CAData_t* data)
117 {
118     OIC_LOG_V(DEBUG, TAG, "CAAddSendData!!");
119
120     // create thread data
121     u_queue_message_t* message = (u_queue_message_t*) OICMalloc(sizeof(u_queue_message_t));
122
123     if (message == NULL)
124     {
125         OIC_LOG_V(DEBUG, TAG, "memory error!!");
126         return;
127     }
128     memset(message, 0, sizeof(u_queue_message_t));
129
130     message->msg = data;
131     message->size = sizeof(CAData_t);
132
133     // mutex lock
134     u_mutex_lock(gSendThread.threadMutex);
135
136     // add thread data into list
137     u_queue_add_element(gSendThread.dataQueue, message);
138
139     // notity the thread
140     u_cond_signal(gSendThread.threadCond);
141
142     // mutex unlock
143     u_mutex_unlock(gSendThread.threadMutex);
144 }
145
146 static void CAStopSendThread()
147 {
148     OIC_LOG_V(DEBUG, TAG, "CAStopSendThread request!!");
149
150     // mutex lock
151     u_mutex_lock(gSendThread.threadMutex);
152
153     // set stop flag
154     gSendThread.isStop = TRUE;
155
156     // notity the thread
157     u_cond_signal(gSendThread.threadCond);
158
159     // mutex unlock
160     u_mutex_unlock(gSendThread.threadMutex);
161 }
162
163 static void CASendThreadProcess(CAData_t* data)
164 {
165     if (data == NULL)
166     {
167         OIC_LOG(DEBUG, TAG, "thread data error!!");
168         return;
169     }
170
171     if (NULL == data->remoteEndpoint)
172     {
173         OIC_LOG(DEBUG, TAG, "remoteEndpoint is null");
174         return;
175     }
176
177     OIC_LOG_V(DEBUG, TAG, "thread action id : %d", data->actionId);
178
179     CADetachErrorCode code = FAIL;
180     int32_t res = 0;
181
182     if (data->requestInfo != NULL)
183     {
184         OIC_LOG(DEBUG, TAG, "requestInfo is available");
185
186         coap_pdu_t* pdu = NULL;
187         pdu = CAGeneratePdu(data->remoteEndpoint->resourceUri, data->requestInfo->method,
188                 data->requestInfo->info);
189
190         // interface controller function call.
191         if (NULL != pdu)
192         {
193             OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
194
195             OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
196
197             OIC_LOG_V(DEBUG, TAG, "PDU Maker - buffer data : %s", pdu->hdr);
198
199             res = CASendUnicastData(data->remoteEndpoint, pdu->hdr, pdu->length);
200         }
201     }
202     else if (data->responseInfo != NULL)
203     {
204         OIC_LOG_V(DEBUG, TAG, "responseInfo is available..");
205
206         coap_pdu_t* pdu = NULL;
207
208         pdu = CAGeneratePdu(data->remoteEndpoint->resourceUri, data->responseInfo->result,
209                 data->responseInfo->info);
210
211         // interface controller function call.
212         if (NULL != pdu)
213         {
214             OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
215
216             OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
217
218             OIC_LOG_V(DEBUG, TAG, "PDU Maker - buffer data : %x", pdu->hdr);
219
220             res = CASendUnicastData(data->remoteEndpoint, pdu->hdr, pdu->length);
221         }
222     }
223     else
224     {
225         OIC_LOG(DEBUG, TAG, "both requestInfo & responseInfo is not available");
226
227         coap_pdu_t* pdu = NULL;
228         CAInfo_t info;
229         memset(&info, 0, sizeof(CAInfo_t));
230         pdu = CAGeneratePdu(data->remoteEndpoint->resourceUri, CA_GET, info);
231
232         if (NULL != pdu)
233         {
234             OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
235
236             OIC_LOG_V(DEBUG, TAG, "PDU Maker - type : %d", pdu->hdr->type);
237
238             OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
239
240             OIC_LOG_V(DEBUG, TAG, "PDU Maker - id : %d", pdu->hdr->id);
241
242             OIC_LOG_V(DEBUG, TAG, "PDU Maker - buffer data : %x", pdu->hdr);
243
244             res = CASendMulticastData(pdu->hdr, pdu->length);
245         }
246
247     }
248
249     if (res)
250     {
251         code = SUCCESS;
252     }
253
254     if (gHandlerCallback != NULL)
255     {
256         gHandlerCallback(data->actionId, code);
257     }
258 }
259
260 static void* CAThreadBaseRoutine(void* treadData)
261 {
262     OIC_LOG_V(DEBUG, TAG, "message handler main thread start..");
263
264     CAThread_t* thread = (CAThread_t*) treadData;
265
266     if (thread == NULL)
267     {
268         OIC_LOG_V(DEBUG, TAG, "thread data passing error!!");
269
270         return NULL;
271     }
272
273     while (!thread->isStop)
274     {
275         // mutex lock
276         u_mutex_lock(thread->threadMutex);
277
278         // if queue is empty, thread will wait
279         if (u_queue_get_size(thread->dataQueue) <= 0)
280         {
281             OIC_LOG_V(DEBUG, TAG, "wait..");
282             // wait
283             u_cond_wait(thread->threadCond, thread->threadMutex);
284
285             OIC_LOG_V(DEBUG, TAG, "wake up..");
286         }
287
288         // mutex unlock
289         u_mutex_unlock(thread->threadMutex);
290
291         // check stop flag
292         if (thread->isStop)
293             continue;
294
295         // get data
296         u_queue_message_t* message = u_queue_get_element(thread->dataQueue);
297
298         CAData_t* data = (CAData_t*) message->msg;
299
300         // process data
301         thread->threadTask(data);
302     }
303
304     OIC_LOG_V(DEBUG, TAG, "message handler main thread end..");
305
306     return NULL;
307 }
308
309 static int32_t CAIncreaseActionId()
310 {
311     ++gCurrentActionId;
312
313     gCurrentActionId = (gCurrentActionId > MAX_ACTION_NUM) ? 0 : gCurrentActionId;
314
315     return gCurrentActionId;
316 }
317
318 static void CAReceivedPacketCallback(CARemoteEndpoint_t* endpoint, void* data, uint32_t dataLen)
319 {
320     OIC_LOG(DEBUG, TAG, "receivedPacketCallback in message handler!!");
321
322     if (NULL == data)
323     {
324         OIC_LOG(DEBUG, TAG, "received data is null");
325         return;
326     }
327
328     coap_pdu_t* pdu;
329     uint32_t code = CA_NOT_FOUND;
330     pdu = CAParsePDU(data, &code);
331
332     if (code == CA_GET || code == CA_POST || code == CA_PUT || code == CA_DELETE)
333     {
334         CARequestInfo_t ReqInfo;
335         memset(&ReqInfo, 0, sizeof(CARequestInfo_t));
336         CAGetRequestInfoFromPdu(pdu, &ReqInfo);
337
338         if (NULL != ReqInfo.info.options && NULL != endpoint)
339         {
340             OIC_LOG_V(DEBUG, TAG, "Request PDU - optionID: %d", ReqInfo.info.options->optionID);
341
342             OIC_LOG_V(DEBUG, TAG, "Request PDU - optionlist: %s", ReqInfo.info.options->optionData);
343
344             OIC_LOG_V(DEBUG, TAG, "Request PDU  - payload: %s", ReqInfo.info.payload);
345
346             OIC_LOG_V(DEBUG, TAG, "Request PDU  - code: %d", ReqInfo.method);
347
348             endpoint->resourceUri = (char*) OICMalloc(strlen(ReqInfo.info.options->optionData) + 1);
349             memcpy(endpoint->resourceUri, ReqInfo.info.options->optionData,
350                     strlen(ReqInfo.info.options->optionData));
351             OIC_LOG_V(DEBUG, TAG, "added resource URI : %s", endpoint->resourceUri);
352         }
353
354         // store the data at queue.
355         CAData_t* cadata = NULL;
356         cadata = (CAData_t*) OICMalloc(sizeof(CAData_t));
357         memset(cadata, 0, sizeof(CAData_t));
358         cadata->actionId = 1;
359         cadata->remoteEndpoint = endpoint;
360         cadata->requestInfo = &ReqInfo;
361         cadata->responseInfo = NULL;
362         CAAddReceiveData(cadata);
363
364     }
365     else
366     {
367         CAResponseInfo_t ResInfo;
368         memset(&ResInfo, 0, sizeof(CARequestInfo_t));
369         CAGetResponseInfoFromPdu(pdu, &ResInfo);
370
371         if (NULL != ResInfo.info.options && NULL != endpoint)
372         {
373             OIC_LOG_V(DEBUG, TAG, "Response PDU - optionID: %d", ResInfo.info.options->optionID);
374
375             OIC_LOG_V(DEBUG, TAG, "Response PDU - optionlist: %s", ResInfo.info.options->optionData);
376
377             OIC_LOG_V(DEBUG, TAG, "Response PDU - payload: %s", ResInfo.info.payload);
378
379             OIC_LOG_V(DEBUG, TAG, "Response PDU - code: %d", ResInfo.result);
380
381             endpoint->resourceUri = (char*) OICMalloc(strlen(ResInfo.info.options->optionData) + 1);
382             memcpy(endpoint->resourceUri, ResInfo.info.options->optionData,
383                     strlen(ResInfo.info.options->optionData));
384             OIC_LOG_V(DEBUG, TAG, "added resource URI : %s", endpoint->resourceUri);
385         }
386
387         // store the data at queue.
388         CAData_t* cadata = NULL;
389         cadata = (CAData_t*) OICMalloc(sizeof(CAData_t));
390         memset(cadata, 0, sizeof(CAData_t));
391         cadata->actionId = 1;
392         cadata->remoteEndpoint = endpoint;
393         cadata->requestInfo = NULL;
394         cadata->responseInfo = &ResInfo;
395         CAAddReceiveData(cadata);
396     }
397 }
398
399 void CAHandleRequestResponseCallbacks()
400 {
401     OIC_LOG_V(DEBUG, TAG, "CAHandleRequestResponseCallbacks");
402
403     // parse the data and call the callbacks.
404     // #1 parse the data
405     // #2 get endpoint
406
407     u_mutex_lock(gMessageQueueMutex);
408
409     u_queue_message_t* item = u_queue_get_element(gMessageQueue);
410
411     u_mutex_unlock(gMessageQueueMutex);
412
413     if (item == NULL)
414         return;
415
416     // get values
417     void* msg = item->msg;
418
419     if (msg == NULL)
420         return;
421
422     // get endpoint
423     CAData_t* td = (CAData_t*) msg;
424
425     CARemoteEndpoint_t* rep = td->remoteEndpoint;
426
427     if (rep == NULL)
428         return;
429
430     if (td->requestInfo != NULL)
431     {
432         if (gRequestHandler)
433         {
434             gRequestHandler(rep, NULL);
435         }
436     }
437
438     if (td->responseInfo != NULL)
439     {
440         if (gResponseHandler)
441         {
442             gResponseHandler(rep, NULL);
443         }
444     }
445
446     u_queue_remove_element(gMessageQueue);
447 }
448
449 int32_t CADetachRequestMessage(const CARemoteEndpoint_t* object, const CARequestInfo_t* request)
450 {
451     OIC_LOG_V(DEBUG, TAG, "CADetachRequestMessage");
452
453     if (object == NULL || request == NULL)
454     {
455         return -1;
456     }
457
458     int32_t id = 0;
459
460     // create action id
461     id = CAIncreaseActionId();
462
463     CAData_t* data = (CAData_t*) OICMalloc(sizeof(CAData_t));
464     MEMORY_ALLOCK_CHECK(data);
465
466     // initialize
467     memset(data, 0, sizeof(CAData_t));
468
469     // clone remote endpoint
470     CARemoteEndpoint_t* remoteEndpoint = CACloneRemoteEndpoint(object);
471     MEMORY_ALLOCK_CHECK(remoteEndpoint);
472
473     // clone request info
474     CARequestInfo_t* requestInfo = CACloneRequestInfo(request);
475     MEMORY_ALLOCK_CHECK(requestInfo);
476
477     // save data
478     data->actionId = id;
479     data->remoteEndpoint = remoteEndpoint;
480     data->requestInfo = requestInfo;
481     data->responseInfo = NULL;
482
483     // add thread
484     CAAddSendData(data);
485
486     return id;
487
488     // memory error label.
489     memory_error_exit:
490
491     CADestroyRemoteEndpointInternal(remoteEndpoint);
492
493     CADestroyRequestInfoInternal(requestInfo);
494
495     if (data != NULL)
496     {
497         OICFree(data);
498     }
499
500     return -1;
501 }
502
503 int32_t CADetachResponseMessage(const CARemoteEndpoint_t* object, const CAResponseInfo_t* response)
504 {
505     OIC_LOG_V(DEBUG, TAG, "CADetachResponseMessage");
506
507     if (object == NULL || response == NULL)
508     {
509         return -1;
510     }
511
512     int32_t id = 0;
513
514     // create action id
515     id = CAIncreaseActionId();
516
517     CAData_t* data = (CAData_t*) OICMalloc(sizeof(CAData_t));
518     MEMORY_ALLOCK_CHECK(data);
519
520     // initialize
521     memset(data, 0, sizeof(CAData_t));
522
523     // clone remote endpoint
524     CARemoteEndpoint_t* remoteEndpoint = CACloneRemoteEndpoint(object);
525     MEMORY_ALLOCK_CHECK(remoteEndpoint);
526
527     // clone response info
528     CAResponseInfo_t* responseInfo = CACloneResponseInfo(response);
529     MEMORY_ALLOCK_CHECK(responseInfo);
530
531     // save data
532     data->actionId = id;
533     data->remoteEndpoint = remoteEndpoint;
534     data->requestInfo = NULL;
535     data->responseInfo = responseInfo;
536
537     // add thread
538     CAAddSendData(data);
539
540     return id;
541
542     // memory error label.
543     memory_error_exit:
544
545     CADestroyRemoteEndpointInternal(remoteEndpoint);
546
547     CADestroyResponseInfoInternal(responseInfo);
548
549     if (data != NULL)
550     {
551         OICFree(data);
552     }
553
554     return -1;
555 }
556
557 int32_t CADetachMessageResourceUri(const CAURI_t resourceUri)
558 {
559     if (resourceUri == NULL)
560     {
561         return -1;
562     }
563
564     int32_t id = 0;
565
566     // create action id
567     id = CAIncreaseActionId();
568
569     CAData_t* data = (CAData_t*) OICMalloc(sizeof(CAData_t));
570     MEMORY_ALLOCK_CHECK(data);
571
572     // initialize
573     memset(data, 0, sizeof(CAData_t));
574
575     CAAddress_t addr;
576     memset(&addr, 0, sizeof(CAAddress_t));
577     CARemoteEndpoint_t* remoteEndpoint = CACreateRemoteEndpointInternal(resourceUri, addr,
578             CA_ETHERNET | CA_WIFI | CA_EDR | CA_LE);
579
580     // save data
581     data->actionId = id;
582     data->remoteEndpoint = remoteEndpoint;
583     data->requestInfo = NULL;
584     data->responseInfo = NULL;
585
586     // add thread
587     CAAddSendData(data);
588
589     return id;
590
591     // memory error label.
592     memory_error_exit:
593
594     CADestroyRemoteEndpointInternal(remoteEndpoint);
595
596     if (data != NULL)
597     {
598         OICFree(data);
599     }
600
601     return -1;
602 }
603
604 void CASetMessageHandlerCallback(CAMessageHandlerCallback callback)
605 {
606     OIC_LOG_V(DEBUG, TAG, "set message handler callback.");
607
608     gHandlerCallback = callback;
609 }
610
611 void CASetRequestResponseCallbacks(CARequestCallback ReqHandler, CAResponseCallback RespHandler)
612 {
613     OIC_LOG_V(DEBUG, TAG, "set request, response handler callback.");
614
615     gRequestHandler = ReqHandler;
616     gResponseHandler = RespHandler;
617 }
618
619 CAResult_t CAInitializeMessageHandler()
620 {
621     CASetPacketReceivedCallback(CAReceivedPacketCallback);
622
623     // create thread pool
624     CAResult_t res;
625     res = u_thread_pool_init(MAX_THREAD_POOL_SIZE, &gThreadPoolHandle);
626
627     if (res != CA_STATUS_OK)
628     {
629         OIC_LOG_V(DEBUG, TAG, "thread pool initialize error.");
630         return res;
631     }
632
633     // send thread initialize
634     memset(&gSendThread, 0, sizeof(CAThread_t));
635
636     // mutex init
637     u_mutex_init();
638
639     // set send thread data
640     gSendThread.dataQueue = u_queue_create();
641     gSendThread.threadMutex = u_mutex_new();
642     gSendThread.threadCond = u_cond_new();
643     gSendThread.isStop = FALSE;
644     gSendThread.threadTask = CASendThreadProcess;
645
646     // start send thread
647     res = u_thread_pool_add_task(gThreadPoolHandle, CAThreadBaseRoutine, &gSendThread);
648
649     if (res != CA_STATUS_OK)
650     {
651         OIC_LOG_V(DEBUG, TAG, "thread pool add task error.");
652         return res;
653     }
654
655     // set receive queue
656     gMessageQueue = u_queue_create();
657     gMessageQueueMutex = u_mutex_new();
658
659     // initialize interface adapters by controller
660     CAInitializeAdapters();
661
662     return CA_STATUS_OK;
663 }
664
665 void CATerminateMessageHandler()
666 {
667     // terminate interface adapters by controller
668     CATerminateAdapters();
669
670     // stop thread
671     CAStopSendThread();
672
673     // delete thread data
674     u_mutex_free(gSendThread.threadMutex);
675     u_cond_free(gSendThread.threadCond);
676     u_queue_delete(gSendThread.dataQueue);
677
678     // destroy thread pool
679     u_thread_pool_free(gThreadPoolHandle);
680
681     OIC_LOG_V(DEBUG, TAG, "message handler terminate completed!");
682
683     u_queue_delete(gMessageQueue);
684     u_mutex_free(gMessageQueueMutex);
685 }
686