From: jnashok Date: Mon, 4 May 2015 09:33:04 +0000 (+0900) Subject: Merge Single/Multi thread for camessagehandler X-Git-Tag: 1.2.0+RC1~1415 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=adbaa609ba807ee4e4ad9a94f94fd4bf05850b03;p=platform%2Fupstream%2Fiotivity.git Merge Single/Multi thread for camessagehandler Common code in Single thread and multithread merged into single file Specific code is seperated with SINGLETHREAD flag Change-Id: I6e9f3486c28fc188635fd86be1f01036636dedb3 Signed-off-by: jnashok Reviewed-on: https://gerrit.iotivity.org/gerrit/892 Tested-by: jenkins-iotivity Reviewed-by: Erich Keane --- diff --git a/resource/csdk/connectivity/common/inc/caremotehandler.h b/resource/csdk/connectivity/common/inc/caremotehandler.h index c578a89..e24db97 100644 --- a/resource/csdk/connectivity/common/inc/caremotehandler.h +++ b/resource/csdk/connectivity/common/inc/caremotehandler.h @@ -86,6 +86,13 @@ CAResponseInfo_t *CACloneResponseInfo(const CAResponseInfo_t *response); */ void CADestroyResponseInfoInternal(CAResponseInfo_t *response); +/** + * @brief Free the error information + * @param errorInfo [IN] error information to be freed + * @return + */ +void CADestroyErrorInfoInternal(CAErrorInfo_t *errorInfo); + #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/resource/csdk/connectivity/common/src/caremotehandler.c b/resource/csdk/connectivity/common/src/caremotehandler.c index 49eb444..c9b7f5e 100644 --- a/resource/csdk/connectivity/common/src/caremotehandler.c +++ b/resource/csdk/connectivity/common/src/caremotehandler.c @@ -307,26 +307,30 @@ void CAFreeEndpoint(CAEndpoint_t *rep) OICFree(rep); } -void CADestroyRequestInfoInternal(CARequestInfo_t *rep) +static void CADestroyInfoInternal(CAInfo_t *info) { - if (NULL == rep) - { - OIC_LOG(ERROR, TAG, "parameter is null"); - return; - } - // free token field - OICFree(rep->info.token); + OICFree(info->token); // free options field - OICFree((CAHeaderOption_t *) rep->info.options); + OICFree(info->options); // free payload field - OICFree((char *) rep->info.payload); + OICFree((char *) info->payload); // free uri - OICFree(rep->info.resourceUri); + OICFree(info->resourceUri); +} + +void CADestroyRequestInfoInternal(CARequestInfo_t *rep) +{ + if (NULL == rep) + { + OIC_LOG(ERROR, TAG, "parameter is null"); + return; + } + CADestroyInfoInternal(&rep->info); OICFree(rep); } @@ -338,21 +342,19 @@ void CADestroyResponseInfoInternal(CAResponseInfo_t *rep) return; } - // free token field - OICFree(rep->info.token); + CADestroyInfoInternal(&rep->info); + OICFree(rep); +} - // free options field - if (rep->info.options != NULL && rep->info.numOptions) +void CADestroyErrorInfoInternal(CAErrorInfo_t *errorInfo) +{ + if (NULL == errorInfo) { - OICFree((CAHeaderOption_t *) rep->info.options); + OIC_LOG(ERROR, TAG, "parameter is null"); + return; } - // free payload field - OICFree((char *) rep->info.payload); - - // free uri - OICFree(rep->info.resourceUri); - - OICFree(rep); + CADestroyInfoInternal(&errorInfo->info); + OICFree(errorInfo); } diff --git a/resource/csdk/connectivity/inc/camessagehandler.h b/resource/csdk/connectivity/inc/camessagehandler.h index 9e58735..4ffc362 100644 --- a/resource/csdk/connectivity/inc/camessagehandler.h +++ b/resource/csdk/connectivity/inc/camessagehandler.h @@ -52,6 +52,31 @@ #define CA_MEMORY_ALLOC_CHECK(arg) { if (NULL == arg) {OIC_LOG(ERROR, TAG, "Out of memory"); \ goto memory_error_exit;} } +typedef enum +{ + SEND_TYPE_MULTICAST = 0, + SEND_TYPE_UNICAST +} CASendDataType_t; + +typedef enum +{ + CA_REQUEST_DATA = 1, + CA_RESPONSE_DATA = 2, + CA_ERROR_DATA = 3, +} CADataType_t; + +typedef struct +{ + CASendDataType_t type; + CAEndpoint_t *remoteEndpoint; + CARequestInfo_t *requestInfo; + CAResponseInfo_t *responseInfo; + CAErrorInfo_t *errorInfo; + CAHeaderOption_t *options; + CADataType_t dataType; + uint8_t numOptions; +} CAData_t; + #ifdef __cplusplus extern "C" { diff --git a/resource/csdk/connectivity/inc/camessagehandler_singlethread.h b/resource/csdk/connectivity/inc/camessagehandler_singlethread.h deleted file mode 100644 index 10f2416..0000000 --- a/resource/csdk/connectivity/inc/camessagehandler_singlethread.h +++ /dev/null @@ -1,139 +0,0 @@ -/****************************************************************** - * - * Copyright 2014 Samsung Electronics All Rights Reserved. - * - * - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - ******************************************************************/ - -/** - * @file camessagehandler_singlethread.h - * @brief This file contains message functionality. - */ - -#ifndef CA_MESSAGE_HANDLER_SINGLETHREAD_H_ -#define CA_MESSAGE_HANDLER_SINGLETHREAD_H_ - -#include "cacommon.h" -#include "coap.h" - -/** - * @def VERIFY_NON_NULL - * @brief Macro to verify the validity of input argument. - */ -#define VERIFY_NON_NULL(arg, log_tag, log_message) \ - if (NULL == arg ){ \ - OIC_LOG_V(ERROR, log_tag, "Invalid input:%s", log_message); \ - return CA_STATUS_INVALID_PARAM; \ - } \ - -/** - * @def VERIFY_NON_NULL_VOID - * @brief Macro to verify the validity of input argument. - */ -#define VERIFY_NON_NULL_VOID(arg, log_tag, log_message) \ - if (NULL == arg ){ \ - OIC_LOG_V(ERROR, log_tag, "Invalid input:%s", log_message); \ - return; \ - } \ - -#define CA_MEMORY_ALLOC_CHECK(arg) { if (NULL == arg) {OIC_LOG(ERROR, TAG, "Out of memory"); \ -goto memory_error_exit;} } - -#ifdef __cplusplus -extern "C" -{ -#endif - -/** - * @brief Detaches control from the caller for sending unicast request - * @param endpoint [IN] endpoint information where the data has to be sent - * @param request [IN] request that needs to be sent - * @return CA_STATUS_OK or ERROR CODES (CAResult_t error codes in cacommon.h) - */ -CAResult_t CADetachRequestMessage(const CAEndpoint_t *endpoint, - const CARequestInfo_t *request); - -/** - * @brief Detaches control from the caller for sending multicast request - * @param object [IN] Group endpoint information where the data has to be sent - * @param request [IN] request that needs to be sent - * @return CA_STATUS_OK or ERROR CODES (CAResult_t error codes in cacommon.h) - */ -CAResult_t CADetachRequestToAllMessage(const CAEndpoint_t *object, - const CARequestInfo_t *request); - -/** - * @brief Detaches control from the caller for sending response - * @param endpoint [IN] endpoint information where the data has to be sent - * @param response [IN] response that needs to be sent - * @return CA_STATUS_OK or ERROR CODES (CAResult_t error codes in cacommon.h) - */ -CAResult_t CADetachResponseMessage(const CAEndpoint_t *endpoint, - const CAResponseInfo_t *response); - -/** - * @brief Detaches control from the caller for sending request - * @param resourceUri [IN] resource uri that needs to be sent in the request - * @param token [IN] token information of the request - * @param tokenLength [IN] length of the token - * @param options [IN] header options that need to be append in the request - * @param numOptions [IN] number of options be appended - * @return CA_STATUS_OK or ERROR CODES (CAResult_t error codes in cacommon.h) - */ -CAResult_t CADetachMessageResourceUri(const CAURI_t resourceUri, const CAToken_t token, - uint8_t tokenLength, const CAHeaderOption_t *options, - uint8_t numOptions); - -/** - * @brief Setting the request and response callbacks for network packets - * @param ReqHandler [IN] callback for receiving the requests - * @param RespHandler [IN] callback for receiving the response - * @param ErrorHandler [IN] callback for receiving error response - * @return NONE - */ -void CASetInterfaceCallbacks(CARequestCallback ReqHandler, CAResponseCallback RespHandler, - CAErrorCallback ErrorHandler); - -/** - * @brief Initialize the message handler by starting thread pool and initializing the - * send and receive queue - * @return CA_STATUS_OK or ERROR CODES (CAResult_t error codes in cacommon.h) - */ -CAResult_t CAInitializeMessageHandler(); - -/** - * @brief Terminate the message handler by stopping the thread pool and destroying the queues - * @return NONE - */ -void CATerminateMessageHandler(); - -/** - * @brief Handler for receiving request and response callback in single thread model - */ -void CAHandleRequestResponseCallbacks(); - -/** - * @brief To log the PDU data - * @param pdu [IN] pdu data - */ -void CALogPDUInfo(coap_pdu_t *pdu); - -#ifdef __cplusplus -} /* extern "C" */ -#endif - -#endif /* CA_MESSAGE_HANDLER_SINGLETHREAD_H_ */ - diff --git a/resource/csdk/connectivity/src/SConscript b/resource/csdk/connectivity/src/SConscript index 7a91274..7b6ba7d 100755 --- a/resource/csdk/connectivity/src/SConscript +++ b/resource/csdk/connectivity/src/SConscript @@ -62,7 +62,7 @@ if ca_os == 'arduino': ca_common_src = [ 'caconnectivitymanager.c', 'cainterfacecontroller.c', - 'camessagehandler_singlethread.c', + 'camessagehandler.c', 'canetworkconfigurator.c', 'caprotocolmessage.c', 'caretransmission.c', diff --git a/resource/csdk/connectivity/src/bt_edr_adapter/caedradapter.c b/resource/csdk/connectivity/src/bt_edr_adapter/caedradapter.c index 8135659..a0570ea 100644 --- a/resource/csdk/connectivity/src/bt_edr_adapter/caedradapter.c +++ b/resource/csdk/connectivity/src/bt_edr_adapter/caedradapter.c @@ -739,8 +739,13 @@ void CAAdapterDataReceiverHandler(void *context) { OIC_LOG(DEBUG, EDR_ADAPTER_TAG, "Sending data up !"); g_networkPacketReceivedCallback(remoteEndpoint, defragData, recvDataLen); + + OICFree(defragData); + CAFreeEndpoint(remoteEndpoint); + recvDataLen = 0; totalDataLen = 0; + defragData = NULL; remoteEndpoint = NULL; g_isHeaderAvailable = false; } diff --git a/resource/csdk/connectivity/src/bt_le_adapter/caleadapter.c b/resource/csdk/connectivity/src/bt_le_adapter/caleadapter.c index afd362e..98ed564 100644 --- a/resource/csdk/connectivity/src/bt_le_adapter/caleadapter.c +++ b/resource/csdk/connectivity/src/bt_le_adapter/caleadapter.c @@ -587,6 +587,7 @@ void CALEServerDataReceiverHandler(void *threadData) if (!bleData) { OIC_LOG(DEBUG, CALEADAPTER_TAG, "Invalid bleData!"); + ca_mutex_unlock(g_bleServerReceiveDataMutex); return; } @@ -604,6 +605,7 @@ void CALEServerDataReceiverHandler(void *threadData) if (NULL == defragData) { OIC_LOG(ERROR, CALEADAPTER_TAG, "defragData is NULL!"); + ca_mutex_unlock(g_bleServerReceiveDataMutex); return; } @@ -631,11 +633,20 @@ void CALEServerDataReceiverHandler(void *threadData) if (NULL == g_networkPacketReceivedCallback) { OIC_LOG(ERROR, CALEADAPTER_TAG, "gReqRespCallback is NULL!"); + OICFree(defragData); + CAFreeEndpoint(remoteEndpoint); + remoteEndpoint = NULL; + defragData = NULL; ca_mutex_unlock(g_bleAdapterReqRespCbMutex); + ca_mutex_unlock(g_bleServerReceiveDataMutex); return; } OIC_LOG(DEBUG, CALEADAPTER_TAG, "Sending data up !"); g_networkPacketReceivedCallback(remoteEndpoint, defragData, recvDataLen); + + OICFree(defragData); + CAFreeEndpoint(remoteEndpoint); + recvDataLen = 0; totalDataLen = 0; isHeaderAvailable = false; @@ -652,6 +663,8 @@ void CALEServerDataReceiverHandler(void *threadData) isHeaderAvailable = false; OICFree(defragData); CAFreeEndpoint(remoteEndpoint); + remoteEndpoint = NULL; + defragData = NULL; ca_mutex_unlock(g_bleServerReceiveDataMutex); return; } @@ -681,6 +694,7 @@ void CALEClientDataReceiverHandler(void *threadData) if (!bleData) { OIC_LOG(DEBUG, CALEADAPTER_TAG, "Invalid wifidata!"); + ca_mutex_unlock(g_bleClientReceiveDataMutex); return; } @@ -700,6 +714,7 @@ void CALEClientDataReceiverHandler(void *threadData) if (NULL == defragData) { OIC_LOG(ERROR, CALEADAPTER_TAG, "defragData is NULL!"); + ca_mutex_unlock(g_bleClientReceiveDataMutex); return; } @@ -727,7 +742,12 @@ void CALEClientDataReceiverHandler(void *threadData) if (NULL == g_networkPacketReceivedCallback) { OIC_LOG(ERROR, CALEADAPTER_TAG, "gReqRespCallback is NULL!"); + OICFree(defragData); + CAFreeEndpoint(remoteEndpoint); + remoteEndpoint = NULL; + defragData = NULL; ca_mutex_unlock(g_bleAdapterReqRespCbMutex); + ca_mutex_unlock(g_bleClientReceiveDataMutex); return; } OIC_LOG(DEBUG, CALEADAPTER_TAG, "Sending data up !"); @@ -735,6 +755,8 @@ void CALEClientDataReceiverHandler(void *threadData) recvDataLen = 0; totalDataLen = 0; isHeaderAvailable = false; + OICFree(defragData); + CAFreeEndpoint(remoteEndpoint); remoteEndpoint = NULL; defragData = NULL; ca_mutex_unlock(g_bleAdapterReqRespCbMutex); @@ -745,6 +767,8 @@ void CALEClientDataReceiverHandler(void *threadData) OIC_LOG(DEBUG, CALEADAPTER_TAG, "GATTClient is terminating. Cleaning up"); OICFree(defragData); CAFreeEndpoint(remoteEndpoint); + remoteEndpoint = NULL; + defragData = NULL; ca_mutex_unlock(g_bleClientReceiveDataMutex); return; } diff --git a/resource/csdk/connectivity/src/cainterfacecontroller.c b/resource/csdk/connectivity/src/cainterfacecontroller.c index 6c4a81f..3441659 100644 --- a/resource/csdk/connectivity/src/cainterfacecontroller.c +++ b/resource/csdk/connectivity/src/cainterfacecontroller.c @@ -134,8 +134,6 @@ static void CAReceivedPacketCallback(const CAEndpoint_t *endpoint, void *data, u } else { - OICFree(data); - OIC_LOG(ERROR, TAG, "network packet received callback is NULL!"); } diff --git a/resource/csdk/connectivity/src/camessagehandler.c b/resource/csdk/connectivity/src/camessagehandler.c index 9695e2f..d92d5b1 100644 --- a/resource/csdk/connectivity/src/camessagehandler.c +++ b/resource/csdk/connectivity/src/camessagehandler.c @@ -26,49 +26,23 @@ #include "cainterface.h" #include "camessagehandler.h" #include "caremotehandler.h" -#include "cainterfacecontroller.h" #include "caprotocolmessage.h" -#include "caretransmission.h" -#include "caadapterutils.h" -#include "uqueue.h" #include "logger.h" #include "config.h" /* for coap protocol */ -#include "cathreadpool.h" /* for thread pool */ -#include "caqueueingthread.h" -#include "camutex.h" #include "oic_malloc.h" -#include "oic_string.h" #include "canetworkconfigurator.h" +#include "caadapterutils.h" +#include "cainterfacecontroller.h" +#include "caretransmission.h" -#define TAG PCF("CA_MSG_HNDLR") -#define SINGLE_HANDLE +#ifndef SINGLE_THREAD +#include "uqueue.h" +#include "cathreadpool.h" /* for thread pool */ +#include "caqueueingthread.h" +#define SINGLE_HANDLE #define MAX_THREAD_POOL_SIZE 20 -typedef enum -{ - SEND_TYPE_MULTICAST = 0, SEND_TYPE_UNICAST -} CASendDataType_t; - -typedef enum -{ - CA_REQUEST_DATA = 1, - CA_RESPONSE_DATA = 2, - CA_ERROR_DATA = 3, -} CADataType_t; - -typedef struct -{ - CASendDataType_t type; - CAEndpoint_t *remoteEndpoint; - CARequestInfo_t *requestInfo; - CAResponseInfo_t *responseInfo; - CAErrorInfo_t *errorInfo; - CAHeaderOption_t *options; - CADataType_t dataType; - uint8_t numOptions; -} CAData_t; - // thread pool handle static ca_thread_pool_t g_threadPoolHandle = NULL; @@ -76,6 +50,12 @@ static ca_thread_pool_t g_threadPoolHandle = NULL; static CAQueueingThread_t g_sendThread; static CAQueueingThread_t g_receiveThread; +#else +#define CA_MAX_RT_ARRAY_SIZE 3 +#endif /* SINGLE_THREAD */ + +#define TAG "CA_MSG_HNDLR" + static CARetransmission_t g_retransmissionContext; // handler field @@ -87,6 +67,14 @@ static void CAErrorHandler(const CAEndpoint_t *endpoint, const void *data, uint32_t dataLen, CAResult_t result); +static CAData_t* CAGenerateHandlerData(const CAEndpoint_t *endpoint, const void *data, + CADataType_t dataType); + +static void CAProcessReceivedData(CAData_t *data); +static void CADataDestroyer(void *data, uint32_t size); +static void CALogPayloadInfo(CAInfo_t *info); +static bool CADropSecondRequest(const CAEndpoint_t *endpoint, uint16_t messageId); + static bool CAIsSelectedNetworkAvailable() { u_arraylist_t *list = CAGetSelectedNetworkList(); @@ -99,6 +87,123 @@ static bool CAIsSelectedNetworkAvailable() return true; } +static CAData_t* CAGenerateHandlerData(const CAEndpoint_t *endpoint, const void *data, CADataType_t dataType) +{ + OIC_LOG(DEBUG, TAG, "CAGenerateHandlerData IN"); + CAInfo_t *info = NULL; + CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t)); + if (!cadata) + { + OIC_LOG(ERROR, TAG, "memory allocation failed"); + return NULL; + } + + CAEndpoint_t* ep = CACloneEndpoint(endpoint); + if (!ep) + { + OIC_LOG(ERROR, TAG, "endpoint clone failed"); + OICFree(cadata); + return NULL; + } + + OIC_LOG_V(DEBUG, TAG, "address : %s", ep->addr); + CAResult_t result; + + if(CA_RESPONSE_DATA == dataType) + { + CAResponseInfo_t* resInfo = (CAResponseInfo_t*)OICCalloc(1, sizeof(CAResponseInfo_t)); + if (!resInfo) + { + OIC_LOG(ERROR, TAG, "memory allocation failed"); + OICFree(cadata); + CAFreeEndpoint(ep); + return NULL; + } + + result = CAGetResponseInfoFromPDU(data, resInfo); + if (CA_STATUS_OK != result) + { + OIC_LOG(ERROR, TAG, "CAGetResponseInfoFromPDU Failed"); + CAFreeEndpoint(ep); + CADestroyResponseInfoInternal(resInfo); + OICFree(cadata); + return NULL; + } + cadata->responseInfo = resInfo; + info = &resInfo->info; + OIC_LOG(DEBUG, TAG, "Response Info :"); + CALogPayloadInfo(info); + } + else if (CA_REQUEST_DATA == dataType) + { + CARequestInfo_t* reqInfo = (CARequestInfo_t*)OICCalloc(1, sizeof(CARequestInfo_t)); + if (!reqInfo) + { + OIC_LOG(ERROR, TAG, "memory allocation failed"); + OICFree(cadata); + CAFreeEndpoint(ep); + return NULL; + } + + result = CAGetRequestInfoFromPDU(data, reqInfo); + if (CA_STATUS_OK != result) + { + OIC_LOG(ERROR, TAG, "CAGetRequestInfoFromPDU failed"); + CAFreeEndpoint(ep); + CADestroyRequestInfoInternal(reqInfo); + OICFree(cadata); + return NULL; + } + + if (CADropSecondRequest(endpoint, reqInfo->info.messageId)) + { + OIC_LOG(ERROR, TAG, "Second Request with same Token, Drop it"); + CAFreeEndpoint(ep); + CADestroyRequestInfoInternal(reqInfo); + OICFree(cadata); + return NULL; + } + + cadata->requestInfo = reqInfo; + info = &reqInfo->info; + OIC_LOG(DEBUG, TAG, "Request Info :"); + CALogPayloadInfo(info); + } + else if (CA_ERROR_DATA == dataType) + { + CAErrorInfo_t *errorInfo = (CAErrorInfo_t *)OICCalloc(1, sizeof (CAErrorInfo_t)); + if (!errorInfo) + { + OIC_LOG(ERROR, TAG, "Memory allocation failed!"); + OICFree(cadata); + CAFreeEndpoint(ep); + return NULL; + } + + CAResult_t result = CAGetErrorInfoFromPDU(data, errorInfo); + if (CA_STATUS_OK != result) + { + OIC_LOG(ERROR, TAG, "CAGetErrorInfoFromPDU failed"); + CAFreeEndpoint(ep); + OICFree(errorInfo); + OICFree(cadata); + return NULL; + } + + cadata->errorInfo = errorInfo; + info = &errorInfo->info; + OIC_LOG(DEBUG, TAG, "error Info :"); + CALogPayloadInfo(info); + } + + cadata->remoteEndpoint = ep; + cadata->dataType = dataType; + + return cadata; + + OIC_LOG(DEBUG, TAG, "CAGenerateHandlerData OUT"); +} + static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uint32_t size) { OIC_LOG(DEBUG, TAG, "IN"); @@ -106,7 +211,7 @@ static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uin VERIFY_NON_NULL_VOID(pdu, TAG, "pdu"); CAEndpoint_t* ep = CACloneEndpoint(endpoint); - if (NULL == ep) + if (!ep) { OIC_LOG(ERROR, TAG, "clone failed"); return; @@ -114,7 +219,7 @@ static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uin CAResponseInfo_t* resInfo = (CAResponseInfo_t*)OICCalloc(1, sizeof(CAResponseInfo_t)); - if (NULL == resInfo) + if (!resInfo) { OIC_LOG(ERROR, TAG, "calloc failed"); CAFreeEndpoint(ep); @@ -124,12 +229,12 @@ static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uin resInfo->result = CA_RETRANSMIT_TIMEOUT; resInfo->info.type = CAGetMessageTypeFromPduBinaryData(pdu, size); resInfo->info.messageId = CAGetMessageIdFromPduBinaryData(pdu, size); + CAResult_t res = CAGetTokenFromPDU((const coap_hdr_t *) pdu, &(resInfo->info)); if (CA_STATUS_OK != res) { OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list"); - OICFree(resInfo->info.token); - OICFree(resInfo); + CADestroyResponseInfoInternal(resInfo); CAFreeEndpoint(ep); return; } @@ -139,7 +244,7 @@ static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uin { OIC_LOG(ERROR, TAG, "memory allocation failed !"); CAFreeEndpoint(ep); - OICFree(resInfo); + CADestroyResponseInfoInternal(resInfo); return; } @@ -148,13 +253,18 @@ static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uin cadata->requestInfo = NULL; cadata->responseInfo = resInfo; +#ifdef SINGLE_THREAD + CAProcessReceivedData(cadata); +#else CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); +#endif + OIC_LOG(DEBUG, TAG, "OUT"); } static void CADataDestroyer(void *data, uint32_t size) { - OIC_LOG(DEBUG, TAG, "IN"); + OIC_LOG(DEBUG, TAG, "CADataDestroyer IN"); CAData_t *cadata = (CAData_t *) data; if (NULL == cadata) @@ -180,27 +290,18 @@ static void CADataDestroyer(void *data, uint32_t size) if (NULL != cadata->errorInfo) { - CAInfo_t *info = &cadata->errorInfo->info; - OICFree(info->token); - OICFree(info->options); - OICFree(info->payload); - OICFree(info->resourceUri); - OICFree(cadata->errorInfo); + CADestroyErrorInfoInternal(cadata->errorInfo); } + OICFree(cadata->options); OICFree(cadata); - OIC_LOG(DEBUG, TAG, "OUT"); + OIC_LOG(DEBUG, TAG, "CADataDestroyer OUT"); } -static void CAReceiveThreadProcess(void *threadData) +static void CAProcessReceivedData(CAData_t *data) { - OIC_LOG(DEBUG, TAG, "IN"); - // Currently not supported - // This will be enabled when RI supports multi threading -#ifndef SINGLE_HANDLE - CAData_t *data = (CAData_t *) threadData; - - if (NULL == data) + OIC_LOG(DEBUG, TAG, "CAProcessReceivedData IN"); + if (!data) { OIC_LOG(ERROR, TAG, "thread data error!!"); return; @@ -210,8 +311,7 @@ static void CAReceiveThreadProcess(void *threadData) // #1 parse the data // #2 get endpoint CAEndpoint_t *rep = (CAEndpoint_t *)(data->remoteEndpoint); - - if (NULL == rep) + if (!rep) { OIC_LOG(ERROR, TAG, "remoteEndpoint error!!"); return; @@ -230,15 +330,30 @@ static void CAReceiveThreadProcess(void *threadData) g_errorHandler(rep, data->errorInfo); } -#endif /* SINGLE_HANDLE */ - OIC_LOG(DEBUG, TAG, "OUT"); + +#ifdef SINGLE_THREAD + CADataDestroyer(data, sizeof(CAData_t)); +#endif + + OIC_LOG(DEBUG, TAG, "CAProcessReceivedData OUT"); } -static void CASendThreadProcess(void *threadData) +#ifndef SINGLE_THREAD + +static void CAReceiveThreadProcess(void *threadData) { OIC_LOG(DEBUG, TAG, "IN"); +#ifndef SINGLE_HANDLE CAData_t *data = (CAData_t *) threadData; + CAProcessReceivedData(data); +#endif + OIC_LOG(DEBUG, TAG, "OUT"); +} +#endif +static void CAProcessSendData(const CAData_t *data) +{ + OIC_LOG(DEBUG, TAG, "IN"); VERIFY_NON_NULL_VOID(data, TAG, "data"); VERIFY_NON_NULL_VOID(data->remoteEndpoint, TAG, "remoteEndpoint"); @@ -287,7 +402,7 @@ static void CASendThreadProcess(void *threadData) pdu->length); if (CA_STATUS_OK != res) { - OIC_LOG_V(INFO, TAG, "retransmission will be not working: %d", res); + OIC_LOG_V(INFO, TAG, "retransmission is not enabled due to error, res : %d", res); coap_delete_pdu(pdu); return; } @@ -296,7 +411,7 @@ static void CASendThreadProcess(void *threadData) } else { - OIC_LOG_V(ERROR,TAG,"Failed to generate unicast PDU"); + OIC_LOG(ERROR,TAG,"Failed to generate unicast PDU"); return; } } @@ -308,9 +423,6 @@ static void CASendThreadProcess(void *threadData) OIC_LOG(DEBUG, TAG, "requestInfo is available.."); CAInfo_t *info = &data->requestInfo->info; - info->options = data->options; - info->numOptions = data->numOptions; - pdu = CAGeneratePDU(CA_GET, info); if (NULL != pdu) { @@ -328,18 +440,27 @@ static void CASendThreadProcess(void *threadData) } else { - OIC_LOG_V(ERROR,TAG,"Failed to generate multicast PDU"); + OIC_LOG(ERROR,TAG,"Failed to generate multicast PDU"); } } else { - OIC_LOG_V(ERROR, TAG, "request info is empty"); + OIC_LOG(ERROR, TAG, "request info is empty"); } } OIC_LOG(DEBUG, TAG, "OUT"); } +#ifndef SINGLE_THREAD +static void CASendThreadProcess(void *threadData) +{ + CAData_t *data = (CAData_t *) threadData; + CAProcessSendData(data); +} + +#endif + /* * If a second message arrives with the same token and the other address * family, drop it. Typically, IPv6 beats IPv4, so the IPv4 message is dropped. @@ -366,11 +487,11 @@ static bool CADropSecondRequest(const CAEndpoint_t *endpoint, uint16_t messageId { if (familyFlags & CA_IPV6) { - OIC_LOG(INFO, TAG, PCF("IPv6 duplicate response ignored")); + OIC_LOG(INFO, TAG, "IPv6 duplicate response ignored"); } else { - OIC_LOG(INFO, TAG, PCF("IPv4 duplicate response ignored")); + OIC_LOG(INFO, TAG, "IPv4 duplicate response ignored"); } ret = true; } @@ -380,169 +501,74 @@ static bool CADropSecondRequest(const CAEndpoint_t *endpoint, uint16_t messageId return ret; } -static void CAReceivedPacketCallback(const CAEndpoint_t *endpoint, void *data, uint32_t dataLen) +static void CAReceivedPacketCallback(const CAEndpoint_t *remoteEndpoint, void *data, uint32_t dataLen) { OIC_LOG(DEBUG, TAG, "IN"); - VERIFY_NON_NULL_VOID(endpoint, TAG, "endpoint"); + VERIFY_NON_NULL_VOID(remoteEndpoint, TAG, "remoteEndpoint"); VERIFY_NON_NULL_VOID(data, TAG, "data"); uint32_t code = CA_NOT_FOUND; - coap_pdu_t *pdu = (coap_pdu_t *) CAParsePDU((const char *) data, dataLen, &code); - OICFree(data); + CAData_t *cadata = NULL; + coap_pdu_t *pdu = (coap_pdu_t *) CAParsePDU((const char *) data, dataLen, &code); if (NULL == pdu) { OIC_LOG(ERROR, TAG, "Parse PDU failed"); return; } + OIC_LOG_V(DEBUG, TAG, "code = %d", code); if (CA_GET == code || CA_POST == code || CA_PUT == code || CA_DELETE == code) { - CARequestInfo_t *ReqInfo = (CARequestInfo_t *) OICCalloc(1, sizeof(CARequestInfo_t)); - if (NULL == ReqInfo) - { - OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, Memory allocation failed!"); - coap_delete_pdu(pdu); - return; - } - - CAResult_t res = CAGetRequestInfoFromPDU(pdu, ReqInfo); - if (CA_STATUS_OK != res) - { - OIC_LOG_V(ERROR, TAG, "CAGetRequestInfoFromPDU failed : %d", res); - OICFree(ReqInfo); - coap_delete_pdu(pdu); - return; - } - - if (CADropSecondRequest(endpoint, ReqInfo->info.messageId)) + cadata = CAGenerateHandlerData(remoteEndpoint, pdu, CA_REQUEST_DATA); + if (!cadata) { - OICFree(ReqInfo); + OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, CAGenerateHandlerData failed!"); coap_delete_pdu(pdu); return; } - - if (NULL != ReqInfo->info.options) - { - uint32_t i; - for (i = 0; i < ReqInfo->info.numOptions; i++) - { - OIC_LOG_V(DEBUG, TAG, "Request- optionID: %d", ReqInfo->info.options[i].optionID); - - OIC_LOG_V(DEBUG, TAG, "Request- list: %s", ReqInfo->info.options[i].optionData); - } - } - - OIC_LOG_V(DEBUG, TAG, "Request- code: %d", ReqInfo->method); - if (NULL != ReqInfo->info.token) - { - OIC_LOG(DEBUG, TAG, "Request- token:"); - OIC_LOG_BUFFER(DEBUG, TAG, (const uint8_t *) ReqInfo->info.token, - ReqInfo->info.tokenLength); - } - - OIC_LOG_V(DEBUG, TAG, "Request- msgID : %d", ReqInfo->info.messageId); - // store the data at queue. - CAData_t *cadata = NULL; - cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t)); - if (NULL == cadata) - { - OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, Memory allocation failed !"); - CADestroyRequestInfoInternal(ReqInfo); - coap_delete_pdu(pdu); - return; - } - - cadata->type = SEND_TYPE_UNICAST; - cadata->remoteEndpoint = CACloneEndpoint(endpoint); - cadata->requestInfo = ReqInfo; - cadata->responseInfo = NULL; - CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); } else { - CAResponseInfo_t *ResInfo = (CAResponseInfo_t *) OICCalloc(1, sizeof(CAResponseInfo_t)); - if (NULL == ResInfo) - { - OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, Memory allocation failed!"); - coap_delete_pdu(pdu); - return; - } - - CAResult_t res = CAGetResponseInfoFromPDU(pdu, ResInfo); - if (CA_STATUS_OK != res) - { - OIC_LOG_V(ERROR, TAG, "CAGetResponseInfoFromPDU failed : %d", res); - OICFree(ResInfo); - coap_delete_pdu(pdu); - return; - } - - if (NULL != ResInfo->info.options) - { - uint32_t i; - for (i = 0; i < ResInfo->info.numOptions; i++) - { - OIC_LOG_V(DEBUG, TAG, "Response- optionID: %d", ResInfo->info.options[i].optionID); - - OIC_LOG_V(DEBUG, TAG, "Response- list: %s", ResInfo->info.options[i].optionData); - } - } - - if (NULL != ResInfo->info.payload) - { - OIC_LOG_V(DEBUG, TAG, "Response- payload: %p(%u) from %s", ResInfo->info.payload, - ResInfo->info.payloadSize, endpoint->addr); - } - OIC_LOG_V(DEBUG, TAG, "Response- code: %d", ResInfo->result); - if (NULL != ResInfo->info.token) + cadata = CAGenerateHandlerData(remoteEndpoint, pdu, CA_RESPONSE_DATA); + if (!cadata) { - OIC_LOG(DEBUG, TAG, "Response- token:"); - OIC_LOG_BUFFER(DEBUG, TAG, (const uint8_t *) ResInfo->info.token, - ResInfo->info.tokenLength); - } - OIC_LOG_V(DEBUG, TAG, "Response- msgID: %d", ResInfo->info.messageId); - - // store the data at queue. - CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t)); - if (NULL == cadata) - { - OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, Memory allocation failed !"); - CADestroyResponseInfoInternal(ResInfo); + OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, CAGenerateHandlerData failed!"); coap_delete_pdu(pdu); return; } - cadata->type = SEND_TYPE_UNICAST; - cadata->remoteEndpoint = CACloneEndpoint(endpoint); - cadata->requestInfo = NULL; - // for retransmission void *retransmissionPdu = NULL; - CARetransmissionReceivedData(&g_retransmissionContext, endpoint, pdu->hdr, pdu->length, - &retransmissionPdu); + CARetransmissionReceivedData(&g_retransmissionContext, cadata->remoteEndpoint, pdu->hdr, + pdu->length, &retransmissionPdu); // get token from saved data in retransmission list if (retransmissionPdu && CA_EMPTY == code) { + CAInfo_t *info = &cadata->responseInfo->info; CAResult_t res = CAGetTokenFromPDU((const coap_hdr_t *)retransmissionPdu, - &(ResInfo->info)); + info); if (CA_STATUS_OK != res) { OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list"); - OICFree(ResInfo->info.token); + OICFree(info->token); + info->tokenLength = 0; } } OICFree(retransmissionPdu); - cadata->responseInfo = ResInfo; - - CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); } - if (pdu) - { - coap_delete_pdu(pdu); - } + cadata->type = SEND_TYPE_UNICAST; + +#ifdef SINGLE_THREAD + CAProcessReceivedData(cadata); +#else + CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); +#endif + + coap_delete_pdu(pdu); + OIC_LOG(DEBUG, TAG, "OUT"); } @@ -555,7 +581,11 @@ static void CANetworkChangedCallback(const CAEndpoint_t *info, CANetworkStatus_t void CAHandleRequestResponseCallbacks() { - + OIC_LOG(DEBUG, TAG, "CAHandleRequestResponseCallbacks IN"); +#ifdef SINGLE_THREAD + CAReadData(); + CARetransmissionBaseRoutine((void *)&g_retransmissionContext); +#else #ifdef SINGLE_HANDLE // parse the data and call the callbacks. // #1 parse the data @@ -602,133 +632,156 @@ void CAHandleRequestResponseCallbacks() CADataDestroyer(msg, sizeof(CAData_t)); OICFree(item); +#endif /* SINGLE_HANDLE */ #endif OIC_LOG(DEBUG, TAG, "CAHandleRequestResponseCallbacks OUT"); } -CAResult_t CADetachRequestMessage(const CAEndpoint_t *object, const CARequestInfo_t *request) +static CAData_t* CAPrepareSendData(const CAEndpoint_t *endpoint, const void *sendData, + CADataType_t dataType) { - OIC_LOG(DEBUG, TAG, "IN"); + OIC_LOG(DEBUG, TAG, "CAPrepareSendData IN"); + CAInfo_t *info = NULL; - VERIFY_NON_NULL(object, TAG, "object"); - VERIFY_NON_NULL(request, TAG, "request"); - - if (false == CAIsSelectedNetworkAvailable()) + CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t)); + if (!cadata) { - return CA_STATUS_FAILED; + OIC_LOG(ERROR, TAG, "memory allocation failed"); + return NULL; } - CAEndpoint_t *remoteEndpoint = NULL; - CARequestInfo_t *requestInfo = NULL; - CAData_t *data = (CAData_t *) OICCalloc(1, sizeof(CAData_t)); - CA_MEMORY_ALLOC_CHECK(data); + if(CA_REQUEST_DATA == dataType) + { + // clone request info + CARequestInfo_t *request = CACloneRequestInfo((CARequestInfo_t *)sendData); + + if(!request) + { + OIC_LOG(ERROR, TAG, "CACloneRequestInfo failed"); + OICFree(cadata); + return NULL; + } - // clone remote endpoint - remoteEndpoint = CACloneEndpoint(object); - CA_MEMORY_ALLOC_CHECK(remoteEndpoint); + cadata->type = request->isMulticast ? SEND_TYPE_MULTICAST : SEND_TYPE_UNICAST; + info = &request->info; + cadata->requestInfo = request; + } + else if(CA_RESPONSE_DATA == dataType) + { + // clone response info + CAResponseInfo_t *response = CACloneResponseInfo((CAResponseInfo_t *)sendData); - // clone request info - requestInfo = CACloneRequestInfo(request); - CA_MEMORY_ALLOC_CHECK(requestInfo); + if(!response) + { + OIC_LOG(ERROR, TAG, "CACloneResponseInfo failed"); + OICFree(cadata); + return NULL; + } + + cadata->type = SEND_TYPE_UNICAST; + info = &response->info; + cadata->responseInfo = response; + } - // save data - data->type = request->isMulticast ? SEND_TYPE_MULTICAST : SEND_TYPE_UNICAST; - data->remoteEndpoint = remoteEndpoint; - data->requestInfo = requestInfo; - data->responseInfo = NULL; - data->options = NULL; - data->numOptions = 0; - if (NULL != requestInfo->info.options && 0 < requestInfo->info.numOptions) + if (NULL != info->options && 0 < info->numOptions) { - uint8_t numOptions = requestInfo->info.numOptions; + uint8_t numOptions = info->numOptions; // copy data CAHeaderOption_t *headerOption = (CAHeaderOption_t *) OICMalloc(sizeof(CAHeaderOption_t) * numOptions); - CA_MEMORY_ALLOC_CHECK(headerOption); + if(!headerOption) + { + OIC_LOG(ERROR, TAG, "memory allocation failed"); + CADataDestroyer(cadata, sizeof(CAData_t)); + return NULL; + } - memcpy(headerOption, requestInfo->info.options, sizeof(CAHeaderOption_t) * numOptions); + memcpy(headerOption, info->options, sizeof(CAHeaderOption_t) * numOptions); - data->options = headerOption; - data->numOptions = numOptions; + cadata->options = headerOption; + cadata->numOptions = numOptions; } - // add thread - CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t)); - OIC_LOG(DEBUG, TAG, "OUT"); - return CA_STATUS_OK; + CAEndpoint_t* ep = CACloneEndpoint(endpoint); + if (!ep) + { + OIC_LOG(ERROR, TAG, "endpoint clone failed"); + CADataDestroyer(cadata, sizeof(CAData_t)); + return NULL; + } -// memory error label. -memory_error_exit: - CAFreeEndpoint(remoteEndpoint); - CADestroyRequestInfoInternal(requestInfo); + cadata->remoteEndpoint = ep; + return cadata; - OICFree(data); - OIC_LOG(DEBUG, TAG, "OUT"); - return CA_MEMORY_ALLOC_FAILED; } -CAResult_t CADetachResponseMessage(const CAEndpoint_t *object, - const CAResponseInfo_t *response) +CAResult_t CADetachRequestMessage(const CAEndpoint_t *object, const CARequestInfo_t *request) { OIC_LOG(DEBUG, TAG, "IN"); + VERIFY_NON_NULL(object, TAG, "object"); - VERIFY_NON_NULL(response, TAG, "response"); + VERIFY_NON_NULL(request, TAG, "request"); if (false == CAIsSelectedNetworkAvailable()) { return CA_STATUS_FAILED; } - CAEndpoint_t *remoteEndpoint = NULL; - CAResponseInfo_t *responseInfo = NULL; +#ifdef ARDUINO + // If max retransmission queue is reached, then don't handle new request + if (CA_MAX_RT_ARRAY_SIZE == u_arraylist_length(g_retransmissionContext.dataList)) + { + OIC_LOG(ERROR, TAG, "max RT queue size reached!"); + return CA_SEND_FAILED; + } +#endif /* ARDUINO */ - // allocate & initialize - CAData_t *data = (CAData_t *) OICCalloc(1, sizeof(CAData_t)); - CA_MEMORY_ALLOC_CHECK(data); + CAData_t *data = CAPrepareSendData(object, request, CA_REQUEST_DATA); + if(!data) + { + OIC_LOG(ERROR, TAG, "CAPrepareSendData failed"); + return CA_MEMORY_ALLOC_FAILED; + } - // clone remote endpoint - remoteEndpoint = CACloneEndpoint(object); - CA_MEMORY_ALLOC_CHECK(remoteEndpoint); +#ifdef SINGLE_THREAD + CAProcessSendData(data); + CADataDestroyer(data, sizeof(CAData_t)); +#else + CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t)); +#endif - // clone response info - responseInfo = CACloneResponseInfo(response); - CA_MEMORY_ALLOC_CHECK(responseInfo); + OIC_LOG(DEBUG, TAG, "OUT"); + return CA_STATUS_OK; +} - // save data - data->type = SEND_TYPE_UNICAST; - data->remoteEndpoint = remoteEndpoint; - data->requestInfo = NULL; - data->responseInfo = responseInfo; - data->options = NULL; - data->numOptions = 0; - if (NULL != responseInfo->info.options && 0 < responseInfo->info.numOptions) - { - uint8_t numOptions = responseInfo->info.numOptions; - // copy data - CAHeaderOption_t *headerOption = (CAHeaderOption_t *) OICMalloc(sizeof(CAHeaderOption_t) - * numOptions); - CA_MEMORY_ALLOC_CHECK(headerOption); +CAResult_t CADetachResponseMessage(const CAEndpoint_t *object, + const CAResponseInfo_t *response) +{ + OIC_LOG(DEBUG, TAG, "IN"); + VERIFY_NON_NULL(object, TAG, "object"); + VERIFY_NON_NULL(response, TAG, "response"); - memcpy(headerOption, responseInfo->info.options, sizeof(CAHeaderOption_t) * numOptions); + if (false == CAIsSelectedNetworkAvailable()) + { + return CA_STATUS_FAILED; + } - data->options = headerOption; - data->numOptions = numOptions; + CAData_t *data = CAPrepareSendData(object, response, CA_RESPONSE_DATA); + if(!data) + { + OIC_LOG(ERROR, TAG, "CAPrepareSendData failed"); + return CA_MEMORY_ALLOC_FAILED; } - // add thread +#ifdef SINGLE_THREAD + CAProcessSendData(data); + CADataDestroyer(data, sizeof(CAData_t)); +#else CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t)); +#endif OIC_LOG(DEBUG, TAG, "OUT"); return CA_STATUS_OK; - -// memory error label. -memory_error_exit: - CAFreeEndpoint(remoteEndpoint); - CADestroyResponseInfoInternal(responseInfo); - OICFree(data); - OIC_LOG(DEBUG, TAG, "OUT"); - - return CA_MEMORY_ALLOC_FAILED; } CAResult_t CADetachMessageResourceUri(const CAURI_t resourceUri, const CAToken_t token, @@ -739,12 +792,12 @@ CAResult_t CADetachMessageResourceUri(const CAURI_t resourceUri, const CAToken_t } void CASetInterfaceCallbacks(CARequestCallback ReqHandler, CAResponseCallback RespHandler, - CAErrorCallback errroHandler) + CAErrorCallback errorHandler) { OIC_LOG(DEBUG, TAG, "IN"); g_requestHandler = ReqHandler; g_responseHandler = RespHandler; - g_errorHandler = errroHandler; + g_errorHandler = errorHandler; OIC_LOG(DEBUG, TAG, "OUT"); } @@ -756,10 +809,11 @@ CAResult_t CAInitializeMessageHandler() CASetNetworkChangeCallback(CANetworkChangedCallback); CASetErrorHandleCallback(CAErrorHandler); +#ifndef SINGLE_THREAD // create thread pool CAResult_t res = ca_thread_pool_init(MAX_THREAD_POOL_SIZE, &g_threadPoolHandle); - if (res != CA_STATUS_OK) + if (CA_STATUS_OK != res) { OIC_LOG(ERROR, TAG, "thread pool initialize error."); return res; @@ -776,7 +830,7 @@ CAResult_t CAInitializeMessageHandler() // start send thread res = CAQueueingThreadStart(&g_sendThread); - if (res != CA_STATUS_OK) + if (CA_STATUS_OK != res) { OIC_LOG(ERROR, TAG, "thread start error(send thread)."); ca_thread_pool_free(g_threadPoolHandle); @@ -801,7 +855,7 @@ CAResult_t CAInitializeMessageHandler() OIC_LOG(ERROR, TAG, "thread start error(receive thread)."); return res; } -#endif +#endif /* SINGLE_HANDLE */ // retransmission initialize CARetransmissionInitialize(&g_retransmissionContext, g_threadPoolHandle, CASendUnicastData, @@ -810,7 +864,7 @@ CAResult_t CAInitializeMessageHandler() // start retransmission res = CARetransmissionStart(&g_retransmissionContext); - if (res != CA_STATUS_OK) + if (CA_STATUS_OK != res) { OIC_LOG(ERROR, TAG, "thread start error(retransmission thread)."); return res; @@ -818,6 +872,13 @@ CAResult_t CAInitializeMessageHandler() // initialize interface adapters by controller CAInitializeAdapters(g_threadPoolHandle); +#else + // retransmission initialize + CARetransmissionInitialize(&g_retransmissionContext, NULL, CASendUnicastData, + CATimeoutCallback, NULL); + CAInitializeAdapters(); +#endif + OIC_LOG(DEBUG, TAG, "OUT"); return CA_STATUS_OK; } @@ -825,6 +886,7 @@ CAResult_t CAInitializeMessageHandler() void CATerminateMessageHandler() { OIC_LOG(DEBUG, TAG, "IN"); +#ifndef SINGLE_THREAD CATransportAdapter_t connType; u_arraylist_t *list = CAGetSelectedNetworkList(); uint32_t length = u_arraylist_length(list); @@ -862,7 +924,7 @@ void CATerminateMessageHandler() { #ifndef SINGLE_HANDLE // This will be enabled when RI supports multi threading CAQueueingThreadStop(&gReceiveThread); -#endif +#endif /* SINGLE_HANDLE */ } // destroy thread pool @@ -878,6 +940,14 @@ void CATerminateMessageHandler() // terminate interface adapters by controller CATerminateAdapters(); +#else + // terminate interface adapters by controller + CATerminateAdapters(); + + // stop retransmission + CARetransmissionStop(&g_retransmissionContext); + CARetransmissionDestroy(&g_retransmissionContext); +#endif OIC_LOG(DEBUG, TAG, "OUT"); } @@ -892,18 +962,53 @@ void CALogPDUInfo(coap_pdu_t *pdu) OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code); - OIC_LOG_V(DEBUG, TAG, "PDU Maker - id : %d", ntohs(pdu->hdr->id)); - OIC_LOG(DEBUG, TAG, "PDU Maker - token :"); OIC_LOG_BUFFER(DEBUG, TAG, pdu->hdr->token, pdu->hdr->token_length); } +static void CALogPayloadInfo(CAInfo_t *info) +{ + if(info) + { + if (!info->options) + { + for (uint32_t i = 0; i < info->numOptions; i++) + { + OIC_LOG_V(DEBUG, TAG, "optionID: %d", info->options[i].optionID); + + OIC_LOG_V(DEBUG, TAG, "list: %s", info->options[i].optionData); + } + } + + if (!info->payload) + { + OIC_LOG_V(DEBUG, TAG, "payload: %p(%u)", info->payload, + info->payloadSize); + } + + if (!info->token) + { + OIC_LOG(DEBUG, TAG, "token:"); + OIC_LOG_BUFFER(DEBUG, TAG, (const uint8_t *) info->token, + info->tokenLength); + } + OIC_LOG_V(DEBUG, TAG, "msgID: %d", info->messageId); + } + else + { + OIC_LOG(DEBUG, TAG, "info is NULL, cannot output log data"); + } +} + void CAErrorHandler(const CAEndpoint_t *endpoint, const void *data, uint32_t dataLen, CAResult_t result) { OIC_LOG(DEBUG, TAG, "IN"); + +#ifndef SINGLE_THREAD + VERIFY_NON_NULL_VOID(endpoint, TAG, "remoteEndpoint"); VERIFY_NON_NULL_VOID(data, TAG, "data"); @@ -917,65 +1022,20 @@ void CAErrorHandler(const CAEndpoint_t *endpoint, return; } - CAErrorInfo_t *errorInfo = (CAErrorInfo_t *)OICCalloc(1, sizeof (CAErrorInfo_t)); - if (NULL == errorInfo) + CAData_t *cadata = CAGenerateHandlerData(endpoint, pdu, CA_ERROR_DATA); + if(!cadata) { - OIC_LOG(ERROR, TAG, "CAErrorHandler, Memory allocation failed!"); + OIC_LOG(ERROR, TAG, "CAErrorHandler, CAGenerateHandlerData failed!"); coap_delete_pdu(pdu); return; } - CAResult_t res = CAGetErrorInfoFromPDU(pdu, errorInfo); - if (CA_STATUS_OK != res) - { - OIC_LOG_V(ERROR, TAG, "CAGetErrorInfoFromPDU failed : %d", res); - OICFree(errorInfo); - coap_delete_pdu(pdu); - return; - } - - errorInfo->result = result; - OIC_LOG_V(DEBUG, TAG, "error : %d", result); - if (NULL != errorInfo->info.payload) - { - OIC_LOG_V(DEBUG, TAG, "error, payload: %s", errorInfo->info.payload); - } - - OIC_LOG(DEBUG, TAG, "error, token"); - OIC_LOG_BUFFER(DEBUG, TAG, (const uint8_t *) errorInfo->info.token, - errorInfo->info.tokenLength); - OIC_LOG_V(DEBUG, TAG, "CAErrorHandler, msgID : %d", errorInfo->info.messageId); - - CAEndpoint_t *rep = NULL; - rep = CACloneEndpoint(endpoint); - if (!rep) - { - OIC_LOG(ERROR, TAG, "CAErrorHandler, CloneEndpoint Failed"); - OICFree(errorInfo); - coap_delete_pdu(pdu); - return; - } - - // store the data at queue. - CAData_t *cadata = NULL; - cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t)); - if (NULL == cadata) - { - OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, Memory allocation failed !"); - CAFreeEndpoint(rep); - OICFree(errorInfo); - coap_delete_pdu(pdu); - return; - } - - cadata->remoteEndpoint = rep; - cadata->requestInfo = NULL; - cadata->responseInfo = NULL; - cadata->errorInfo = errorInfo; - cadata->dataType = CA_ERROR_DATA; + cadata->errorInfo->result = result; CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); coap_delete_pdu(pdu); +#endif + OIC_LOG(DEBUG, TAG, "OUT"); return; } diff --git a/resource/csdk/connectivity/src/camessagehandler_singlethread.c b/resource/csdk/connectivity/src/camessagehandler_singlethread.c deleted file mode 100644 index a5c1293..0000000 --- a/resource/csdk/connectivity/src/camessagehandler_singlethread.c +++ /dev/null @@ -1,465 +0,0 @@ -/****************************************************************** - * - * Copyright 2014 Samsung Electronics All Rights Reserved. - * - * - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - ******************************************************************/ - -#include -#include -#include -#include - -#include "cainterface.h" -#include "camessagehandler_singlethread.h" -#include "caremotehandler.h" -#include "cainterfacecontroller.h" -#include "caprotocolmessage.h" -#include "caretransmission.h" -#include "logger.h" -#include "config.h" /* for coap protocol */ -#include "oic_malloc.h" - -#define TAG "CAMH_ST" - -#define CA_MAX_RT_ARRAY_SIZE 3 - -typedef enum -{ - SEND_TYPE_MULTICAST = 0, SEND_TYPE_UNICAST -} CASendDataType_t; - -typedef struct -{ - CASendDataType_t type; - CAEndpoint_t *remoteEndpoint; - CARequestInfo_t *requestInfo; - CAResponseInfo_t *responseInfo; - CAHeaderOption_t *options; - uint8_t numOptions; -} CAData_t; - - -static CARetransmission_t g_retransmissionContext; - -// handler field -static CARequestCallback g_requestHandler = NULL; -static CAResponseCallback g_responseHandler = NULL; -static CAErrorCallback g_errorHandler = NULL; - -static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uint32_t size) -{ - OIC_LOG(DEBUG, TAG, "IN"); - CAEndpoint_t* ep = CACloneEndpoint(endpoint); - if (NULL == ep) - { - OIC_LOG(ERROR, TAG, "clone failed"); - return; - } - - CAResponseInfo_t* resInfo = (CAResponseInfo_t*) OICCalloc(1, sizeof(CAResponseInfo_t)); - - if (NULL == resInfo) - { - OIC_LOG(ERROR, TAG, "calloc failed"); - CAFreeEndpoint(ep); - return; - } - - resInfo->result = CA_RETRANSMIT_TIMEOUT; - resInfo->info.type = CAGetMessageTypeFromPduBinaryData(pdu, size); - resInfo->info.messageId = CAGetMessageIdFromPduBinaryData(pdu, size); - - if (g_responseHandler) - { - g_responseHandler(ep, resInfo); - } - - CAFreeEndpoint(ep); - OICFree(resInfo); - - OIC_LOG(DEBUG, TAG, "OUT"); -} -static void CAProcessData(const CAData_t *data) -{ - OIC_LOG(DEBUG, TAG, "IN"); - VERIFY_NON_NULL_VOID(data, TAG, "data"); - VERIFY_NON_NULL_VOID(data->remoteEndpoint, TAG, "remoteEndpoint"); - - CAResult_t res = CA_STATUS_FAILED; - - CASendDataType_t type = data->type; - - if (SEND_TYPE_UNICAST == type) - { - OIC_LOG(DEBUG,TAG,"Unicast Message"); - coap_pdu_t *pdu = NULL; - - if (NULL != data->requestInfo) - { - OIC_LOG(DEBUG, TAG, "reqInfo avlbl"); - - pdu = (coap_pdu_t *)CAGeneratePDU(data->requestInfo->method, &data->requestInfo->info); - } - else if (NULL != data->responseInfo) - { - OIC_LOG(DEBUG, TAG, "resInfo avlbl"); - - pdu = (coap_pdu_t *)CAGeneratePDU(data->responseInfo->result, &data->responseInfo->info); - } - else - { - OIC_LOG(DEBUG, TAG, "request info, response info is empty"); - return; - } - - // interface controller function call. - if (NULL != pdu) - { - CALogPDUInfo(pdu); - - res = CASendUnicastData(data->remoteEndpoint, pdu->hdr, pdu->length); - if (CA_STATUS_OK != res) - { - OIC_LOG_V(ERROR, TAG, "send failed:%d", res); - coap_delete_pdu(pdu); - return; - } - // for retransmission - res = CARetransmissionSentData(&g_retransmissionContext, data->remoteEndpoint, pdu->hdr, - pdu->length); - if (CA_STATUS_OK != res) - { - OIC_LOG_V(INFO, TAG, "retransmissions will not be working: %d", res); - coap_delete_pdu(pdu); - return; - } - - coap_delete_pdu(pdu); - } - else - { - OIC_LOG(ERROR,TAG, "Failed to Generate Unicast PDU"); - return; - } - } - else if (SEND_TYPE_MULTICAST == type) - { - OIC_LOG(DEBUG,TAG,"Multicast Message"); - if (NULL != data->requestInfo) - { - OIC_LOG(DEBUG, TAG, "reqInfo avlbl"); - - CAInfo_t *info = &data->requestInfo->info; - - info->options = data->options; - info->numOptions = data->numOptions; - - coap_pdu_t *pdu = (coap_pdu_t *)CAGeneratePDU(CA_GET, info); - - if (NULL != pdu) - { - CALogPDUInfo(pdu); - res = CASendMulticastData(data->remoteEndpoint, pdu->hdr, pdu->length); - if(CA_STATUS_OK != res) - { - OIC_LOG_V(ERROR, TAG, "send failed:%d", res); - coap_delete_pdu(pdu); - return; - } - coap_delete_pdu(pdu); - } - else - { - OIC_LOG(ERROR,TAG,"Failed to Generate Multicast PDU"); - } - } - else - { - OIC_LOG(ERROR,TAG,"requestInfo is empty"); - } - } - - OIC_LOG(DEBUG, TAG, "OUT"); -} - -static void CAReceivedPacketCallback(CAEndpoint_t *endpoint, void *data, uint32_t dataLen) -{ - OIC_LOG(DEBUG, TAG, "IN"); - VERIFY_NON_NULL_VOID(data, TAG, "data"); - - uint32_t code = CA_NOT_FOUND; - coap_pdu_t *pdu = (coap_pdu_t *) CAParsePDU((const char *) data, dataLen, &code); - OICFree(data); - if (NULL == pdu) - { - OIC_LOG(ERROR, TAG, "Parse PDU failed"); - return; - } - - char uri[CA_MAX_URI_LENGTH] = { }; - - if (CA_GET == code || CA_POST == code || CA_PUT == code || CA_DELETE == code) - { - CARequestInfo_t *ReqInfo = (CARequestInfo_t *) OICCalloc(1, sizeof(CARequestInfo_t)); - if (NULL == ReqInfo) - { - OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, Memory allocation failed!"); - coap_delete_pdu(pdu); - return; - } - - CAResult_t res = CAGetRequestInfoFromPDU(pdu, ReqInfo); - if (CA_STATUS_OK != res) - { - OIC_LOG_V(ERROR, TAG, "CAGetRequestInfoFromPDU failed : %d", res); - OICFree(ReqInfo); - coap_delete_pdu(pdu); - return; - } - - if (NULL != ReqInfo->info.options) - { - for (uint32_t i = 0; i < ReqInfo->info.numOptions; i++) - { - OIC_LOG_V(DEBUG, TAG, "optionID: %d", ReqInfo->info.options[i].optionID); - - OIC_LOG_V(DEBUG, TAG, "list: %s", ReqInfo->info.options[i].optionData); - } - } - - if (NULL != ReqInfo->info.payload) - { - OIC_LOG_V(DEBUG, TAG, "Request- payload: %s", ReqInfo->info.payload); - } - - OIC_LOG_V(DEBUG, TAG, "code: %d", ReqInfo->method); - OIC_LOG(DEBUG, TAG, "token:"); - OIC_LOG_BUFFER(DEBUG, TAG, (const uint8_t *) ReqInfo->info.token, CA_MAX_TOKEN_LEN); - - if (g_requestHandler) - { - g_requestHandler(endpoint, ReqInfo); - } - - CADestroyRequestInfoInternal(ReqInfo); - } - else - { - CAResponseInfo_t *ResInfo = (CAResponseInfo_t *) OICCalloc(1, sizeof(CAResponseInfo_t)); - if (NULL == ResInfo) - { - OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, Memory allocation failed!"); - coap_delete_pdu(pdu); - return; - } - - CAResult_t res = CAGetResponseInfoFromPDU(pdu, ResInfo); - if (CA_STATUS_OK != res) - { - OIC_LOG_V(ERROR, TAG, "CAGetResponseInfoFromPDU failed : %d", res); - OICFree(ResInfo); - coap_delete_pdu(pdu); - return; - } - - if (NULL != ResInfo->info.options) - { - for (uint32_t i = 0; i < ResInfo->info.numOptions; i++) - { - OIC_LOG_V(DEBUG, TAG, "optionID: %d", ResInfo->info.options[i].optionID); - - OIC_LOG_V(DEBUG, TAG, "list: %s", ResInfo->info.options[i].optionData); - } - } - - if (NULL != ResInfo->info.payload) - { - OIC_LOG_V(DEBUG, TAG, "payload: %s", ResInfo->info.payload); - } - OIC_LOG_V(DEBUG, TAG, "code: %d", ResInfo->result); - - // for retransmission - void *retransmissionPdu = NULL; - CARetransmissionReceivedData(&g_retransmissionContext, endpoint, pdu->hdr, pdu->length, - &retransmissionPdu); - - // get token from saved data in retransmission list - if (retransmissionPdu && CA_EMPTY == code) - { - CAResult_t res = CAGetTokenFromPDU((const coap_hdr_t *)retransmissionPdu, - &(ResInfo->info)); - if(CA_STATUS_OK != res) - { - OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list"); - OICFree(ResInfo->info.token); - } - } - OICFree(retransmissionPdu); - - if (NULL != ResInfo) - { - if (g_responseHandler) - { - g_responseHandler(endpoint, ResInfo); - } - CADestroyResponseInfoInternal(ResInfo); - } - } - - if (pdu) - { - coap_delete_pdu(pdu); - } - OIC_LOG(DEBUG, TAG, "OUT"); -} - -static void CANetworkChangedCallback(CAEndpoint_t *info, CANetworkStatus_t status) -{ - OIC_LOG(DEBUG, TAG, "IN"); - - OIC_LOG(DEBUG, TAG, "OUT"); -} - -void CAHandleRequestResponseCallbacks() -{ - CAReadData(); - CARetransmissionBaseRoutine((void *)&g_retransmissionContext); -} - -CAResult_t CADetachRequestMessage(const CAEndpoint_t *object, const CARequestInfo_t *request) -{ - OIC_LOG(DEBUG, TAG, "IN"); - - VERIFY_NON_NULL(object, TAG, "object"); - VERIFY_NON_NULL(request, TAG, "request"); - - // If max retransmission queue is reached, then don't handle new request - if (CA_MAX_RT_ARRAY_SIZE == u_arraylist_length(g_retransmissionContext.dataList)) - { - OIC_LOG(ERROR, TAG, "max RT queue size reached!"); - return CA_SEND_FAILED; - } - - // allocate & initialize - CAData_t *data = (CAData_t *) OICCalloc(1, sizeof(CAData_t)); - CA_MEMORY_ALLOC_CHECK(data); - - // save data - data->type = request->isMulticast ? SEND_TYPE_MULTICAST : SEND_TYPE_UNICAST; - data->remoteEndpoint = object; - data->requestInfo = request; - data->responseInfo = NULL; - - CAProcessData(data); - OICFree(data); - OIC_LOG(DEBUG, TAG, "OUT"); - return CA_STATUS_OK; - -// memory error label. -memory_error_exit: - OICFree(data); - OIC_LOG(DEBUG, TAG, "OUT"); - return CA_MEMORY_ALLOC_FAILED; -} - -CAResult_t CADetachResponseMessage(const CAEndpoint_t *object, - const CAResponseInfo_t *response) -{ - OIC_LOG(DEBUG, TAG, "IN"); - VERIFY_NON_NULL(object, TAG, "object"); - VERIFY_NON_NULL(response, TAG, "response"); - - // allocate & initialize - CAData_t *data = (CAData_t *) OICCalloc(1, sizeof(CAData_t)); - CA_MEMORY_ALLOC_CHECK(data); - - // save data - data->type = SEND_TYPE_UNICAST; - data->remoteEndpoint = object; - data->requestInfo = NULL; - data->responseInfo = response; - - CAProcessData(data); - - OICFree(data); - OIC_LOG(DEBUG, TAG, "OUT"); - return CA_STATUS_OK; - -// memory error label. -memory_error_exit: - OICFree(data); - OIC_LOG(DEBUG, TAG, "OUT"); - - return CA_MEMORY_ALLOC_FAILED; -} - -void CASetInterfaceCallbacks(CARequestCallback ReqHandler, - CAResponseCallback RespHandler, CAErrorCallback errorHandler) -{ - OIC_LOG(DEBUG, TAG, "IN"); - g_requestHandler = ReqHandler; - g_responseHandler = RespHandler; - g_errorHandler = errorHandler; - OIC_LOG(DEBUG, TAG, "OUT"); -} - -CAResult_t CAInitializeMessageHandler() -{ - OIC_LOG(DEBUG, TAG, "IN"); - CASetPacketReceivedCallback(CAReceivedPacketCallback); - - CASetNetworkChangeCallback(CANetworkChangedCallback); - - // retransmission initialize - CARetransmissionInitialize(&g_retransmissionContext, NULL, CASendUnicastData, - CATimeoutCallback, NULL); - - CAInitializeAdapters(NULL); - OIC_LOG(DEBUG, TAG, "OUT"); - return CA_STATUS_OK; -} - -void CATerminateMessageHandler() -{ - OIC_LOG(DEBUG, TAG, "IN"); - // terminate interface adapters by controller - CATerminateAdapters(); - - // stop retransmission - CARetransmissionStop(&g_retransmissionContext); - CARetransmissionDestroy(&g_retransmissionContext); - - OIC_LOG(DEBUG, TAG, "OUT"); -} - -void CALogPDUInfo(coap_pdu_t *pdu) -{ - VERIFY_NON_NULL_VOID(pdu, TAG, "pdu"); - - OIC_LOG_V(DEBUG, TAG, "PDU Maker - payload : %s", pdu->data); - - OIC_LOG_V(DEBUG, TAG, "PDU Maker - type : %d", pdu->hdr->type); - - OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code); - - OIC_LOG_V(DEBUG, TAG, "PDU Maker - id : %d", ntohs(pdu->hdr->id)); - - OIC_LOG(DEBUG, TAG, "PDU Maker - token :"); - - OIC_LOG_BUFFER(DEBUG, TAG, pdu->hdr->token, pdu->hdr->token_length); -} - diff --git a/resource/csdk/connectivity/src/ip_adapter/caipadapter.c b/resource/csdk/connectivity/src/ip_adapter/caipadapter.c index 3a320bd..da89ab9 100644 --- a/resource/csdk/connectivity/src/ip_adapter/caipadapter.c +++ b/resource/csdk/connectivity/src/ip_adapter/caipadapter.c @@ -178,21 +178,9 @@ void CAIPPacketReceivedCB(const CAEndpoint_t *endpoint, const void *data, OIC_LOG_V(DEBUG, TAG, "Address: %s, port:%d", endpoint->addr, endpoint->port); - void *buf = OICCalloc(dataLength + 1, sizeof (char)); - if (!buf) - { - OIC_LOG(ERROR, TAG, "Memory Allocation failed!"); - return; - } - memcpy(buf, data, dataLength); - if (g_networkPacketCallback) { - g_networkPacketCallback(endpoint, buf, dataLength); - } - else - { - OICFree(buf); + g_networkPacketCallback(endpoint, data, dataLength); } OIC_LOG(DEBUG, TAG, "OUT"); }