X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=resource%2Fcsdk%2Fconnectivity%2Fsrc%2Fcamessagehandler.c;h=1d7ac577d05aee5f1c2716c39543bc6848a1b98b;hb=6ce84982ea5cf8597d28516998be9a3823bd3b56;hp=a4e37c341737e8c3b3c8541aa63f43ce002fb215;hpb=07b4387b438aecacf3b6a7a4bb035f4a6b94f48d;p=platform%2Fupstream%2Fiotivity.git diff --git a/resource/csdk/connectivity/src/camessagehandler.c b/resource/csdk/connectivity/src/camessagehandler.c index a4e37c3..1d7ac57 100644 --- a/resource/csdk/connectivity/src/camessagehandler.c +++ b/resource/csdk/connectivity/src/camessagehandler.c @@ -1,4 +1,4 @@ -/****************************************************************** +/* ***************************************************************** * * Copyright 2014 Samsung Electronics All Rights Reserved. * @@ -26,48 +26,26 @@ #include "cainterface.h" #include "camessagehandler.h" #include "caremotehandler.h" -#include "cainterfacecontroller.h" #include "caprotocolmessage.h" -#include "caretransmission.h" -#include "caadapterutils.h" -#include "uqueue.h" #include "logger.h" #include "config.h" /* for coap protocol */ -#include "cathreadpool.h" /* for thread pool */ -#include "caqueueingthread.h" -#include "camutex.h" #include "oic_malloc.h" -#include "oic_string.h" #include "canetworkconfigurator.h" +#include "caadapterutils.h" +#include "cainterfacecontroller.h" +#include "caretransmission.h" -#define TAG PCF("CA") -#define SINGLE_HANDLE - -#define MAX_THREAD_POOL_SIZE 20 - -typedef enum -{ - SEND_TYPE_MULTICAST = 0, SEND_TYPE_UNICAST -} CASendDataType_t; +#ifdef WITH_BWT +#include "cablockwisetransfer.h" +#endif -typedef enum -{ - CA_REQUEST_DATA = 1, - CA_RESPONSE_DATA = 2, - CA_ERROR_DATA = 3, -} CADataType_t; +#ifndef SINGLE_THREAD +#include "uqueue.h" +#include "cathreadpool.h" /* for thread pool */ +#include "caqueueingthread.h" -typedef struct -{ - CASendDataType_t type; - CAEndpoint_t *remoteEndpoint; - CARequestInfo_t *requestInfo; - CAResponseInfo_t *responseInfo; - CAErrorInfo_t *errorInfo; - CAHeaderOption_t *options; - CADataType_t dataType; - uint8_t numOptions; -} CAData_t; +#define SINGLE_HANDLE +#define MAX_THREAD_POOL_SIZE 20 // thread pool handle static ca_thread_pool_t g_threadPoolHandle = NULL; @@ -76,6 +54,12 @@ static ca_thread_pool_t g_threadPoolHandle = NULL; static CAQueueingThread_t g_sendThread; static CAQueueingThread_t g_receiveThread; +#else +#define CA_MAX_RT_ARRAY_SIZE 3 +#endif /* SINGLE_THREAD */ + +#define TAG "CA_MSG_HNDLR" + static CARetransmission_t g_retransmissionContext; // handler field @@ -87,6 +71,40 @@ static void CAErrorHandler(const CAEndpoint_t *endpoint, const void *data, uint32_t dataLen, CAResult_t result); +static CAData_t* CAGenerateHandlerData(const CAEndpoint_t *endpoint, const void *data, + CADataType_t dataType); + +#ifdef SINGLE_THREAD +static void CAProcessReceivedData(CAData_t *data); +#endif +static void CADestroyData(void *data, uint32_t size); +static void CALogPayloadInfo(CAInfo_t *info); +static bool CADropSecondRequest(const CAEndpoint_t *endpoint, uint16_t messageId); + +#ifdef WITH_BWT +void CAAddDataToSendThread(CAData_t *data) +{ + OIC_LOG(DEBUG, TAG, "IN"); + VERIFY_NON_NULL_VOID(data, TAG, "data"); + + // add thread + CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t)); + + OIC_LOG(DEBUG, TAG, "OUT"); +} + +void CAAddDataToReceiveThread(CAData_t *data) +{ + OIC_LOG(DEBUG, TAG, "IN - CAAddDataToReceiveThread"); + VERIFY_NON_NULL_VOID(data, TAG, "data"); + + // add thread + CAQueueingThreadAddData(&g_receiveThread, data, sizeof(CAData_t)); + + OIC_LOG(DEBUG, TAG, "OUT - CAAddDataToReceiveThread"); +} +#endif + static bool CAIsSelectedNetworkAvailable() { u_arraylist_t *list = CAGetSelectedNetworkList(); @@ -99,6 +117,123 @@ static bool CAIsSelectedNetworkAvailable() return true; } +static CAData_t* CAGenerateHandlerData(const CAEndpoint_t *endpoint, const void *data, CADataType_t dataType) +{ + OIC_LOG(DEBUG, TAG, "CAGenerateHandlerData IN"); + CAInfo_t *info = NULL; + CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t)); + if (!cadata) + { + OIC_LOG(ERROR, TAG, "memory allocation failed"); + return NULL; + } + + CAEndpoint_t* ep = CACloneEndpoint(endpoint); + if (!ep) + { + OIC_LOG(ERROR, TAG, "endpoint clone failed"); + OICFree(cadata); + return NULL; + } + + OIC_LOG_V(DEBUG, TAG, "address : %s", ep->addr); + CAResult_t result; + + if(CA_RESPONSE_DATA == dataType) + { + CAResponseInfo_t* resInfo = (CAResponseInfo_t*)OICCalloc(1, sizeof(CAResponseInfo_t)); + if (!resInfo) + { + OIC_LOG(ERROR, TAG, "memory allocation failed"); + OICFree(cadata); + CAFreeEndpoint(ep); + return NULL; + } + + result = CAGetResponseInfoFromPDU(data, resInfo); + if (CA_STATUS_OK != result) + { + OIC_LOG(ERROR, TAG, "CAGetResponseInfoFromPDU Failed"); + CAFreeEndpoint(ep); + CADestroyResponseInfoInternal(resInfo); + OICFree(cadata); + return NULL; + } + cadata->responseInfo = resInfo; + info = &resInfo->info; + OIC_LOG(DEBUG, TAG, "Response Info :"); + CALogPayloadInfo(info); + } + else if (CA_REQUEST_DATA == dataType) + { + CARequestInfo_t* reqInfo = (CARequestInfo_t*)OICCalloc(1, sizeof(CARequestInfo_t)); + if (!reqInfo) + { + OIC_LOG(ERROR, TAG, "memory allocation failed"); + OICFree(cadata); + CAFreeEndpoint(ep); + return NULL; + } + + result = CAGetRequestInfoFromPDU(data, reqInfo); + if (CA_STATUS_OK != result) + { + OIC_LOG(ERROR, TAG, "CAGetRequestInfoFromPDU failed"); + CAFreeEndpoint(ep); + CADestroyRequestInfoInternal(reqInfo); + OICFree(cadata); + return NULL; + } + + if (CADropSecondRequest(endpoint, reqInfo->info.messageId)) + { + OIC_LOG(ERROR, TAG, "Second Request with same Token, Drop it"); + CAFreeEndpoint(ep); + CADestroyRequestInfoInternal(reqInfo); + OICFree(cadata); + return NULL; + } + + cadata->requestInfo = reqInfo; + info = &reqInfo->info; + OIC_LOG(DEBUG, TAG, "Request Info :"); + CALogPayloadInfo(info); + } + else if (CA_ERROR_DATA == dataType) + { + CAErrorInfo_t *errorInfo = (CAErrorInfo_t *)OICCalloc(1, sizeof (CAErrorInfo_t)); + if (!errorInfo) + { + OIC_LOG(ERROR, TAG, "Memory allocation failed!"); + OICFree(cadata); + CAFreeEndpoint(ep); + return NULL; + } + + CAResult_t result = CAGetErrorInfoFromPDU(data, errorInfo); + if (CA_STATUS_OK != result) + { + OIC_LOG(ERROR, TAG, "CAGetErrorInfoFromPDU failed"); + CAFreeEndpoint(ep); + OICFree(errorInfo); + OICFree(cadata); + return NULL; + } + + cadata->errorInfo = errorInfo; + info = &errorInfo->info; + OIC_LOG(DEBUG, TAG, "error Info :"); + CALogPayloadInfo(info); + } + + cadata->remoteEndpoint = ep; + cadata->dataType = dataType; + + return cadata; + + OIC_LOG(DEBUG, TAG, "CAGenerateHandlerData OUT"); +} + static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uint32_t size) { OIC_LOG(DEBUG, TAG, "IN"); @@ -106,7 +241,7 @@ static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uin VERIFY_NON_NULL_VOID(pdu, TAG, "pdu"); CAEndpoint_t* ep = CACloneEndpoint(endpoint); - if (NULL == ep) + if (!ep) { OIC_LOG(ERROR, TAG, "clone failed"); return; @@ -114,7 +249,7 @@ static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uin CAResponseInfo_t* resInfo = (CAResponseInfo_t*)OICCalloc(1, sizeof(CAResponseInfo_t)); - if (NULL == resInfo) + if (!resInfo) { OIC_LOG(ERROR, TAG, "calloc failed"); CAFreeEndpoint(ep); @@ -124,12 +259,12 @@ static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uin resInfo->result = CA_RETRANSMIT_TIMEOUT; resInfo->info.type = CAGetMessageTypeFromPduBinaryData(pdu, size); resInfo->info.messageId = CAGetMessageIdFromPduBinaryData(pdu, size); + CAResult_t res = CAGetTokenFromPDU((const coap_hdr_t *) pdu, &(resInfo->info)); if (CA_STATUS_OK != res) { OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list"); - OICFree(resInfo->info.token); - OICFree(resInfo); + CADestroyResponseInfoInternal(resInfo); CAFreeEndpoint(ep); return; } @@ -139,7 +274,7 @@ static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uin { OIC_LOG(ERROR, TAG, "memory allocation failed !"); CAFreeEndpoint(ep); - OICFree(resInfo); + CADestroyResponseInfoInternal(resInfo); return; } @@ -148,13 +283,18 @@ static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uin cadata->requestInfo = NULL; cadata->responseInfo = resInfo; +#ifdef SINGLE_THREAD + CAProcessReceivedData(cadata); +#else CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); +#endif + OIC_LOG(DEBUG, TAG, "OUT"); } -static void CADataDestroyer(void *data, uint32_t size) +static void CADestroyData(void *data, uint32_t size) { - OIC_LOG(DEBUG, TAG, "IN"); + OIC_LOG(DEBUG, TAG, "CADestroyData IN"); CAData_t *cadata = (CAData_t *) data; if (NULL == cadata) @@ -180,27 +320,19 @@ static void CADataDestroyer(void *data, uint32_t size) if (NULL != cadata->errorInfo) { - CAInfo_t *info = &cadata->errorInfo->info; - OICFree(info->token); - OICFree(info->options); - OICFree(info->payload); - OICFree(info->resourceUri); - OICFree(cadata->errorInfo); + CADestroyErrorInfoInternal(cadata->errorInfo); } + OICFree(cadata->options); OICFree(cadata); - OIC_LOG(DEBUG, TAG, "OUT"); + OIC_LOG(DEBUG, TAG, "CADestroyData OUT"); } -static void CAReceiveThreadProcess(void *threadData) +#ifdef SINGLE_THREAD +static void CAProcessReceivedData(CAData_t *data) { - OIC_LOG(DEBUG, TAG, "IN"); - // Currently not supported - // This will be enabled when RI supports multi threading -#ifndef SINGLE_HANDLE - CAData_t *data = (CAData_t *) threadData; - - if (NULL == data) + OIC_LOG(DEBUG, TAG, "CAProcessReceivedData IN"); + if (!data) { OIC_LOG(ERROR, TAG, "thread data error!!"); return; @@ -210,8 +342,7 @@ static void CAReceiveThreadProcess(void *threadData) // #1 parse the data // #2 get endpoint CAEndpoint_t *rep = (CAEndpoint_t *)(data->remoteEndpoint); - - if (NULL == rep) + if (!rep) { OIC_LOG(ERROR, TAG, "remoteEndpoint error!!"); return; @@ -230,15 +361,30 @@ static void CAReceiveThreadProcess(void *threadData) g_errorHandler(rep, data->errorInfo); } -#endif /* SINGLE_HANDLE */ - OIC_LOG(DEBUG, TAG, "OUT"); +#ifdef SINGLE_THREAD + CADestroyData(data, sizeof(CAData_t)); +#endif + + OIC_LOG(DEBUG, TAG, "CAProcessReceivedData OUT"); } +#endif -static void CASendThreadProcess(void *threadData) +#ifndef SINGLE_THREAD + +static void CAReceiveThreadProcess(void *threadData) { OIC_LOG(DEBUG, TAG, "IN"); +#ifndef SINGLE_HANDLE CAData_t *data = (CAData_t *) threadData; + CAProcessReceivedData(data); +#endif + OIC_LOG(DEBUG, TAG, "OUT"); +} +#endif +static void CAProcessSendData(const CAData_t *data) +{ + OIC_LOG(DEBUG, TAG, "IN"); VERIFY_NON_NULL_VOID(data, TAG, "data"); VERIFY_NON_NULL_VOID(data->remoteEndpoint, TAG, "remoteEndpoint"); @@ -251,21 +397,55 @@ static void CASendThreadProcess(void *threadData) if (SEND_TYPE_UNICAST == type) { + OIC_LOG(DEBUG,TAG,"Unicast message"); if (NULL != data->requestInfo) { OIC_LOG(DEBUG, TAG, "requestInfo is available.."); pdu = CAGeneratePDU(data->requestInfo->method, &data->requestInfo->info); + +#ifdef WITH_BWT + if (CA_ADAPTER_GATT_BTLE != data->remoteEndpoint->adapter) + { + // Blockwise transfer + CAResult_t res = CAAddBlockOption(&pdu, + data->requestInfo->info); + if (CA_STATUS_OK != res) + { + OIC_LOG(INFO, TAG, "to write block option has failed"); + CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res); + coap_delete_pdu(pdu); + return; + } + } +#endif } else if (NULL != data->responseInfo) { OIC_LOG(DEBUG, TAG, "responseInfo is available.."); pdu = CAGeneratePDU(data->responseInfo->result, &data->responseInfo->info); + +#ifdef WITH_BWT + if (CA_ADAPTER_GATT_BTLE != data->remoteEndpoint->adapter) + { + // Blockwise transfer + CAResult_t res = CAAddBlockOption(&pdu, + data->responseInfo->info); + if (CA_STATUS_OK != res) + { + OIC_LOG(INFO, TAG, "to write block option has failed"); + CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res); + coap_delete_pdu(pdu); + return; + } + } +#endif } else { OIC_LOG(DEBUG, TAG, "request info, response info is empty"); + return; } // interface controller function call. @@ -285,199 +465,209 @@ static void CASendThreadProcess(void *threadData) pdu->length); if (CA_STATUS_OK != res) { - OIC_LOG_V(INFO, TAG, "retransmission will be not working: %d", res); + OIC_LOG_V(INFO, TAG, "retransmission is not enabled due to error, res : %d", res); coap_delete_pdu(pdu); return; } coap_delete_pdu(pdu); } + else + { + OIC_LOG(ERROR,TAG,"Failed to generate unicast PDU"); + return; + } } else if (SEND_TYPE_MULTICAST == type) { - OIC_LOG(DEBUG, TAG, "both requestInfo & responseInfo is not available"); - - CAInfo_t *info = &data->requestInfo->info; - - info->options = data->options; - info->numOptions = data->numOptions; - - pdu = CAGeneratePDU(CA_GET, info); - if (NULL != pdu) + OIC_LOG(DEBUG,TAG,"Multicast message"); + if (NULL != data->requestInfo) { - CALogPDUInfo(pdu); + OIC_LOG(DEBUG, TAG, "requestInfo is available.."); + CAInfo_t *info = &data->requestInfo->info; - res = CASendMulticastData(data->remoteEndpoint, pdu->hdr, pdu->length); - if (CA_STATUS_OK != res) + pdu = CAGeneratePDU(CA_GET, info); + if (NULL != pdu) { - OIC_LOG_V(ERROR, TAG, "send failed:%d", res); +#ifdef WITH_BWT + if (CA_ADAPTER_GATT_BTLE != data->remoteEndpoint->adapter) + { + // Blockwise transfer + CAResult_t res = CAAddBlockOption(&pdu, + data->requestInfo->info); + if (CA_STATUS_OK != res) + { + OIC_LOG(DEBUG, TAG, "CAAddBlockOption has failed"); + CAErrorHandler(data->remoteEndpoint, pdu->hdr, pdu->length, res); + coap_delete_pdu(pdu); + return; + } + } +#endif + CALogPDUInfo(pdu); + + res = CASendMulticastData(data->remoteEndpoint, pdu->hdr, pdu->length); + if (CA_STATUS_OK != res) + { + OIC_LOG_V(ERROR, TAG, "send failed:%d", res); + coap_delete_pdu(pdu); + return; + } + coap_delete_pdu(pdu); - return; } - - coap_delete_pdu(pdu); + else + { + OIC_LOG(ERROR,TAG,"Failed to generate multicast PDU"); + } + } + else + { + OIC_LOG(ERROR, TAG, "request info is empty"); } } OIC_LOG(DEBUG, TAG, "OUT"); } -static void CAReceivedPacketCallback(const CAEndpoint_t *endpoint, void *data, uint32_t dataLen) +#ifndef SINGLE_THREAD +static void CASendThreadProcess(void *threadData) { - OIC_LOG(DEBUG, TAG, "IN"); - VERIFY_NON_NULL_VOID(endpoint, TAG, "endpoint"); - VERIFY_NON_NULL_VOID(data, TAG, "data"); + CAData_t *data = (CAData_t *) threadData; + CAProcessSendData(data); +} - uint32_t code = CA_NOT_FOUND; - coap_pdu_t *pdu = (coap_pdu_t *) CAParsePDU((const char *) data, dataLen, &code); - OICFree(data); +#endif - if (NULL == pdu) +/* + * If a second message arrives with the same token and the other address + * family, drop it. Typically, IPv6 beats IPv4, so the IPv4 message is dropped. + * This can be made more robust (for instance, another message could arrive + * in between), but it is good enough for now. + */ +static bool CADropSecondRequest(const CAEndpoint_t *endpoint, uint16_t messageId) +{ + if (!endpoint) { - OIC_LOG(ERROR, TAG, "Parse PDU failed"); - return; + return true; } - - if (CA_GET == code || CA_POST == code || CA_PUT == code || CA_DELETE == code) + if (endpoint->adapter != CA_ADAPTER_IP) { - CARequestInfo_t *ReqInfo = (CARequestInfo_t *) OICCalloc(1, sizeof(CARequestInfo_t)); - if (NULL == ReqInfo) - { - OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, Memory allocation failed!"); - coap_delete_pdu(pdu); - return; - } + return false; + } - CAResult_t res = CAGetRequestInfoFromPDU(pdu, ReqInfo); - if (CA_STATUS_OK != res) - { - OIC_LOG_V(ERROR, TAG, "CAGetRequestInfoFromPDU failed : %d", res); - OICFree(ReqInfo); - coap_delete_pdu(pdu); - return; - } + bool ret = false; + CATransportFlags_t familyFlags = endpoint->flags & CA_IPFAMILY_MASK; - if (NULL != ReqInfo->info.options) + if (messageId == caglobals.ca.previousRequestMessageId) + { + if ((familyFlags ^ caglobals.ca.previousRequestFlags) == CA_IPFAMILY_MASK) { - uint32_t i; - for (i = 0; i < ReqInfo->info.numOptions; i++) + if (familyFlags & CA_IPV6) { - OIC_LOG_V(DEBUG, TAG, "Request- optionID: %d", ReqInfo->info.options[i].optionID); - - OIC_LOG_V(DEBUG, TAG, "Request- list: %s", ReqInfo->info.options[i].optionData); + OIC_LOG(INFO, TAG, "IPv6 duplicate response ignored"); + } + else + { + OIC_LOG(INFO, TAG, "IPv4 duplicate response ignored"); } + ret = true; } + } + caglobals.ca.previousRequestFlags = familyFlags; + caglobals.ca.previousRequestMessageId = messageId; + return ret; +} - OIC_LOG_V(DEBUG, TAG, "Request- code: %d", ReqInfo->method); - if (NULL != ReqInfo->info.token) - { - OIC_LOG(DEBUG, TAG, "Request- token:"); - OIC_LOG_BUFFER(DEBUG, TAG, (const uint8_t *) ReqInfo->info.token, - ReqInfo->info.tokenLength); - } +static void CAReceivedPacketCallback(const CAEndpoint_t *remoteEndpoint, const void *data, uint32_t dataLen) +{ + OIC_LOG(DEBUG, TAG, "IN"); + VERIFY_NON_NULL_VOID(remoteEndpoint, TAG, "remoteEndpoint"); + VERIFY_NON_NULL_VOID(data, TAG, "data"); - OIC_LOG_V(DEBUG, TAG, "Request- msgID : %d", ReqInfo->info.messageId); - // store the data at queue. - CAData_t *cadata = NULL; - cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t)); - if (NULL == cadata) + uint32_t code = CA_NOT_FOUND; + CAData_t *cadata = NULL; + + coap_pdu_t *pdu = (coap_pdu_t *) CAParsePDU((const char *) data, dataLen, &code); + if (NULL == pdu) + { + OIC_LOG(ERROR, TAG, "Parse PDU failed"); + return; + } + + OIC_LOG_V(DEBUG, TAG, "code = %d", code); + if (CA_GET == code || CA_POST == code || CA_PUT == code || CA_DELETE == code) + { + cadata = CAGenerateHandlerData(remoteEndpoint, pdu, CA_REQUEST_DATA); + if (!cadata) { - OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, Memory allocation failed !"); - CADestroyRequestInfoInternal(ReqInfo); + OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, CAGenerateHandlerData failed!"); coap_delete_pdu(pdu); return; } - - cadata->type = SEND_TYPE_UNICAST; - cadata->remoteEndpoint = CACloneEndpoint(endpoint); - cadata->requestInfo = ReqInfo; - cadata->responseInfo = NULL; - CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); } else { - CAResponseInfo_t *ResInfo = (CAResponseInfo_t *) OICCalloc(1, sizeof(CAResponseInfo_t)); - if (NULL == ResInfo) + cadata = CAGenerateHandlerData(remoteEndpoint, pdu, CA_RESPONSE_DATA); + if (!cadata) { - OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, Memory allocation failed!"); + OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, CAGenerateHandlerData failed!"); coap_delete_pdu(pdu); return; } - CAResult_t res = CAGetResponseInfoFromPDU(pdu, ResInfo); - if (CA_STATUS_OK != res) - { - OIC_LOG_V(ERROR, TAG, "CAGetResponseInfoFromPDU failed : %d", res); - OICFree(ResInfo); - coap_delete_pdu(pdu); - return; - } + // for retransmission + void *retransmissionPdu = NULL; + CARetransmissionReceivedData(&g_retransmissionContext, cadata->remoteEndpoint, pdu->hdr, + pdu->length, &retransmissionPdu); - if (NULL != ResInfo->info.options) + // get token from saved data in retransmission list + if (retransmissionPdu && CA_EMPTY == code) { - uint32_t i; - for (i = 0; i < ResInfo->info.numOptions; i++) + if (cadata->responseInfo) { - OIC_LOG_V(DEBUG, TAG, "Response- optionID: %d", ResInfo->info.options[i].optionID); - - OIC_LOG_V(DEBUG, TAG, "Response- list: %s", ResInfo->info.options[i].optionData); + CAInfo_t *info = &cadata->responseInfo->info; + CAResult_t res = CAGetTokenFromPDU((const coap_hdr_t *)retransmissionPdu, + info); + if (CA_STATUS_OK != res) + { + OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list"); + OICFree(info->token); + info->tokenLength = 0; + } } } + OICFree(retransmissionPdu); + } - if (NULL != ResInfo->info.payload) - { - OIC_LOG_V(DEBUG, TAG, "Response- payload: %p(%u) from %s", ResInfo->info.payload, - ResInfo->info.payloadSize, endpoint->addr); - } - OIC_LOG_V(DEBUG, TAG, "Response- code: %d", ResInfo->result); - if (NULL != ResInfo->info.token) - { - OIC_LOG(DEBUG, TAG, "Response- token:"); - OIC_LOG_BUFFER(DEBUG, TAG, (const uint8_t *) ResInfo->info.token, - ResInfo->info.tokenLength); - } - OIC_LOG_V(DEBUG, TAG, "Response- msgID: %d", ResInfo->info.messageId); - - // store the data at queue. - CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t)); - if (NULL == cadata) - { - OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, Memory allocation failed !"); - CADestroyResponseInfoInternal(ResInfo); - coap_delete_pdu(pdu); - return; - } - - cadata->type = SEND_TYPE_UNICAST; - cadata->remoteEndpoint = CACloneEndpoint(endpoint); - cadata->requestInfo = NULL; - - // for retransmission - void *retransmissionPdu = NULL; - CARetransmissionReceivedData(&g_retransmissionContext, endpoint, pdu->hdr, pdu->length, - &retransmissionPdu); + cadata->type = SEND_TYPE_UNICAST; - // get token from saved data in retransmission list - if (retransmissionPdu && CA_EMPTY == code) +#ifdef SINGLE_THREAD + CAProcessReceivedData(cadata); +#else +#ifdef WITH_BWT + if (CA_ADAPTER_GATT_BTLE != remoteEndpoint->adapter) { - CAResult_t res = CAGetTokenFromPDU((const coap_hdr_t *)retransmissionPdu, - &(ResInfo->info)); - if (CA_STATUS_OK != res) + CAResult_t res = CAReceiveBlockWiseData(pdu, remoteEndpoint, cadata, dataLen); + if (CA_NOT_SUPPORTED == res) { - OIC_LOG(ERROR, TAG, "fail to get Token from retransmission list"); - OICFree(ResInfo->info.token); + OIC_LOG(ERROR, TAG, "this message does not have block option"); + CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); + } + else + { + CADestroyData(cadata, sizeof(CAData_t)); } } - OICFree(retransmissionPdu); - cadata->responseInfo = ResInfo; + else +#endif + { + CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); + } +#endif - CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); - } + coap_delete_pdu(pdu); - if (pdu) - { - coap_delete_pdu(pdu); - } OIC_LOG(DEBUG, TAG, "OUT"); } @@ -490,7 +680,10 @@ static void CANetworkChangedCallback(const CAEndpoint_t *info, CANetworkStatus_t void CAHandleRequestResponseCallbacks() { - +#ifdef SINGLE_THREAD + CAReadData(); + CARetransmissionBaseRoutine((void *)&g_retransmissionContext); +#else #ifdef SINGLE_HANDLE // parse the data and call the callbacks. // #1 parse the data @@ -534,136 +727,199 @@ void CAHandleRequestResponseCallbacks() g_errorHandler(td->remoteEndpoint, td->errorInfo); } - CADataDestroyer(msg, sizeof(CAData_t)); + CADestroyData(msg, sizeof(CAData_t)); OICFree(item); +#endif /* SINGLE_HANDLE */ #endif - OIC_LOG(DEBUG, TAG, "CAHandleRequestResponseCallbacks OUT"); } -CAResult_t CADetachRequestMessage(const CAEndpoint_t *object, const CARequestInfo_t *request) +static CAData_t* CAPrepareSendData(const CAEndpoint_t *endpoint, const void *sendData, + CADataType_t dataType) { - OIC_LOG(DEBUG, TAG, "IN"); - - VERIFY_NON_NULL(object, TAG, "object"); - VERIFY_NON_NULL(request, TAG, "request"); + OIC_LOG(DEBUG, TAG, "CAPrepareSendData IN"); + CAInfo_t *info = NULL; - if (false == CAIsSelectedNetworkAvailable()) + CAData_t *cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t)); + if (!cadata) { - return CA_STATUS_FAILED; + OIC_LOG(ERROR, TAG, "memory allocation failed"); + return NULL; } - CAEndpoint_t *remoteEndpoint = NULL; - CARequestInfo_t *requestInfo = NULL; - CAData_t *data = (CAData_t *) OICCalloc(1, sizeof(CAData_t)); - CA_MEMORY_ALLOC_CHECK(data); + if(CA_REQUEST_DATA == dataType) + { + // clone request info + CARequestInfo_t *request = CACloneRequestInfo((CARequestInfo_t *)sendData); + + if(!request) + { + OIC_LOG(ERROR, TAG, "CACloneRequestInfo failed"); + OICFree(cadata); + return NULL; + } - // clone remote endpoint - remoteEndpoint = CACloneEndpoint(object); - CA_MEMORY_ALLOC_CHECK(remoteEndpoint); + cadata->type = request->isMulticast ? SEND_TYPE_MULTICAST : SEND_TYPE_UNICAST; + info = &request->info; + cadata->requestInfo = request; + } + else if(CA_RESPONSE_DATA == dataType) + { + // clone response info + CAResponseInfo_t *response = CACloneResponseInfo((CAResponseInfo_t *)sendData); - // clone request info - requestInfo = CACloneRequestInfo(request); - CA_MEMORY_ALLOC_CHECK(requestInfo); + if(!response) + { + OIC_LOG(ERROR, TAG, "CACloneResponseInfo failed"); + OICFree(cadata); + return NULL; + } - // save data - data->type = request->isMulticast ? SEND_TYPE_MULTICAST : SEND_TYPE_UNICAST; - data->remoteEndpoint = remoteEndpoint; - data->requestInfo = requestInfo; - data->responseInfo = NULL; - data->options = NULL; - data->numOptions = 0; - if (NULL != requestInfo->info.options && 0 < requestInfo->info.numOptions) + cadata->type = SEND_TYPE_UNICAST; + info = &response->info; + cadata->responseInfo = response; + } + + if (NULL != info->options && 0 < info->numOptions) { - uint8_t numOptions = requestInfo->info.numOptions; + uint8_t numOptions = info->numOptions; // copy data CAHeaderOption_t *headerOption = (CAHeaderOption_t *) OICMalloc(sizeof(CAHeaderOption_t) * numOptions); - CA_MEMORY_ALLOC_CHECK(headerOption); + if(!headerOption) + { + OIC_LOG(ERROR, TAG, "memory allocation failed"); + CADestroyData(cadata, sizeof(CAData_t)); + return NULL; + } - memcpy(headerOption, requestInfo->info.options, sizeof(CAHeaderOption_t) * numOptions); + memcpy(headerOption, info->options, sizeof(CAHeaderOption_t) * numOptions); - data->options = headerOption; - data->numOptions = numOptions; + cadata->options = headerOption; + cadata->numOptions = numOptions; } - // add thread - CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t)); - OIC_LOG(DEBUG, TAG, "OUT"); - return CA_STATUS_OK; - -// memory error label. -memory_error_exit: - CAFreeEndpoint(remoteEndpoint); - CADestroyRequestInfoInternal(requestInfo); + CAEndpoint_t* ep = CACloneEndpoint(endpoint); + if (!ep) + { + OIC_LOG(ERROR, TAG, "endpoint clone failed"); + CADestroyData(cadata, sizeof(CAData_t)); + return NULL; + } - OICFree(data); - OIC_LOG(DEBUG, TAG, "OUT"); - return CA_MEMORY_ALLOC_FAILED; + cadata->remoteEndpoint = ep; + return cadata; } -CAResult_t CADetachResponseMessage(const CAEndpoint_t *object, - const CAResponseInfo_t *response) +CAResult_t CADetachRequestMessage(const CAEndpoint_t *object, const CARequestInfo_t *request) { OIC_LOG(DEBUG, TAG, "IN"); + VERIFY_NON_NULL(object, TAG, "object"); - VERIFY_NON_NULL(response, TAG, "response"); + VERIFY_NON_NULL(request, TAG, "request"); if (false == CAIsSelectedNetworkAvailable()) { return CA_STATUS_FAILED; } - CAEndpoint_t *remoteEndpoint = NULL; - CAResponseInfo_t *responseInfo = NULL; +#ifdef ARDUINO + // If max retransmission queue is reached, then don't handle new request + if (CA_MAX_RT_ARRAY_SIZE == u_arraylist_length(g_retransmissionContext.dataList)) + { + OIC_LOG(ERROR, TAG, "max RT queue size reached!"); + return CA_SEND_FAILED; + } +#endif /* ARDUINO */ - // allocate & initialize - CAData_t *data = (CAData_t *) OICCalloc(1, sizeof(CAData_t)); - CA_MEMORY_ALLOC_CHECK(data); + CAData_t *data = CAPrepareSendData(object, request, CA_REQUEST_DATA); + if(!data) + { + OIC_LOG(ERROR, TAG, "CAPrepareSendData failed"); + return CA_MEMORY_ALLOC_FAILED; + } - // clone remote endpoint - remoteEndpoint = CACloneEndpoint(object); - CA_MEMORY_ALLOC_CHECK(remoteEndpoint); +#ifdef SINGLE_THREAD + CAProcessSendData(data); + CADestroyData(data, sizeof(CAData_t)); +#else +#ifdef WITH_BWT + if (CA_ADAPTER_GATT_BTLE != object->adapter) + { + // send block data + CAResult_t res = CASendBlockWiseData(data); + if(CA_NOT_SUPPORTED == res) + { + OIC_LOG(DEBUG, TAG, "normal msg will be sent"); + CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t)); + return CA_STATUS_OK; + } + else + { + CADestroyData(data, sizeof(CAData_t)); + } + return res; + } + else +#endif + { + CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t)); + } +#endif - // clone response info - responseInfo = CACloneResponseInfo(response); - CA_MEMORY_ALLOC_CHECK(responseInfo); + OIC_LOG(DEBUG, TAG, "OUT"); + return CA_STATUS_OK; +} - // save data - data->type = SEND_TYPE_UNICAST; - data->remoteEndpoint = remoteEndpoint; - data->requestInfo = NULL; - data->responseInfo = responseInfo; - data->options = NULL; - data->numOptions = 0; - if (NULL != responseInfo->info.options && 0 < responseInfo->info.numOptions) - { - uint8_t numOptions = responseInfo->info.numOptions; - // copy data - CAHeaderOption_t *headerOption = (CAHeaderOption_t *) OICMalloc(sizeof(CAHeaderOption_t) - * numOptions); - CA_MEMORY_ALLOC_CHECK(headerOption); +CAResult_t CADetachResponseMessage(const CAEndpoint_t *object, + const CAResponseInfo_t *response) +{ + OIC_LOG(DEBUG, TAG, "IN"); + VERIFY_NON_NULL(object, TAG, "object"); + VERIFY_NON_NULL(response, TAG, "response"); - memcpy(headerOption, responseInfo->info.options, sizeof(CAHeaderOption_t) * numOptions); + if (false == CAIsSelectedNetworkAvailable()) + { + return CA_STATUS_FAILED; + } - data->options = headerOption; - data->numOptions = numOptions; + CAData_t *data = CAPrepareSendData(object, response, CA_RESPONSE_DATA); + if(!data) + { + OIC_LOG(ERROR, TAG, "CAPrepareSendData failed"); + return CA_MEMORY_ALLOC_FAILED; } - // add thread - CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t)); +#ifdef SINGLE_THREAD + CAProcessSendData(data); + CADestroyData(data, sizeof(CAData_t)); +#else +#ifdef WITH_BWT + if (CA_ADAPTER_GATT_BTLE != object->adapter) + { + // send block data + CAResult_t res = CASendBlockWiseData(data); + if(CA_NOT_SUPPORTED == res) + { + OIC_LOG(DEBUG, TAG, "normal msg will be sent"); + CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t)); + return CA_STATUS_OK; + } + else + { + CADestroyData(data, sizeof(CAData_t)); + } + return res; + } + else +#endif + { + CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t)); + } +#endif OIC_LOG(DEBUG, TAG, "OUT"); return CA_STATUS_OK; - -// memory error label. -memory_error_exit: - CAFreeEndpoint(remoteEndpoint); - CADestroyResponseInfoInternal(responseInfo); - OICFree(data); - OIC_LOG(DEBUG, TAG, "OUT"); - - return CA_MEMORY_ALLOC_FAILED; } CAResult_t CADetachMessageResourceUri(const CAURI_t resourceUri, const CAToken_t token, @@ -674,12 +930,12 @@ CAResult_t CADetachMessageResourceUri(const CAURI_t resourceUri, const CAToken_t } void CASetInterfaceCallbacks(CARequestCallback ReqHandler, CAResponseCallback RespHandler, - CAErrorCallback errroHandler) + CAErrorCallback errorHandler) { OIC_LOG(DEBUG, TAG, "IN"); g_requestHandler = ReqHandler; g_responseHandler = RespHandler; - g_errorHandler = errroHandler; + g_errorHandler = errorHandler; OIC_LOG(DEBUG, TAG, "OUT"); } @@ -691,10 +947,11 @@ CAResult_t CAInitializeMessageHandler() CASetNetworkChangeCallback(CANetworkChangedCallback); CASetErrorHandleCallback(CAErrorHandler); +#ifndef SINGLE_THREAD // create thread pool CAResult_t res = ca_thread_pool_init(MAX_THREAD_POOL_SIZE, &g_threadPoolHandle); - if (res != CA_STATUS_OK) + if (CA_STATUS_OK != res) { OIC_LOG(ERROR, TAG, "thread pool initialize error."); return res; @@ -702,7 +959,7 @@ CAResult_t CAInitializeMessageHandler() // send thread initialize if (CA_STATUS_OK != CAQueueingThreadInitialize(&g_sendThread, g_threadPoolHandle, - CASendThreadProcess, CADataDestroyer)) + CASendThreadProcess, CADestroyData)) { OIC_LOG(ERROR, TAG, "Failed to Initialize send queue thread"); return CA_STATUS_FAILED; @@ -711,7 +968,7 @@ CAResult_t CAInitializeMessageHandler() // start send thread res = CAQueueingThreadStart(&g_sendThread); - if (res != CA_STATUS_OK) + if (CA_STATUS_OK != res) { OIC_LOG(ERROR, TAG, "thread start error(send thread)."); ca_thread_pool_free(g_threadPoolHandle); @@ -721,7 +978,7 @@ CAResult_t CAInitializeMessageHandler() // receive thread initialize if (CA_STATUS_OK != CAQueueingThreadInitialize(&g_receiveThread, g_threadPoolHandle, - CAReceiveThreadProcess, CADataDestroyer)) + CAReceiveThreadProcess, CADestroyData)) { OIC_LOG(ERROR, TAG, "Failed to Initialize receive queue thread"); return CA_STATUS_FAILED; @@ -736,16 +993,21 @@ CAResult_t CAInitializeMessageHandler() OIC_LOG(ERROR, TAG, "thread start error(receive thread)."); return res; } -#endif +#endif /* SINGLE_HANDLE */ // retransmission initialize CARetransmissionInitialize(&g_retransmissionContext, g_threadPoolHandle, CASendUnicastData, CATimeoutCallback, NULL); +#ifdef WITH_BWT + // block-wise transfer initialize + CAInitializeBlockWiseTransfer(CAAddDataToSendThread, CAAddDataToReceiveThread); +#endif + // start retransmission res = CARetransmissionStart(&g_retransmissionContext); - if (res != CA_STATUS_OK) + if (CA_STATUS_OK != res) { OIC_LOG(ERROR, TAG, "thread start error(retransmission thread)."); return res; @@ -753,6 +1015,13 @@ CAResult_t CAInitializeMessageHandler() // initialize interface adapters by controller CAInitializeAdapters(g_threadPoolHandle); +#else + // retransmission initialize + CARetransmissionInitialize(&g_retransmissionContext, NULL, CASendUnicastData, + CATimeoutCallback, NULL); + CAInitializeAdapters(); +#endif + OIC_LOG(DEBUG, TAG, "OUT"); return CA_STATUS_OK; } @@ -760,6 +1029,7 @@ CAResult_t CAInitializeMessageHandler() void CATerminateMessageHandler() { OIC_LOG(DEBUG, TAG, "IN"); +#ifndef SINGLE_THREAD CATransportAdapter_t connType; u_arraylist_t *list = CAGetSelectedNetworkList(); uint32_t length = u_arraylist_length(list); @@ -797,7 +1067,7 @@ void CATerminateMessageHandler() { #ifndef SINGLE_HANDLE // This will be enabled when RI supports multi threading CAQueueingThreadStop(&gReceiveThread); -#endif +#endif /* SINGLE_HANDLE */ } // destroy thread pool @@ -807,12 +1077,23 @@ void CATerminateMessageHandler() g_threadPoolHandle = NULL; } +#ifdef WITH_BWT + CATerminateBlockWiseTransfer(); +#endif CARetransmissionDestroy(&g_retransmissionContext); CAQueueingThreadDestroy(&g_sendThread); CAQueueingThreadDestroy(&g_receiveThread); // terminate interface adapters by controller CATerminateAdapters(); +#else + // terminate interface adapters by controller + CATerminateAdapters(); + + // stop retransmission + CARetransmissionStop(&g_retransmissionContext); + CARetransmissionDestroy(&g_retransmissionContext); +#endif OIC_LOG(DEBUG, TAG, "OUT"); } @@ -827,18 +1108,53 @@ void CALogPDUInfo(coap_pdu_t *pdu) OIC_LOG_V(DEBUG, TAG, "PDU Maker - code : %d", pdu->hdr->code); - OIC_LOG_V(DEBUG, TAG, "PDU Maker - id : %d", ntohs(pdu->hdr->id)); - OIC_LOG(DEBUG, TAG, "PDU Maker - token :"); OIC_LOG_BUFFER(DEBUG, TAG, pdu->hdr->token, pdu->hdr->token_length); } +static void CALogPayloadInfo(CAInfo_t *info) +{ + if(info) + { + if (!info->options) + { + for (uint32_t i = 0; i < info->numOptions; i++) + { + OIC_LOG_V(DEBUG, TAG, "optionID: %d", info->options[i].optionID); + + OIC_LOG_V(DEBUG, TAG, "list: %s", info->options[i].optionData); + } + } + + if (!info->payload) + { + OIC_LOG_V(DEBUG, TAG, "payload: %p(%u)", info->payload, + info->payloadSize); + } + + if (!info->token) + { + OIC_LOG(DEBUG, TAG, "token:"); + OIC_LOG_BUFFER(DEBUG, TAG, (const uint8_t *) info->token, + info->tokenLength); + } + OIC_LOG_V(DEBUG, TAG, "msgID: %d", info->messageId); + } + else + { + OIC_LOG(DEBUG, TAG, "info is NULL, cannot output log data"); + } +} + void CAErrorHandler(const CAEndpoint_t *endpoint, const void *data, uint32_t dataLen, CAResult_t result) { OIC_LOG(DEBUG, TAG, "IN"); + +#ifndef SINGLE_THREAD + VERIFY_NON_NULL_VOID(endpoint, TAG, "remoteEndpoint"); VERIFY_NON_NULL_VOID(data, TAG, "data"); @@ -852,65 +1168,20 @@ void CAErrorHandler(const CAEndpoint_t *endpoint, return; } - CAErrorInfo_t *errorInfo = (CAErrorInfo_t *)OICCalloc(1, sizeof (CAErrorInfo_t)); - if (NULL == errorInfo) + CAData_t *cadata = CAGenerateHandlerData(endpoint, pdu, CA_ERROR_DATA); + if(!cadata) { - OIC_LOG(ERROR, TAG, "CAErrorHandler, Memory allocation failed!"); + OIC_LOG(ERROR, TAG, "CAErrorHandler, CAGenerateHandlerData failed!"); coap_delete_pdu(pdu); return; } - CAResult_t res = CAGetErrorInfoFromPDU(pdu, errorInfo); - if (CA_STATUS_OK != res) - { - OIC_LOG_V(ERROR, TAG, "CAGetErrorInfoFromPDU failed : %d", res); - OICFree(errorInfo); - coap_delete_pdu(pdu); - return; - } - - errorInfo->result = result; - OIC_LOG_V(DEBUG, TAG, "error : %d", result); - if (NULL != errorInfo->info.payload) - { - OIC_LOG_V(DEBUG, TAG, "error, payload: %s", errorInfo->info.payload); - } - - OIC_LOG(DEBUG, TAG, "error, token"); - OIC_LOG_BUFFER(DEBUG, TAG, (const uint8_t *) errorInfo->info.token, - errorInfo->info.tokenLength); - OIC_LOG_V(DEBUG, TAG, "CAErrorHandler, msgID : %d", errorInfo->info.messageId); - - CAEndpoint_t *rep = NULL; - rep = CACloneEndpoint(endpoint); - if (!rep) - { - OIC_LOG(ERROR, TAG, "CAErrorHandler, CloneEndpoint Failed"); - OICFree(errorInfo); - coap_delete_pdu(pdu); - return; - } - - // store the data at queue. - CAData_t *cadata = NULL; - cadata = (CAData_t *) OICCalloc(1, sizeof(CAData_t)); - if (NULL == cadata) - { - OIC_LOG(ERROR, TAG, "CAReceivedPacketCallback, Memory allocation failed !"); - CAFreeEndpoint(rep); - OICFree(errorInfo); - coap_delete_pdu(pdu); - return; - } - - cadata->remoteEndpoint = rep; - cadata->requestInfo = NULL; - cadata->responseInfo = NULL; - cadata->errorInfo = errorInfo; - cadata->dataType = CA_ERROR_DATA; + cadata->errorInfo->result = result; CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); coap_delete_pdu(pdu); +#endif + OIC_LOG(DEBUG, TAG, "OUT"); return; }