1 /******************************************************************
3 * Copyright 2014 Samsung Electronics All Rights Reserved.
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 ******************************************************************/
26 #include "cainterface.h"
27 #include "camessagehandler.h"
28 #include "caremotehandler.h"
29 #include "cainterfacecontroller.h"
30 #include "caprotocolmessage.h"
33 #include "config.h" /* for coap protocol */
35 #include "uthreadpool.h" /* for thread pool */
36 #include "caqueueingthread.h"
38 #include "oic_malloc.h"
/**
 * Logs "memory error" and jumps to the enclosing function's
 * `memory_error_exit` label when `arg` is NULL.
 *
 * The function using this macro must define a `memory_error_exit:` label.
 * The body is wrapped in do { } while (0) so the macro expands to exactly one
 * statement (safe inside an unbraced if/else), and `arg` is parenthesized so
 * any pointer expression can be passed.
 */
#define MEMORY_ALLOCK_CHECK(arg) \
    do \
    { \
        if ((arg) == NULL) \
        { \
            OIC_LOG_V(DEBUG, TAG, "memory error"); \
            goto memory_error_exit; \
        } \
    } while (0)

/** Pool size handed to u_thread_pool_init() when the message handler starts. */
#define MAX_THREAD_POOL_SIZE 10
/** Delivery mode of a queued outbound message. */
typedef enum
{
    SEND_TYPE_MULTICAST = 0, /* sent with CASendMulticastData() */
    SEND_TYPE_UNICAST = 1    /* sent with CASendUnicastData() */
} CASendDataType_t;
// Members of the work item that this file queues on the send/receive threads
// (referred to as CAData_t by the functions below).
// NOTE(review): the struct's opening/closing lines and a numOptions member
// (read elsewhere in this file as data->numOptions) are not visible in this
// extract - confirm against the full file.
53 CASendDataType_t type; // SEND_TYPE_UNICAST or SEND_TYPE_MULTICAST
54 CARemoteEndpoint_t* remoteEndpoint; // peer the message is addressed to / was received from
55 CARequestInfo_t* requestInfo; // request carried by this item; NULL when it carries a response
56 CAResponseInfo_t* responseInfo; // response carried by this item; NULL when it carries a request
57 CAHeaderOption_t* options; // header options copied for multicast sends; NULL otherwise
62 static u_thread_pool_t gThreadPoolHandle = NULL;
64 // message handler main thread
65 static CAQueueingThread_t gSendThread;
66 static CAQueueingThread_t gReceiveThread;
69 static CARequestCallback gRequestHandler = NULL;
70 static CAResponseCallback gResponseHandler = NULL;
/**
 * Task registered for the receive queueing thread.
 *
 * Deliberately a no-op: receive-thread processing is not supported yet, and
 * the code in CAHandleRequestResponseCallbacks() is planned to move here.
 *
 * @param threadData  queued item; not used.
 */
static void CAReceiveThreadProcess(void* threadData)
{
    (void) threadData; /* nothing to process until receive-thread support exists */
}
79 static void CASendThreadProcess(void* threadData)
81 CAData_t* data = (CAData_t*) threadData;
85 OIC_LOG(DEBUG, TAG, "thread data error!!");
89 if (NULL == data->remoteEndpoint)
91 OIC_LOG(DEBUG, TAG, "remoteEndpoint is null");
95 CAResult_t res = CA_STATUS_FAILED;
97 CASendDataType_t type = data->type;
99 if (type == SEND_TYPE_UNICAST)
101 coap_pdu_t* pdu = NULL;
103 if (data->requestInfo != NULL)
105 OIC_LOG_V(DEBUG, TAG, "requestInfo is available..");
107 pdu = (coap_pdu_t*) CAGeneratePdu(data->remoteEndpoint->resourceUri,
108 data->requestInfo->method, data->requestInfo->info);
110 else if (data->responseInfo != NULL)
112 OIC_LOG_V(DEBUG, TAG, "responseInfo is available..");
114 pdu = (coap_pdu_t*) CAGeneratePdu(data->remoteEndpoint->resourceUri,
115 data->responseInfo->result, data->responseInfo->info);
119 OIC_LOG(DEBUG, TAG, "request info, response info is empty");
122 // interface controller function call.
125 OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
127 OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
129 OIC_LOG_V(DEBUG, TAG, "PDU Maker - buffer data : %s", pdu->hdr);
131 res = CASendUnicastData(data->remoteEndpoint, pdu->hdr, pdu->length);
135 else if (type == SEND_TYPE_MULTICAST)
137 OIC_LOG(DEBUG, TAG, "both requestInfo & responseInfo is not available");
139 coap_pdu_t* pdu = NULL;
141 memset(&info, 0, sizeof(CAInfo_t));
143 info.options = data->options;
144 info.numOptions = data->numOptions;
146 pdu = (coap_pdu_t*) CAGeneratePdu(data->remoteEndpoint->resourceUri, CA_GET, info);
150 OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data);
152 OIC_LOG_V(DEBUG, TAG, "PDU Maker - type : %d", pdu->hdr->type);
154 OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code);
156 OIC_LOG_V(DEBUG, TAG, "PDU Maker - id : %d", pdu->hdr->id);
158 OIC_LOG_V(DEBUG, TAG, "PDU Maker - buffer data : %s", pdu->hdr);
160 res = CASendMulticastData(pdu->hdr, pdu->length);
165 OIC_LOG(DEBUG, TAG, "unknown type!");
170 static void CAReceivedPacketCallback(CARemoteEndpoint_t* endpoint, void* data, uint32_t dataLen)
172 OIC_LOG(DEBUG, TAG, "receivedPacketCallback in message handler!!");
176 OIC_LOG(DEBUG, TAG, "received data is null");
181 uint32_t code = CA_NOT_FOUND;
182 pdu = (coap_pdu_t*) CAParsePDU((const char*) data, &code);
185 char uri[CA_MAX_URI_LENGTH] =
188 if (code == CA_GET || code == CA_POST || code == CA_PUT || code == CA_DELETE)
190 CARequestInfo_t* ReqInfo;
191 ReqInfo = (CARequestInfo_t*) OICMalloc(sizeof(CARequestInfo_t));
192 memset(ReqInfo, 0, sizeof(CARequestInfo_t));
193 CAGetRequestInfoFromPdu(pdu, ReqInfo, uri);
195 if (NULL != ReqInfo->info.options && NULL != endpoint)
198 for (i = 0; i < ReqInfo->info.numOptions; i++)
200 OIC_LOG_V(DEBUG, TAG, "Request- optionID: %d", ReqInfo->info.options[i].optionID);
202 OIC_LOG_V(DEBUG, TAG, "Request- list: %s", ReqInfo->info.options[i].optionData);
205 OIC_LOG_V(DEBUG, TAG, "Request- payload: %s", ReqInfo->info.payload);
207 OIC_LOG_V(DEBUG, TAG, "Request- code: %d", ReqInfo->method);
209 endpoint->resourceUri = (char*) OICMalloc(strlen(uri) + 1);
210 memset(endpoint->resourceUri, 0, strlen(uri) + 1);
211 memcpy(endpoint->resourceUri, uri, strlen(uri));
212 OIC_LOG_V(DEBUG, TAG, "added resource URI : %s", endpoint->resourceUri);
215 // store the data at queue.
216 CAData_t* cadata = NULL;
217 cadata = (CAData_t*) OICMalloc(sizeof(CAData_t));
218 memset(cadata, 0, sizeof(CAData_t));
220 cadata->type = SEND_TYPE_UNICAST;
221 cadata->remoteEndpoint = endpoint;
222 cadata->requestInfo = ReqInfo;
223 cadata->responseInfo = NULL;
224 CAQueueingThreadAddData(&gReceiveThread, cadata, sizeof(CAData_t));
228 CAResponseInfo_t* ResInfo;
229 ResInfo = (CAResponseInfo_t*) OICMalloc(sizeof(CAResponseInfo_t));
230 memset(ResInfo, 0, sizeof(CAResponseInfo_t));
231 CAGetResponseInfoFromPdu(pdu, ResInfo, uri);
233 if (NULL != ResInfo->info.options && NULL != endpoint)
236 for (i = 0; i < ResInfo->info.numOptions; i++)
238 OIC_LOG_V(DEBUG, TAG, "Response- optionID: %d", ResInfo->info.options[i].optionID);
240 OIC_LOG_V(DEBUG, TAG, "Response- list: %s", ResInfo->info.options[i].optionData);
243 OIC_LOG_V(DEBUG, TAG, "Response- payload: %s", ResInfo->info.payload);
245 OIC_LOG_V(DEBUG, TAG, "Response- code: %d", ResInfo->result);
247 endpoint->resourceUri = (char*) OICMalloc(strlen(uri) + 1);
248 memset(endpoint->resourceUri, 0, strlen(uri) + 1);
249 memcpy(endpoint->resourceUri, uri, strlen(uri));
250 OIC_LOG_V(DEBUG, TAG, "added resource URI : %s", endpoint->resourceUri);
253 // store the data at queue.
254 CAData_t* cadata = NULL;
255 cadata = (CAData_t*) OICMalloc(sizeof(CAData_t));
256 memset(cadata, 0, sizeof(CAData_t));
258 cadata->type = SEND_TYPE_UNICAST;
259 cadata->remoteEndpoint = endpoint;
260 cadata->requestInfo = NULL;
261 cadata->responseInfo = ResInfo;
263 CAQueueingThreadAddData(&gReceiveThread, cadata, sizeof(CAData_t));
267 static void CANetworkChangedCallback(CALocalConnectivity_t* info, CANetworkStatus_t status)
269 OIC_LOG(DEBUG, TAG, "networkChangeCallback in message handler!!");
// Takes one element from the receive thread's data queue, hands the request
// or response it carries to the registered callback, and then frees that
// request/response info and the endpoint's resourceUri.
// NOTE(review): short lines (braces and, judging by the gaps in the embedded
// line numbers, probably NULL checks / early returns on item, msg and rep)
// are not visible in this extract - confirm the control flow against the
// full file before changing this function.
272 void CAHandleRequestResponseCallbacks()
274 OIC_LOG_V(DEBUG, TAG, "CAHandleRequestResponseCallbacks");
276 // parse the data and call the callbacks.
// The queue is only accessed while the receive thread's mutex is held; the
// callbacks below run after the mutex has been released.
280 u_mutex_lock(gReceiveThread.threadMutex);
282 u_queue_message_t* item = u_queue_get_element(gReceiveThread.dataQueue);
284 u_mutex_unlock(gReceiveThread.threadMutex);
// The queued message is the CAData_t stored by CAReceivedPacketCallback().
290 void* msg = item->msg;
296 CAData_t* td = (CAData_t*) msg;
297 CARemoteEndpoint_t* rep = td->remoteEndpoint;
// Request path: notify the request handler, then free what the request owned.
// NOTE(review): no guard on gRequestHandler is visible here, unlike the
// gResponseHandler check below - it may be on a line missing from this extract.
302 if (td->requestInfo != NULL)
306 gRequestHandler(rep, td->requestInfo);
309 if(NULL != td->requestInfo->info.options)
311 OICFree(td->requestInfo->info.options);
314 if(NULL != td->requestInfo->info.payload)
316 OICFree(td->requestInfo->info.payload);
319 if(NULL != td->requestInfo->info.token)
321 OICFree(td->requestInfo->info.token);
323 OICFree(td->requestInfo);
// Response path: mirrors the request path with the response handler.
326 if (td->responseInfo != NULL)
328 if (gResponseHandler)
330 gResponseHandler(rep, td->responseInfo);
333 if(NULL != td->responseInfo->info.options)
335 OICFree(td->responseInfo->info.options);
338 if(NULL != td->responseInfo->info.payload)
340 OICFree(td->responseInfo->info.payload);
343 if(NULL != td->responseInfo->info.token)
345 OICFree(td->responseInfo->info.token);
347 OICFree(td->responseInfo);
// The URI string was allocated by CAReceivedPacketCallback().
// NOTE(review): no release of item, td or rep themselves is visible in this
// extract - confirm who owns them.
350 if(NULL != rep->resourceUri)
352 OICFree(rep->resourceUri);
357 CAResult_t CADetachRequestMessage(const CARemoteEndpoint_t* object, const CARequestInfo_t* request)
359 OIC_LOG_V(DEBUG, TAG, "CADetachRequestMessage");
361 if (object == NULL || request == NULL)
363 return CA_STATUS_FAILED;
366 CAData_t* data = (CAData_t*) OICMalloc(sizeof(CAData_t));
367 MEMORY_ALLOCK_CHECK(data);
370 memset(data, 0, sizeof(CAData_t));
372 // clone remote endpoint
373 CARemoteEndpoint_t* remoteEndpoint = CACloneRemoteEndpoint(object);
374 MEMORY_ALLOCK_CHECK(remoteEndpoint);
376 // clone request info
377 CARequestInfo_t* requestInfo = CACloneRequestInfo(request);
378 MEMORY_ALLOCK_CHECK(requestInfo);
381 data->type = SEND_TYPE_UNICAST;
382 data->remoteEndpoint = remoteEndpoint;
383 data->requestInfo = requestInfo;
384 data->responseInfo = NULL;
387 CAQueueingThreadAddData(&gSendThread, data, sizeof(CAData_t));
391 // memory error label.
394 CADestroyRemoteEndpointInternal(remoteEndpoint);
396 CADestroyRequestInfoInternal(requestInfo);
403 return CA_MEMORY_ALLOC_FAILED;
406 CAResult_t CADetachResponseMessage(const CARemoteEndpoint_t* object,
407 const CAResponseInfo_t* response)
409 OIC_LOG_V(DEBUG, TAG, "CADetachResponseMessage");
411 if (object == NULL || response == NULL)
413 return CA_STATUS_FAILED;
416 CAData_t* data = (CAData_t*) OICMalloc(sizeof(CAData_t));
417 MEMORY_ALLOCK_CHECK(data);
420 memset(data, 0, sizeof(CAData_t));
422 // clone remote endpoint
423 CARemoteEndpoint_t* remoteEndpoint = CACloneRemoteEndpoint(object);
424 MEMORY_ALLOCK_CHECK(remoteEndpoint);
426 // clone response info
427 CAResponseInfo_t* responseInfo = CACloneResponseInfo(response);
428 MEMORY_ALLOCK_CHECK(responseInfo);
431 data->type = SEND_TYPE_UNICAST;
432 data->remoteEndpoint = remoteEndpoint;
433 data->requestInfo = NULL;
434 data->responseInfo = responseInfo;
437 CAQueueingThreadAddData(&gSendThread, data, sizeof(CAData_t));
441 // memory error label.
444 CADestroyRemoteEndpointInternal(remoteEndpoint);
446 CADestroyResponseInfoInternal(responseInfo);
453 return CA_MEMORY_ALLOC_FAILED;
456 CAResult_t CADetachMessageResourceUri(const CAURI_t resourceUri, const CAHeaderOption_t* options,
459 if (resourceUri == NULL)
461 return CA_STATUS_FAILED;
464 CAData_t* data = (CAData_t*) OICMalloc(sizeof(CAData_t));
465 MEMORY_ALLOCK_CHECK(data);
468 memset(data, 0, sizeof(CAData_t));
471 memset(&addr, 0, sizeof(CAAddress_t));
472 CARemoteEndpoint_t* remoteEndpoint = CACreateRemoteEndpointInternal(resourceUri, addr,
473 CA_ETHERNET | CA_WIFI | CA_EDR | CA_LE);
476 data->type = SEND_TYPE_MULTICAST;
477 data->remoteEndpoint = remoteEndpoint;
478 data->requestInfo = NULL;
479 data->responseInfo = NULL;
480 data->options = NULL;
481 data->numOptions = 0;
483 if (options != NULL && numOptions > 0)
486 CAHeaderOption_t* temp = (CAHeaderOption_t*) OICMalloc(
487 sizeof(CAHeaderOption_t) * numOptions);
489 MEMORY_ALLOCK_CHECK(temp);
491 memset(temp, 0, sizeof(CAHeaderOption_t) * numOptions);
492 memcpy(temp, options, sizeof(CAHeaderOption_t) * numOptions);
494 data->options = temp;
495 data->numOptions = numOptions;
499 CAQueueingThreadAddData(&gSendThread, data, sizeof(CAData_t));
503 // memory error label.
506 CADestroyRemoteEndpointInternal(remoteEndpoint);
513 return CA_MEMORY_ALLOC_FAILED;
516 void CASetRequestResponseCallbacks(CARequestCallback ReqHandler, CAResponseCallback RespHandler)
518 OIC_LOG_V(DEBUG, TAG, "set request, response handler callback.");
520 gRequestHandler = ReqHandler;
521 gResponseHandler = RespHandler;
524 CAResult_t CAInitializeMessageHandler()
526 OIC_LOG(DEBUG, TAG, "CAInitializeMessageHandler - Entry");
527 CASetPacketReceivedCallback(CAReceivedPacketCallback);
529 CASetNetworkChangeCallback(CANetworkChangedCallback);
531 // create thread pool
533 res = u_thread_pool_init(MAX_THREAD_POOL_SIZE, &gThreadPoolHandle);
535 if (res != CA_STATUS_OK)
537 OIC_LOG_V(DEBUG, TAG, "thread pool initialize error.");
541 // send thread initialize
542 CAQueueingThreadInitialize(&gSendThread, gThreadPoolHandle, CASendThreadProcess);
545 res = CAQueueingThreadStart(&gSendThread);
547 if (res != CA_STATUS_OK)
549 OIC_LOG_V(DEBUG, TAG, "thread start error(send thread).");
553 // receive thread initialize
554 CAQueueingThreadInitialize(&gReceiveThread, gThreadPoolHandle, CAReceiveThreadProcess);
556 // start receive thread
558 // currently not support.
559 // res = CAThreadStart(gReceiveThread);
561 // if (res != CA_STATUS_OK)
563 // OIC_LOG_V(DEBUG, TAG, "thread start error(receive thread).");
567 // initialize interface adapters by controller
568 CAInitializeAdapters(gThreadPoolHandle);
573 void CATerminateMessageHandler()
575 // terminate interface adapters by controller
576 CATerminateAdapters();
579 // delete thread data
580 CAQueueingThreadStop(&gSendThread);
581 CAQueueingThreadDestroy(&gSendThread);
584 // delete thread data
585 CAQueueingThreadStop(&gReceiveThread);
586 CAQueueingThreadDestroy(&gReceiveThread);
588 // destroy thread pool
589 u_thread_pool_free(gThreadPoolHandle);
591 OIC_LOG_V(DEBUG, TAG, "message handler terminate completed!");