1 /******************************************************************
3 * Copyright 2014 Samsung Electronics All Rights Reserved.
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 ******************************************************************/
26 #include "cainterface.h"
27 #include "camessagehandler.h"
28 #include "caremotehandler.h"
29 #include "cainterfacecontroller.h"
32 #include "config.h" /* for coap protocol */
34 #include "uthreadpool.h" /* for thread pool */
36 #include "oic_malloc.h"
40 #define MEMORY_ALLOCK_CHECK(arg) { if (arg == NULL) {OIC_LOG_V(DEBUG, TAG, "memory error"); goto memory_error_exit;} }
41 #define MAX_ACTION_NUM 300
43 #define MAX_THREAD_POOL_SIZE 10
56 CARemoteEndpoint_t* remoteEndpoint;
57 CARequestInfo_t* requestInfo;
58 CAResponseInfo_t* responseInfo;
61 typedef void (*CAThreadTask)(CAData_t* data);
67 CAThreadTask threadTask;
73 static u_thread_pool_t gThreadPoolHandle = NULL;
75 // message handler main thread
76 static CAThread_t gSendThread;
78 // message handler callback
79 static int32_t gCurrentActionId = 0;
80 static CAMessageHandlerCallback gHandlerCallback = NULL;
83 static CARequestCallback gRequestHandler = NULL;
84 static CAResponseCallback gResponseHandler = NULL;
86 static u_queue_t* gMessageQueue = NULL;
87 static u_mutex gMessageQueueMutex = NULL;
89 static void CAAddReceiveData(CAData_t* data)
91 OIC_LOG_V(DEBUG, TAG, "CAAddReceiveData");
94 u_queue_message_t* message = (u_queue_message_t*) OICMalloc(sizeof(u_queue_message_t));
98 OIC_LOG_V(DEBUG, TAG, "memory error!!");
101 memset(message, 0, sizeof(u_queue_message_t));
104 message->size = sizeof(CAData_t);
107 u_mutex_lock(gMessageQueueMutex);
109 // add thread data into list
110 u_queue_add_element(gMessageQueue, message);
113 u_mutex_unlock(gMessageQueueMutex);
116 static void CAAddSendData(CAData_t* data)
118 OIC_LOG_V(DEBUG, TAG, "CAAddSendData!!");
120 // create thread data
121 u_queue_message_t* message = (u_queue_message_t*) OICMalloc(sizeof(u_queue_message_t));
125 OIC_LOG_V(DEBUG, TAG, "memory error!!");
128 memset(message, 0, sizeof(u_queue_message_t));
131 message->size = sizeof(CAData_t);
134 u_mutex_lock(gSendThread.threadMutex);
136 // add thread data into list
137 u_queue_add_element(gSendThread.dataQueue, message);
140 u_cond_signal(gSendThread.threadCond);
143 u_mutex_unlock(gSendThread.threadMutex);
146 static void CAStopSendThread()
148 OIC_LOG_V(DEBUG, TAG, "CAStopSendThread request!!");
151 u_mutex_lock(gSendThread.threadMutex);
154 gSendThread.isStop = TRUE;
157 u_cond_signal(gSendThread.threadCond);
160 u_mutex_unlock(gSendThread.threadMutex);
163 static void CASendThreadProcess(CAData_t* data)
167 OIC_LOG(DEBUG, TAG, "thread data error!!");
171 if (NULL == data->remoteEndpoint)
173 OIC_LOG(DEBUG, TAG, "remoteEndpoint is null");
177 OIC_LOG_V(DEBUG, TAG, "thread action id : %d", data->actionId);
179 CADetachErrorCode code = FAIL;
182 if (data->requestInfo != NULL)
184 OIC_LOG(DEBUG, TAG, "requestInfo is available");
186 coap_pdu_t* pdu = NULL;
187 pdu = CAGeneratePdu(data->remoteEndpoint->resourceUri, data->requestInfo->method,
188 data->requestInfo->info);
190 // interface controller function call.
193 OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
195 OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
197 OIC_LOG_V(DEBUG, TAG, "PDU Maker - buffer data : %s", pdu->hdr);
199 res = CASendUnicastData(data->remoteEndpoint, pdu->hdr, pdu->length);
202 else if (data->responseInfo != NULL)
204 OIC_LOG_V(DEBUG, TAG, "responseInfo is available..");
206 coap_pdu_t* pdu = NULL;
208 pdu = CAGeneratePdu(data->remoteEndpoint->resourceUri, data->responseInfo->result,
209 data->responseInfo->info);
211 // interface controller function call.
214 OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
216 OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
218 OIC_LOG_V(DEBUG, TAG, "PDU Maker - buffer data : %x", pdu->hdr);
220 res = CASendUnicastData(data->remoteEndpoint, pdu->hdr, pdu->length);
225 OIC_LOG(DEBUG, TAG, "both requestInfo & responseInfo is not available");
227 coap_pdu_t* pdu = NULL;
229 memset(&info, 0, sizeof(CAInfo_t));
230 pdu = CAGeneratePdu(data->remoteEndpoint->resourceUri, CA_GET, info);
234 OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
236 OIC_LOG_V(DEBUG, TAG, "PDU Maker - type : %d", pdu->hdr->type);
238 OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
240 OIC_LOG_V(DEBUG, TAG, "PDU Maker - id : %d", pdu->hdr->id);
242 OIC_LOG_V(DEBUG, TAG, "PDU Maker - buffer data : %x", pdu->hdr);
244 res = CASendMulticastData(pdu->hdr, pdu->length);
254 if (gHandlerCallback != NULL)
256 gHandlerCallback(data->actionId, code);
260 static void* CAThreadBaseRoutine(void* treadData)
262 OIC_LOG_V(DEBUG, TAG, "message handler main thread start..");
264 CAThread_t* thread = (CAThread_t*) treadData;
268 OIC_LOG_V(DEBUG, TAG, "thread data passing error!!");
273 while (!thread->isStop)
276 u_mutex_lock(thread->threadMutex);
278 // if queue is empty, thread will wait
279 if (u_queue_get_size(thread->dataQueue) <= 0)
281 OIC_LOG_V(DEBUG, TAG, "wait..");
283 u_cond_wait(thread->threadCond, thread->threadMutex);
285 OIC_LOG_V(DEBUG, TAG, "wake up..");
289 u_mutex_unlock(thread->threadMutex);
296 u_queue_message_t* message = u_queue_get_element(thread->dataQueue);
298 CAData_t* data = (CAData_t*) message->msg;
301 thread->threadTask(data);
304 OIC_LOG_V(DEBUG, TAG, "message handler main thread end..");
309 static int32_t CAIncreaseActionId()
313 gCurrentActionId = (gCurrentActionId > MAX_ACTION_NUM) ? 0 : gCurrentActionId;
315 return gCurrentActionId;
318 static void CAReceivedPacketCallback(CARemoteEndpoint_t* endpoint, void* data, uint32_t dataLen)
320 OIC_LOG(DEBUG, TAG, "receivedPacketCallback in message handler!!");
324 OIC_LOG(DEBUG, TAG, "received data is null");
329 uint32_t code = CA_NOT_FOUND;
330 pdu = CAParsePDU(data, &code);
332 if (code == CA_GET || code == CA_POST || code == CA_PUT || code == CA_DELETE)
334 CARequestInfo_t ReqInfo;
335 memset(&ReqInfo, 0, sizeof(CARequestInfo_t));
336 CAGetRequestInfoFromPdu(pdu, &ReqInfo);
338 if (NULL != ReqInfo.info.options && NULL != endpoint)
340 OIC_LOG_V(DEBUG, TAG, "Request PDU - optionID: %d", ReqInfo.info.options->optionID);
342 OIC_LOG_V(DEBUG, TAG, "Request PDU - optionlist: %s", ReqInfo.info.options->optionData);
344 OIC_LOG_V(DEBUG, TAG, "Request PDU - payload: %s", ReqInfo.info.payload);
346 OIC_LOG_V(DEBUG, TAG, "Request PDU - code: %d", ReqInfo.method);
348 endpoint->resourceUri = (char*) OICMalloc(strlen(ReqInfo.info.options->optionData) + 1);
349 memcpy(endpoint->resourceUri, ReqInfo.info.options->optionData,
350 strlen(ReqInfo.info.options->optionData));
351 OIC_LOG_V(DEBUG, TAG, "added resource URI : %s", endpoint->resourceUri);
354 // store the data at queue.
355 CAData_t* cadata = NULL;
356 cadata = (CAData_t*) OICMalloc(sizeof(CAData_t));
357 memset(cadata, 0, sizeof(CAData_t));
358 cadata->actionId = 1;
359 cadata->remoteEndpoint = endpoint;
360 cadata->requestInfo = &ReqInfo;
361 cadata->responseInfo = NULL;
362 CAAddReceiveData(cadata);
367 CAResponseInfo_t ResInfo;
368 memset(&ResInfo, 0, sizeof(CARequestInfo_t));
369 CAGetResponseInfoFromPdu(pdu, &ResInfo);
371 if (NULL != ResInfo.info.options && NULL != endpoint)
373 OIC_LOG_V(DEBUG, TAG, "Response PDU - optionID: %d", ResInfo.info.options->optionID);
375 OIC_LOG_V(DEBUG, TAG, "Response PDU - optionlist: %s", ResInfo.info.options->optionData);
377 OIC_LOG_V(DEBUG, TAG, "Response PDU - payload: %s", ResInfo.info.payload);
379 OIC_LOG_V(DEBUG, TAG, "Response PDU - code: %d", ResInfo.result);
381 endpoint->resourceUri = (char*) OICMalloc(strlen(ResInfo.info.options->optionData) + 1);
382 memcpy(endpoint->resourceUri, ResInfo.info.options->optionData,
383 strlen(ResInfo.info.options->optionData));
384 OIC_LOG_V(DEBUG, TAG, "added resource URI : %s", endpoint->resourceUri);
387 // store the data at queue.
388 CAData_t* cadata = NULL;
389 cadata = (CAData_t*) OICMalloc(sizeof(CAData_t));
390 memset(cadata, 0, sizeof(CAData_t));
391 cadata->actionId = 1;
392 cadata->remoteEndpoint = endpoint;
393 cadata->requestInfo = NULL;
394 cadata->responseInfo = &ResInfo;
395 CAAddReceiveData(cadata);
399 void CAHandleRequestResponseCallbacks()
401 OIC_LOG_V(DEBUG, TAG, "CAHandleRequestResponseCallbacks");
403 // parse the data and call the callbacks.
407 u_mutex_lock(gMessageQueueMutex);
409 u_queue_message_t* item = u_queue_get_element(gMessageQueue);
411 u_mutex_unlock(gMessageQueueMutex);
417 void* msg = item->msg;
423 CAData_t* td = (CAData_t*) msg;
425 CARemoteEndpoint_t* rep = td->remoteEndpoint;
430 if (td->requestInfo != NULL)
434 gRequestHandler(rep, NULL);
438 if (td->responseInfo != NULL)
440 if (gResponseHandler)
442 gResponseHandler(rep, NULL);
446 u_queue_remove_element(gMessageQueue);
449 int32_t CADetachRequestMessage(const CARemoteEndpoint_t* object, const CARequestInfo_t* request)
451 OIC_LOG_V(DEBUG, TAG, "CADetachRequestMessage");
453 if (object == NULL || request == NULL)
461 id = CAIncreaseActionId();
463 CAData_t* data = (CAData_t*) OICMalloc(sizeof(CAData_t));
464 MEMORY_ALLOCK_CHECK(data);
467 memset(data, 0, sizeof(CAData_t));
469 // clone remote endpoint
470 CARemoteEndpoint_t* remoteEndpoint = CACloneRemoteEndpoint(object);
471 MEMORY_ALLOCK_CHECK(remoteEndpoint);
473 // clone request info
474 CARequestInfo_t* requestInfo = CACloneRequestInfo(request);
475 MEMORY_ALLOCK_CHECK(requestInfo);
479 data->remoteEndpoint = remoteEndpoint;
480 data->requestInfo = requestInfo;
481 data->responseInfo = NULL;
488 // memory error label.
491 CADestroyRemoteEndpointInternal(remoteEndpoint);
493 CADestroyRequestInfoInternal(requestInfo);
503 int32_t CADetachResponseMessage(const CARemoteEndpoint_t* object, const CAResponseInfo_t* response)
505 OIC_LOG_V(DEBUG, TAG, "CADetachResponseMessage");
507 if (object == NULL || response == NULL)
515 id = CAIncreaseActionId();
517 CAData_t* data = (CAData_t*) OICMalloc(sizeof(CAData_t));
518 MEMORY_ALLOCK_CHECK(data);
521 memset(data, 0, sizeof(CAData_t));
523 // clone remote endpoint
524 CARemoteEndpoint_t* remoteEndpoint = CACloneRemoteEndpoint(object);
525 MEMORY_ALLOCK_CHECK(remoteEndpoint);
527 // clone response info
528 CAResponseInfo_t* responseInfo = CACloneResponseInfo(response);
529 MEMORY_ALLOCK_CHECK(responseInfo);
533 data->remoteEndpoint = remoteEndpoint;
534 data->requestInfo = NULL;
535 data->responseInfo = responseInfo;
542 // memory error label.
545 CADestroyRemoteEndpointInternal(remoteEndpoint);
547 CADestroyResponseInfoInternal(responseInfo);
557 int32_t CADetachMessageResourceUri(const CAURI_t resourceUri)
559 if (resourceUri == NULL)
567 id = CAIncreaseActionId();
569 CAData_t* data = (CAData_t*) OICMalloc(sizeof(CAData_t));
570 MEMORY_ALLOCK_CHECK(data);
573 memset(data, 0, sizeof(CAData_t));
576 memset(&addr, 0, sizeof(CAAddress_t));
577 CARemoteEndpoint_t* remoteEndpoint = CACreateRemoteEndpointInternal(resourceUri, addr,
578 CA_ETHERNET | CA_WIFI | CA_EDR | CA_LE);
582 data->remoteEndpoint = remoteEndpoint;
583 data->requestInfo = NULL;
584 data->responseInfo = NULL;
591 // memory error label.
594 CADestroyRemoteEndpointInternal(remoteEndpoint);
604 void CASetMessageHandlerCallback(CAMessageHandlerCallback callback)
606 OIC_LOG_V(DEBUG, TAG, "set message handler callback.");
608 gHandlerCallback = callback;
611 void CASetRequestResponseCallbacks(CARequestCallback ReqHandler, CAResponseCallback RespHandler)
613 OIC_LOG_V(DEBUG, TAG, "set request, response handler callback.");
615 gRequestHandler = ReqHandler;
616 gResponseHandler = RespHandler;
619 CAResult_t CAInitializeMessageHandler()
621 CASetPacketReceivedCallback(CAReceivedPacketCallback);
623 // create thread pool
625 res = u_thread_pool_init(MAX_THREAD_POOL_SIZE, &gThreadPoolHandle);
627 if (res != CA_STATUS_OK)
629 OIC_LOG_V(DEBUG, TAG, "thread pool initialize error.");
633 // send thread initialize
634 memset(&gSendThread, 0, sizeof(CAThread_t));
639 // set send thread data
640 gSendThread.dataQueue = u_queue_create();
641 gSendThread.threadMutex = u_mutex_new();
642 gSendThread.threadCond = u_cond_new();
643 gSendThread.isStop = FALSE;
644 gSendThread.threadTask = CASendThreadProcess;
647 res = u_thread_pool_add_task(gThreadPoolHandle, CAThreadBaseRoutine, &gSendThread);
649 if (res != CA_STATUS_OK)
651 OIC_LOG_V(DEBUG, TAG, "thread pool add task error.");
656 gMessageQueue = u_queue_create();
657 gMessageQueueMutex = u_mutex_new();
659 // initialize interface adapters by controller
660 CAInitializeAdapters();
665 void CATerminateMessageHandler()
667 // terminate interface adapters by controller
668 CATerminateAdapters();
673 // delete thread data
674 u_mutex_free(gSendThread.threadMutex);
675 u_cond_free(gSendThread.threadCond);
676 u_queue_delete(gSendThread.dataQueue);
678 // destroy thread pool
679 u_thread_pool_free(gThreadPoolHandle);
681 OIC_LOG_V(DEBUG, TAG, "message handler terminate completed!");
683 u_queue_delete(gMessageQueue);
684 u_mutex_free(gMessageQueueMutex);