From 32070f719223472b512c3cc3eefd959ca3b27d1d Mon Sep 17 00:00:00 2001 From: "sandipan.p" Date: Fri, 24 Jul 2015 09:41:52 +0530 Subject: [PATCH] BLE De-fragmentation changes and optimization for receiver handler Change-Id: If2217589eb7538018fa9e4e79ee55cd3bb52aed8 Signed-off-by: sandipan.p Reviewed-on: https://gerrit.iotivity.org/gerrit/1410 Tested-by: jenkins-iotivity Reviewed-by: Abhishek Sharma Reviewed-by: Erich Keane --- resource/csdk/connectivity/inc/caleadapter.h | 66 ++- .../connectivity/src/bt_le_adapter/caleadapter.c | 494 ++++++++++----------- 2 files changed, 256 insertions(+), 304 deletions(-) diff --git a/resource/csdk/connectivity/inc/caleadapter.h b/resource/csdk/connectivity/inc/caleadapter.h index be13c16..0d2ff9b 100644 --- a/resource/csdk/connectivity/inc/caleadapter.h +++ b/resource/csdk/connectivity/inc/caleadapter.h @@ -50,6 +50,18 @@ typedef struct } CALEData_t; /** + * Stores information of all the senders. + * This structure will be used to track and defragment all incoming data packets. + */ +typedef struct +{ + uint32_t recvDataLen; + uint32_t totalDataLen; + char *defragData; + CAEndpoint_t *remoteEndpoint; +}CABLESenderInfo_t; + +/** * Initialize LE connectivity interface. * @param[in] registerCallback Callback to register LE interfaces to * Connectivity Abstraction Layer. @@ -262,28 +274,14 @@ void CALEServerSendDataThread(void *threadData); void CALEClientSendDataThread(void *threadData); /** - * This function will be associated with the receiver queue of GattServer. - * This function will defragment the data received and will send the data - * UP to the CA layer only after it collects all the data from the adapter - * layer. Adapter Header will provide the length of the data sent from the - * server. + * This function will be associated with the receiver queue. This function will defragment + * the received data from each sender respectively and will send it up to CA layer. + * Respective sender's header will provide the length of the data sent. * - * @param[in] threadData Data pushed to the queue which contains the info - * about RemoteEndpoint and Data. + * @param [IN] threadData Data pushed to the queue which contains the info about RemoteEndpoint + * and Data. */ -void CALEServerDataReceiverHandler(void *threadData); - -/** - * This function will be associated with the receiver queue of GattClient. - * This function will defragment the data received and will send the data - * UP to the CA layer only after it collects all the data from the adapter - * layer. Adapter Header will provide the length of the data sent from the - * server. - * - * @param[in] threadData Data pushed to the queue which contains the info - * about RemoteEndpoint and Data. - */ -void CALEClientDataReceiverHandler(void *threadData); +void CALEDataReceiverHandler(void *threadData); /** * This function is used to Initalize both GattServer and GattClient @@ -355,29 +353,17 @@ CAResult_t CAInitLEServerSenderQueue(); CAResult_t CAInitLEClientSenderQueue(); /** - * This function will initalize the Receiver queue for GattServer. This - * will initialize the queue to process the function - * CABLEServerDataReceiverHandler() when ever the task is added to this queue. + * This function will initalize the Receiver queue for LEAdapter. This will initialize + * the queue to process the function CABLEDataReceiverHandler() when ever the task + * is added to this queue. * - * @return ::CA_STATUS_OK or Appropriate error code. - * @retval ::CA_STATUS_OK Successful. - * @retval ::CA_STATUS_INVALID_PARAM Invalid input argumets. - * @retval ::CA_STATUS_FAILED Operation failed. - * - */ -CAResult_t CAInitLEServerReceiverQueue(); - -/** - * This function will initalize the Receiver queue for GattClient. This - * will initialize the queue to process the function - * CABLEClientDataReceiverHandler() when ever the task is added to this queue. + * @return ::CA_STATUS_OK or Appropriate error code + * @retval ::CA_STATUS_OK Successful + * @retval ::CA_STATUS_INVALID_PARAM Invalid input argumets + * @retval ::CA_STATUS_FAILED Operation failed * - * @return ::CA_STATUS_OK or Appropriate error code. - * @retval ::CA_STATUS_OK Successful. - * @retval ::CA_STATUS_INVALID_PARAM Invalid input argumets. - * @retval ::CA_STATUS_FAILED Operation failed. */ -CAResult_t CAInitLEClientReceiverQueue(); +CAResult_t CAInitLEReceiverQueue(); /** * This function will create the Data required to send it in the queue. diff --git a/resource/csdk/connectivity/src/bt_le_adapter/caleadapter.c b/resource/csdk/connectivity/src/bt_le_adapter/caleadapter.c index ebf1f10..92e51ff 100644 --- a/resource/csdk/connectivity/src/bt_le_adapter/caleadapter.c +++ b/resource/csdk/connectivity/src/bt_le_adapter/caleadapter.c @@ -203,6 +203,11 @@ static void CALEErrorHandler(const char *remoteAddress, const void *data, uint32 static bool g_dataReceiverHandlerState = false; /** + * Sender informations to be stored here + */ +static u_arraylist_t *g_senderInfo = NULL; + +/** * Queue to process the outgoing packets from GATTClient. */ static CAQueueingThread_t *g_bleClientSendQueueHandle = NULL; @@ -210,7 +215,7 @@ static CAQueueingThread_t *g_bleClientSendQueueHandle = NULL; /** * Queue to process the incoming packets to GATT Client. */ -static CAQueueingThread_t *g_bleClientReceiverQueue = NULL; +static CAQueueingThread_t *g_bleReceiverQueue = NULL; /** * Queue to process the outgoing packets from GATTServer. @@ -218,9 +223,9 @@ static CAQueueingThread_t *g_bleClientReceiverQueue = NULL; static CAQueueingThread_t *g_bleServerSendQueueHandle = NULL; /** - * Queue to process the incoming packets to GATTServer. + * Mutex to synchronize the incoming data packets to receiver */ -static CAQueueingThread_t *g_bleServerReceiverQueue = NULL; +static ca_mutex g_bleReceiveDataMutex = NULL; /** * Used to free data. @@ -263,7 +268,7 @@ CAResult_t CAInitLEServerQueues() return CA_STATUS_FAILED; } - result = CAInitLEServerReceiverQueue(); + result = CAInitLEReceiverQueue(); if (CA_STATUS_OK != result) { OIC_LOG(ERROR, CALEADAPTER_TAG, "CAInitBleServerReceiverQueue failed"); @@ -293,7 +298,7 @@ CAResult_t CAInitLEClientQueues() return CA_STATUS_FAILED; } - result = CAInitLEClientReceiverQueue(); + result = CAInitLEReceiverQueue(); if (CA_STATUS_OK != result) { OIC_LOG(ERROR, CALEADAPTER_TAG, "CAInitBleClientReceiverQueue failed"); @@ -309,39 +314,49 @@ CAResult_t CAInitLEClientQueues() return CA_STATUS_OK; } -CAResult_t CAInitLEServerSenderQueue() +CAResult_t CAInitLEReceiverQueue() { OIC_LOG(DEBUG, CALEADAPTER_TAG, "IN"); // Check if the message queue is already initialized - if (g_bleServerSendQueueHandle) + if (g_bleReceiverQueue) { - OIC_LOG(DEBUG, CALEADAPTER_TAG, "Queue is already initialized!"); + OIC_LOG(DEBUG, CALEADAPTER_TAG, "Already queue is initialized!"); return CA_STATUS_OK; } - // Create send message queue - g_bleServerSendQueueHandle = (CAQueueingThread_t *) OICMalloc(sizeof(CAQueueingThread_t)); - if (!g_bleServerSendQueueHandle) + // Create recv message queue + g_bleReceiverQueue = (CAQueueingThread_t *) OICMalloc(sizeof(CAQueueingThread_t)); + if (!g_bleReceiverQueue) { OIC_LOG(ERROR, CALEADAPTER_TAG, "Memory allocation failed!"); return CA_MEMORY_ALLOC_FAILED; } - if (CA_STATUS_OK != CAQueueingThreadInitialize(g_bleServerSendQueueHandle, - g_bleAdapterThreadPool, - CALEServerSendDataThread, CALEDataDestroyer)) + g_senderInfo = u_arraylist_create(); + if (!g_senderInfo) + { + OIC_LOG(ERROR, CALEADAPTER_TAG, "ClientInfo memory allcation failed!"); + OICFree(g_bleReceiverQueue); + g_bleReceiverQueue = NULL; + return CA_MEMORY_ALLOC_FAILED; + } + + if (CA_STATUS_OK != CAQueueingThreadInitialize(g_bleReceiverQueue, g_bleAdapterThreadPool, + CALEDataReceiverHandler, CALEDataDestroyer)) { OIC_LOG(ERROR, CALEADAPTER_TAG, "Failed to Initialize send queue thread"); - OICFree(g_bleServerSendQueueHandle); - g_bleServerSendQueueHandle = NULL; + OICFree(g_bleReceiverQueue); + g_bleReceiverQueue = NULL; + u_arraylist_free(&g_senderInfo); return CA_STATUS_FAILED; } - if (CA_STATUS_OK != CAQueueingThreadStart(g_bleServerSendQueueHandle)) + if (CA_STATUS_OK != CAQueueingThreadStart(g_bleReceiverQueue)) { OIC_LOG(ERROR, CALEADAPTER_TAG, "ca_thread_pool_add_task failed "); - OICFree(g_bleServerSendQueueHandle); - g_bleServerSendQueueHandle = NULL; + OICFree(g_bleReceiverQueue); + g_bleReceiverQueue = NULL; + u_arraylist_free(&g_senderInfo); return CA_STATUS_FAILED; } @@ -349,39 +364,39 @@ CAResult_t CAInitLEServerSenderQueue() return CA_STATUS_OK; } -CAResult_t CAInitLEClientSenderQueue() +CAResult_t CAInitLEServerSenderQueue() { OIC_LOG(DEBUG, CALEADAPTER_TAG, "IN"); - - if (g_bleClientSendQueueHandle) + // Check if the message queue is already initialized + if (g_bleServerSendQueueHandle) { - OIC_LOG(DEBUG, CALEADAPTER_TAG, "Already queue is initialized!"); + OIC_LOG(DEBUG, CALEADAPTER_TAG, "Queue is already initialized!"); return CA_STATUS_OK; } // Create send message queue - g_bleClientSendQueueHandle = (CAQueueingThread_t *) OICMalloc(sizeof(CAQueueingThread_t)); - if (!g_bleClientSendQueueHandle) + g_bleServerSendQueueHandle = (CAQueueingThread_t *) OICMalloc(sizeof(CAQueueingThread_t)); + if (!g_bleServerSendQueueHandle) { OIC_LOG(ERROR, CALEADAPTER_TAG, "Memory allocation failed!"); return CA_MEMORY_ALLOC_FAILED; } - if (CA_STATUS_OK != CAQueueingThreadInitialize(g_bleClientSendQueueHandle, + if (CA_STATUS_OK != CAQueueingThreadInitialize(g_bleServerSendQueueHandle, g_bleAdapterThreadPool, - CALEClientSendDataThread, CALEDataDestroyer)) + CALEServerSendDataThread, CALEDataDestroyer)) { OIC_LOG(ERROR, CALEADAPTER_TAG, "Failed to Initialize send queue thread"); - OICFree(g_bleClientSendQueueHandle); - g_bleClientSendQueueHandle = NULL; + OICFree(g_bleServerSendQueueHandle); + g_bleServerSendQueueHandle = NULL; return CA_STATUS_FAILED; } - if (CA_STATUS_OK != CAQueueingThreadStart(g_bleClientSendQueueHandle)) + if (CA_STATUS_OK != CAQueueingThreadStart(g_bleServerSendQueueHandle)) { OIC_LOG(ERROR, CALEADAPTER_TAG, "ca_thread_pool_add_task failed "); - OICFree(g_bleClientSendQueueHandle); - g_bleClientSendQueueHandle = NULL; + OICFree(g_bleServerSendQueueHandle); + g_bleServerSendQueueHandle = NULL; return CA_STATUS_FAILED; } @@ -389,82 +404,61 @@ CAResult_t CAInitLEClientSenderQueue() return CA_STATUS_OK; } -CAResult_t CAInitLEServerReceiverQueue() +void CALEClearSenderInfo() { OIC_LOG(DEBUG, CALEADAPTER_TAG, "IN"); - // Check if the message queue is already initialized - if (g_bleServerReceiverQueue) - { - OIC_LOG(DEBUG, CALEADAPTER_TAG, "Already queue is initialized!"); - return CA_STATUS_OK; - } - // Create send message queue - g_bleServerReceiverQueue = (CAQueueingThread_t *) OICMalloc(sizeof(CAQueueingThread_t)); - if (!g_bleServerReceiverQueue) + uint32_t listIndex = 0; + uint32_t listLength = u_arraylist_length(g_senderInfo); + for (listIndex = 0; listIndex < listLength; listIndex++) { - OIC_LOG(ERROR, CALEADAPTER_TAG, "Memory allocation failed!"); - OICFree(g_bleServerSendQueueHandle); - return CA_MEMORY_ALLOC_FAILED; - } - - if (CA_STATUS_OK != CAQueueingThreadInitialize(g_bleServerReceiverQueue, g_bleAdapterThreadPool, - CALEServerDataReceiverHandler, CALEDataDestroyer)) - { - OIC_LOG(ERROR, CALEADAPTER_TAG, "Failed to Initialize send queue thread"); - OICFree(g_bleServerReceiverQueue); - g_bleServerReceiverQueue = NULL; - return CA_STATUS_FAILED; - } + CABLESenderInfo_t *info = (CABLESenderInfo_t *) u_arraylist_get(g_senderInfo, listIndex); + if(!info) + { + continue; + } - if (CA_STATUS_OK != CAQueueingThreadStart(g_bleServerReceiverQueue)) - { - OIC_LOG(ERROR, CALEADAPTER_TAG, "ca_thread_pool_add_task failed "); - OICFree(g_bleServerReceiverQueue); - g_bleServerReceiverQueue = NULL; - return CA_STATUS_FAILED; + OICFree(info->defragData); + CAFreeEndpoint(info->remoteEndpoint); + OICFree(info); } - + u_arraylist_free(&g_senderInfo); OIC_LOG(DEBUG, CALEADAPTER_TAG, "OUT"); - return CA_STATUS_OK; } -CAResult_t CAInitLEClientReceiverQueue() +CAResult_t CAInitLEClientSenderQueue() { OIC_LOG(DEBUG, CALEADAPTER_TAG, "IN"); - // Check if the message queue is already initialized - if (g_bleClientReceiverQueue) + if (g_bleClientSendQueueHandle) { OIC_LOG(DEBUG, CALEADAPTER_TAG, "Already queue is initialized!"); + return CA_STATUS_OK; } - else + + // Create send message queue + g_bleClientSendQueueHandle = (CAQueueingThread_t *) OICMalloc(sizeof(CAQueueingThread_t)); + if (!g_bleClientSendQueueHandle) { - // Create send message queue - g_bleClientReceiverQueue = (CAQueueingThread_t *) OICMalloc(sizeof(CAQueueingThread_t)); - if (!g_bleClientReceiverQueue) - { - OIC_LOG(ERROR, CALEADAPTER_TAG, "Memory allocation failed!"); - OICFree(g_bleClientSendQueueHandle); - return CA_MEMORY_ALLOC_FAILED; - } + OIC_LOG(ERROR, CALEADAPTER_TAG, "Memory allocation failed!"); + return CA_MEMORY_ALLOC_FAILED; + } - if (CA_STATUS_OK != CAQueueingThreadInitialize(g_bleClientReceiverQueue, - g_bleAdapterThreadPool, - CALEClientDataReceiverHandler, NULL)) - { - OIC_LOG(ERROR, CALEADAPTER_TAG, "Failed to Initialize send queue thread"); - OICFree(g_bleClientSendQueueHandle); - OICFree(g_bleClientReceiverQueue); - g_bleClientReceiverQueue = NULL; - return CA_STATUS_FAILED; - } + if (CA_STATUS_OK != CAQueueingThreadInitialize(g_bleClientSendQueueHandle, + g_bleAdapterThreadPool, + CALEClientSendDataThread, CALEDataDestroyer)) + { + OIC_LOG(ERROR, CALEADAPTER_TAG, "Failed to Initialize send queue thread"); + OICFree(g_bleClientSendQueueHandle); + g_bleClientSendQueueHandle = NULL; + return CA_STATUS_FAILED; } - if (CA_STATUS_OK != CAQueueingThreadStart(g_bleClientReceiverQueue)) + + if (CA_STATUS_OK != CAQueueingThreadStart(g_bleClientSendQueueHandle)) { OIC_LOG(ERROR, CALEADAPTER_TAG, "ca_thread_pool_add_task failed "); - OICFree(g_bleClientReceiverQueue); - g_bleClientReceiverQueue = NULL; + OICFree(g_bleClientSendQueueHandle); + g_bleClientSendQueueHandle = NULL; return CA_STATUS_FAILED; } @@ -483,13 +477,6 @@ void CAStopLEQueues() } ca_mutex_unlock(g_bleClientSendDataMutex); - ca_mutex_lock(g_bleClientReceiveDataMutex); - if (NULL != g_bleClientReceiverQueue) - { - CAQueueingThreadStop(g_bleClientReceiverQueue); - } - ca_mutex_unlock(g_bleClientReceiveDataMutex); - ca_mutex_lock(g_bleServerSendDataMutex); if (NULL != g_bleServerSendQueueHandle) { @@ -497,12 +484,12 @@ void CAStopLEQueues() } ca_mutex_unlock(g_bleServerSendDataMutex); - ca_mutex_lock(g_bleServerReceiveDataMutex); - if (NULL != g_bleServerReceiverQueue) + ca_mutex_lock(g_bleReceiveDataMutex); + if (NULL != g_bleReceiverQueue) { - CAQueueingThreadStop(g_bleServerReceiverQueue); + CAQueueingThreadStop(g_bleReceiverQueue); } - ca_mutex_unlock(g_bleServerReceiveDataMutex); + ca_mutex_unlock(g_bleReceiveDataMutex); OIC_LOG(DEBUG, CALEADAPTER_TAG, "OUT"); } @@ -515,142 +502,54 @@ void CATerminateLEQueues() OICFree(g_bleClientSendQueueHandle); g_bleClientSendQueueHandle = NULL; - - CAQueueingThreadDestroy(g_bleClientReceiverQueue); - OICFree(g_bleClientReceiverQueue); - g_bleClientReceiverQueue = NULL; - - CAQueueingThreadDestroy(g_bleServerSendQueueHandle); OICFree(g_bleServerSendQueueHandle); g_bleServerSendQueueHandle = NULL; + CAQueueingThreadDestroy(g_bleReceiverQueue); + OICFree(g_bleReceiverQueue); + g_bleReceiverQueue = NULL; - CAQueueingThreadDestroy(g_bleServerReceiverQueue); - OICFree(g_bleServerReceiverQueue); - g_bleServerReceiverQueue = NULL; + CALEClearSenderInfo(); OIC_LOG(DEBUG, CALEADAPTER_TAG, "OUT"); } -void CALEServerDataReceiverHandler(void *threadData) +CAResult_t CALEGetSenderInfo(char *leAddress, CABLESenderInfo_t **senderInfo, + uint32_t *senderIndex) { - OIC_LOG(DEBUG, CALEADAPTER_TAG, "IN"); - - static uint32_t recvDataLen = 0; - static uint32_t totalDataLen = 0; - static char *defragData = NULL; - static bool isHeaderAvailable = false; - static CAEndpoint_t *remoteEndpoint = NULL; - - ca_mutex_lock(g_bleServerReceiveDataMutex); + VERIFY_NON_NULL_RET(leAddress, CALEADAPTER_TAG, "Ble-Address in-param NULL", CA_STATUS_FAILED); + VERIFY_NON_NULL_RET(senderIndex, CALEADAPTER_TAG, "Index in-param NULL", CA_STATUS_FAILED); - if (g_dataReceiverHandlerState) + uint32_t listLength = u_arraylist_length(g_senderInfo); + uint32_t addrLength = strlen(leAddress); + for (uint32_t index = 0; index < listLength; index++) { - OIC_LOG(DEBUG, CALEADAPTER_TAG, "checking for DE Fragmentation"); - - CALEData_t *bleData = (CALEData_t *) threadData; - if (!bleData) + CABLESenderInfo_t *info = (CABLESenderInfo_t *) u_arraylist_get(g_senderInfo, index); + if(!info || !(info->remoteEndpoint)) { - OIC_LOG(DEBUG, CALEADAPTER_TAG, "Invalid bleData!"); - ca_mutex_unlock(g_bleServerReceiveDataMutex); - return; + continue; } - OIC_LOG(DEBUG, CALEADAPTER_TAG, "checking for DE Fragmentation"); - - if (!isHeaderAvailable) + if(!strncmp(info->remoteEndpoint->addr, leAddress, addrLength)) { - OIC_LOG(DEBUG, CALEADAPTER_TAG, "Parsing the header"); - totalDataLen = CAParseHeader((char*)bleData->data); - - OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "Total data to be accumulated [%d] bytes", totalDataLen); - OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "data received in the first packet [%d] bytes", bleData->dataLen); - - defragData = (char *) OICCalloc(totalDataLen + 1, sizeof(char)); - if (NULL == defragData) + *senderIndex = index; + if(senderInfo) { - OIC_LOG(ERROR, CALEADAPTER_TAG, "defragData is NULL!"); - ca_mutex_unlock(g_bleServerReceiveDataMutex); - return; + *senderInfo = info; } - - const char *remoteAddress = bleData->remoteEndpoint->addr; - - remoteEndpoint = CACreateEndpointObject(CA_DEFAULT_FLAGS, CA_ADAPTER_GATT_BTLE, - remoteAddress, 0); - - memcpy(defragData + recvDataLen, bleData->data + CA_HEADER_LENGTH, - bleData->dataLen - CA_HEADER_LENGTH); - recvDataLen += bleData->dataLen - CA_HEADER_LENGTH; - isHeaderAvailable = true; - } - else - { - OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "Copying the data of length [%d]", bleData->dataLen); - memcpy(defragData + recvDataLen, bleData->data, bleData->dataLen); - recvDataLen += bleData->dataLen ; - OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "totalDatalength [%d] recveived Datalen [%d]", - totalDataLen, recvDataLen); - } - if (totalDataLen == recvDataLen) - { - ca_mutex_lock(g_bleAdapterReqRespCbMutex); - if (NULL == g_networkPacketReceivedCallback) - { - OIC_LOG(ERROR, CALEADAPTER_TAG, "gReqRespCallback is NULL!"); - OICFree(defragData); - CAFreeEndpoint(remoteEndpoint); - remoteEndpoint = NULL; - defragData = NULL; - ca_mutex_unlock(g_bleAdapterReqRespCbMutex); - ca_mutex_unlock(g_bleServerReceiveDataMutex); - return; - } - OIC_LOG(DEBUG, CALEADAPTER_TAG, "Sending data up !"); - g_networkPacketReceivedCallback(remoteEndpoint, defragData, recvDataLen); - - OICFree(defragData); - CAFreeEndpoint(remoteEndpoint); - - recvDataLen = 0; - totalDataLen = 0; - isHeaderAvailable = false; - remoteEndpoint = NULL; - defragData = NULL; - ca_mutex_unlock(g_bleAdapterReqRespCbMutex); - } - - if (false == g_dataReceiverHandlerState) - { - OIC_LOG(DEBUG, CALEADAPTER_TAG, "GATTClient is terminating. Cleaning up"); - recvDataLen = 0; - totalDataLen = 0; - isHeaderAvailable = false; - OICFree(defragData); - CAFreeEndpoint(remoteEndpoint); - remoteEndpoint = NULL; - defragData = NULL; - ca_mutex_unlock(g_bleServerReceiveDataMutex); - return; + return CA_STATUS_OK; } } - ca_mutex_unlock(g_bleServerReceiveDataMutex); - OIC_LOG(DEBUG, CALEADAPTER_TAG, "OUT"); + + return CA_STATUS_FAILED; } -void CALEClientDataReceiverHandler(void *threadData) +void CALEDataReceiverHandler(void *threadData) { OIC_LOG(DEBUG, CALEADAPTER_TAG, "IN"); - static const char *remoteAddress = NULL; - static uint32_t recvDataLen = 0; - static uint32_t totalDataLen = 0; - static char *defragData = NULL; - static bool isHeaderAvailable = false; - static CAEndpoint_t *remoteEndpoint = NULL; - - ca_mutex_lock(g_bleClientReceiveDataMutex); + ca_mutex_lock(g_bleReceiveDataMutex); if (g_dataReceiverHandlerState) { @@ -659,87 +558,141 @@ void CALEClientDataReceiverHandler(void *threadData) CALEData_t *bleData = (CALEData_t *) threadData; if (!bleData) { - OIC_LOG(DEBUG, CALEADAPTER_TAG, "Invalid wifidata!"); - ca_mutex_unlock(g_bleClientReceiveDataMutex); + OIC_LOG(DEBUG, CALEADAPTER_TAG, "Invalid bleData!"); + ca_mutex_unlock(g_bleReceiveDataMutex); return; } - OIC_LOG(DEBUG, CALEADAPTER_TAG, "checking for DE Fragmentation"); + if(!(bleData->remoteEndpoint)) + { + OIC_LOG(ERROR, CALEADAPTER_TAG, "Client RemoteEndPoint NULL!!"); + ca_mutex_unlock(g_bleReceiveDataMutex); + return; + } - if (!isHeaderAvailable) + CABLESenderInfo_t *senderInfo = NULL; + uint32_t senderIndex = 0; + + if(CA_STATUS_OK != CALEGetSenderInfo(bleData->remoteEndpoint->addr, + &senderInfo, &senderIndex)) { - OIC_LOG(DEBUG, CALEADAPTER_TAG, "Parsing the header"); + OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "This is a new client [%s]", + bleData->remoteEndpoint->addr); + } - totalDataLen = CAParseHeader(bleData->data); - OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "Total data to be accumulated [%d] bytes", - totalDataLen); - OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "Data received in the first packet [%d] bytes", - bleData->dataLen); + if(!senderInfo) + { + CABLESenderInfo_t *newSender = (CABLESenderInfo_t*)OICMalloc(sizeof(CABLESenderInfo_t)); + if(!newSender) + { + OIC_LOG(ERROR, CALEADAPTER_TAG, "Memory allocation failed for new sender"); + ca_mutex_unlock(g_bleReceiveDataMutex); + return; + } + newSender->recvDataLen = 0; + newSender->totalDataLen = 0; + newSender->defragData = NULL; + newSender->remoteEndpoint = NULL; - defragData = (char *) OICMalloc(sizeof(char) * totalDataLen); - if (NULL == defragData) + OIC_LOG(DEBUG, CALEADAPTER_TAG, "Parsing the header"); + newSender->totalDataLen = CAParseHeader((char*)bleData->data); + if(!(newSender->totalDataLen)) { - OIC_LOG(ERROR, CALEADAPTER_TAG, "defragData is NULL!"); - ca_mutex_unlock(g_bleClientReceiveDataMutex); + OIC_LOG(ERROR, CALEADAPTER_TAG, "Total Data Length is parsed as 0!!!"); + OICFree(newSender); + ca_mutex_unlock(g_bleReceiveDataMutex); return; } - remoteAddress = bleData->remoteEndpoint->addr; + OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "Total data to be accumulated [%u] bytes", + newSender->totalDataLen); + OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "data received in the first packet [%u] bytes", + bleData->dataLen); - remoteEndpoint = CACreateEndpointObject(CA_DEFAULT_FLAGS, CA_ADAPTER_GATT_BTLE, - remoteAddress, 0); + newSender->defragData = (char *) OICCalloc(newSender->totalDataLen + 1, sizeof(char)); - memcpy(defragData, bleData->data + CA_HEADER_LENGTH, - bleData->dataLen - CA_HEADER_LENGTH); - recvDataLen += bleData->dataLen - CA_HEADER_LENGTH; - isHeaderAvailable = true; + if (NULL == newSender->defragData) + { + OIC_LOG(ERROR, CALEADAPTER_TAG, "defragData is NULL!"); + OICFree(newSender); + ca_mutex_unlock(g_bleReceiveDataMutex); + return; + } + + const char *remoteAddress = bleData->remoteEndpoint->addr; + newSender->remoteEndpoint = CACreateEndpointObject(CA_DEFAULT_FLAGS, + CA_ADAPTER_GATT_BTLE, remoteAddress, 0); + if (NULL == newSender->remoteEndpoint) + { + OIC_LOG(ERROR, CALEADAPTER_TAG, "remoteEndpoint is NULL!"); + OICFree(newSender->defragData); + OICFree(newSender); + ca_mutex_unlock(g_bleReceiveDataMutex); + return; + } + memcpy(newSender->defragData, bleData->data + CA_HEADER_LENGTH, + bleData->dataLen - CA_HEADER_LENGTH); + newSender->recvDataLen += bleData->dataLen - CA_HEADER_LENGTH; + u_arraylist_add(g_senderInfo,(void *)newSender); + + //Getting newSender index position in g_senderInfo array list + if(CA_STATUS_OK != + CALEGetSenderInfo(newSender->remoteEndpoint->addr, NULL, &senderIndex)) + { + OIC_LOG(ERROR, CALEADAPTER_TAG, "Existing sender index not found!!"); + OICFree(senderInfo->defragData); + OICFree(senderInfo); + ca_mutex_unlock(g_bleReceiveDataMutex); + return; + } + senderInfo = newSender; } else { + if(senderInfo->recvDataLen + bleData->dataLen > senderInfo->totalDataLen) + { + OIC_LOG_V(ERROR, CALEADAPTER_TAG, + "Data Length exceeding error!! Receiving [%d] total length [%d]", + senderInfo->recvDataLen + bleData->dataLen, senderInfo->totalDataLen); + u_arraylist_remove(g_senderInfo, senderIndex); + OICFree(senderInfo->defragData); + OICFree(senderInfo); + ca_mutex_unlock(g_bleReceiveDataMutex); + return; + } OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "Copying the data of length [%d]", bleData->dataLen); - memcpy(defragData + recvDataLen, bleData->data, bleData->dataLen); - recvDataLen += bleData->dataLen ; + memcpy(senderInfo->defragData + senderInfo->recvDataLen, bleData->data, + bleData->dataLen); + senderInfo->recvDataLen += bleData->dataLen ; OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "totalDatalength [%d] recveived Datalen [%d]", - totalDataLen, recvDataLen); + senderInfo->totalDataLen, senderInfo->recvDataLen); } - if (totalDataLen == recvDataLen) + + if (senderInfo->totalDataLen == senderInfo->recvDataLen) { ca_mutex_lock(g_bleAdapterReqRespCbMutex); if (NULL == g_networkPacketReceivedCallback) { OIC_LOG(ERROR, CALEADAPTER_TAG, "gReqRespCallback is NULL!"); - OICFree(defragData); - CAFreeEndpoint(remoteEndpoint); - remoteEndpoint = NULL; - defragData = NULL; + + u_arraylist_remove(g_senderInfo, senderIndex); + OICFree(senderInfo->defragData); + OICFree(senderInfo); ca_mutex_unlock(g_bleAdapterReqRespCbMutex); - ca_mutex_unlock(g_bleClientReceiveDataMutex); + ca_mutex_unlock(g_bleReceiveDataMutex); return; } OIC_LOG(DEBUG, CALEADAPTER_TAG, "Sending data up !"); - g_networkPacketReceivedCallback(remoteEndpoint, defragData, recvDataLen); - recvDataLen = 0; - totalDataLen = 0; - isHeaderAvailable = false; - OICFree(defragData); - CAFreeEndpoint(remoteEndpoint); - remoteEndpoint = NULL; - defragData = NULL; + g_networkPacketReceivedCallback(senderInfo->remoteEndpoint, + senderInfo->defragData, senderInfo->recvDataLen); ca_mutex_unlock(g_bleAdapterReqRespCbMutex); - } - - if (false == g_dataReceiverHandlerState) - { - OIC_LOG(DEBUG, CALEADAPTER_TAG, "GATTClient is terminating. Cleaning up"); - OICFree(defragData); - CAFreeEndpoint(remoteEndpoint); - remoteEndpoint = NULL; - defragData = NULL; - ca_mutex_unlock(g_bleClientReceiveDataMutex); - return; + u_arraylist_remove(g_senderInfo, senderIndex); + senderInfo->remoteEndpoint = NULL; + senderInfo->defragData = NULL; + OICFree(senderInfo); } } - ca_mutex_unlock(g_bleClientReceiveDataMutex); + ca_mutex_unlock(g_bleReceiveDataMutex); OIC_LOG(DEBUG, CALEADAPTER_TAG, "OUT"); } @@ -1218,6 +1171,16 @@ CAResult_t CAInitLEAdapterMutex() } } + if (NULL == g_bleReceiveDataMutex) + { + g_bleReceiveDataMutex = ca_mutex_new(); + if (NULL == g_bleReceiveDataMutex) + { + OIC_LOG(ERROR, CALEADAPTER_TAG, "ca_mutex_new failed"); + return CA_STATUS_FAILED; + } + } + OIC_LOG(DEBUG, CALEADAPTER_TAG, "OUT"); return CA_STATUS_OK; } @@ -1253,6 +1216,9 @@ void CATerminateLEAdapterMutex() ca_mutex_free(g_bleAdapterReqRespCbMutex); g_bleAdapterReqRespCbMutex = NULL; + ca_mutex_free(g_bleReceiveDataMutex); + g_bleReceiveDataMutex = NULL; + OIC_LOG(DEBUG, CALEADAPTER_TAG, "OUT"); } @@ -1834,7 +1800,7 @@ CAResult_t CALEAdapterServerReceivedData(const char *remoteAddress, const char * g_networkPacketReceivedCallback(&endPoint, data, dataLength); } #else - VERIFY_NON_NULL_RET(g_bleServerReceiverQueue, CALEADAPTER_TAG, "g_bleServerReceiverQueue", + VERIFY_NON_NULL_RET(g_bleReceiverQueue, CALEADAPTER_TAG, "g_bleReceiverQueue", CA_STATUS_FAILED); //Add message to data queue @@ -1860,7 +1826,7 @@ CAResult_t CALEAdapterServerReceivedData(const char *remoteAddress, const char * CAFreeEndpoint(remoteEndpoint); // Add message to send queue - CAQueueingThreadAddData(g_bleServerReceiverQueue, bleData, sizeof(CALEData_t)); + CAQueueingThreadAddData(g_bleReceiverQueue, bleData, sizeof(CALEData_t)); *sentLength = dataLength; #endif @@ -1879,7 +1845,7 @@ CAResult_t CALEAdapterClientReceivedData(const char *remoteAddress, const char * VERIFY_NON_NULL(data, CALEADAPTER_TAG, "Data is null"); VERIFY_NON_NULL(sentLength, CALEADAPTER_TAG, "Sent data length holder is null"); #ifndef SINGLE_THREAD - VERIFY_NON_NULL_RET(g_bleClientReceiverQueue, CALEADAPTER_TAG, "g_bleClientReceiverQueue", + VERIFY_NON_NULL_RET(g_bleReceiverQueue, CALEADAPTER_TAG, "g_bleReceiverQueue", CA_STATUS_FAILED); //Add message to data queue @@ -1905,7 +1871,7 @@ CAResult_t CALEAdapterClientReceivedData(const char *remoteAddress, const char * CAFreeEndpoint(remoteEndpoint); // Add message to send queue - CAQueueingThreadAddData(g_bleClientReceiverQueue, bleData, sizeof(CALEData_t)); + CAQueueingThreadAddData(g_bleReceiverQueue, bleData, sizeof(CALEData_t)); *sentLength = dataLength; #endif -- 2.7.4