BLE De-fragmentation changes and optimization for receiver handler
authorsandipan.p <sandipan.p@samsung.com>
Fri, 24 Jul 2015 04:11:52 +0000 (09:41 +0530)
committerErich Keane <erich.keane@intel.com>
Fri, 24 Jul 2015 16:01:30 +0000 (16:01 +0000)
Change-Id: If2217589eb7538018fa9e4e79ee55cd3bb52aed8
Signed-off-by: sandipan.p <sandipan.p@samsung.com>
Reviewed-on: https://gerrit.iotivity.org/gerrit/1410
Tested-by: jenkins-iotivity <jenkins-iotivity@opendaylight.org>
Reviewed-by: Abhishek Sharma <ce.abhishek@samsung.com>
Reviewed-by: Erich Keane <erich.keane@intel.com>
resource/csdk/connectivity/inc/caleadapter.h
resource/csdk/connectivity/src/bt_le_adapter/caleadapter.c

index be13c16..0d2ff9b 100644 (file)
@@ -50,6 +50,18 @@ typedef struct
 } CALEData_t;
 
 /**
+ * Stores information of all the senders.
+ * This structure will be used to track and defragment all incoming data packets.
+ */
+typedef struct
+{
+    uint32_t recvDataLen;
+    uint32_t totalDataLen;
+    char *defragData;
+    CAEndpoint_t *remoteEndpoint;
+}CABLESenderInfo_t;
+
+/**
  * Initialize LE connectivity interface.
  * @param[in]  registerCallback Callback to register LE interfaces to
  *                               Connectivity Abstraction Layer.
@@ -262,28 +274,14 @@ void CALEServerSendDataThread(void *threadData);
 void CALEClientSendDataThread(void *threadData);
 
 /**
- * This function will be associated with the receiver queue of GattServer.
- * This function will defragment the data received and will send the data
- * UP to the CA layer only after it collects all the data from the adapter
- * layer. Adapter Header will provide the length of the data sent from the
- * server.
+ * This function will be associated with the receiver queue. This function will defragment
+ * the received data from each sender respectively and will send it up to CA layer.
+ * Respective sender's header will provide the length of the data sent.
  *
- * @param[in]  threadData Data pushed to the queue which contains the info
- * about RemoteEndpoint and Data.
+ * @param [IN]  threadData Data pushed to the queue which contains the info about RemoteEndpoint
+ * and Data.
  */
-void CALEServerDataReceiverHandler(void *threadData);
-
-/**
- * This function will be associated with the receiver queue of GattClient.
- * This function will defragment the data received and will send the data
- * UP to the CA layer only after it collects all the data from the adapter
- * layer. Adapter Header will provide the length of the data sent from the
- * server.
- *
- * @param[in]  threadData Data pushed to the queue which contains the info
- * about RemoteEndpoint and Data.
- */
-void CALEClientDataReceiverHandler(void *threadData);
+void CALEDataReceiverHandler(void *threadData);
 
 /**
  * This function is used to Initalize both GattServer and GattClient
@@ -355,29 +353,17 @@ CAResult_t CAInitLEServerSenderQueue();
 CAResult_t CAInitLEClientSenderQueue();
 
 /**
- * This function will initalize the Receiver queue for GattServer. This
- * will initialize the queue to process the function
- * CABLEServerDataReceiverHandler() when ever the task is added to this queue.
+ * This function will initalize the Receiver queue for LEAdapter. This will initialize
+ * the queue to process the function CABLEDataReceiverHandler() when ever the task
+ * is added to this queue.
  *
- * @return ::CA_STATUS_OK or Appropriate error code.
- * @retval ::CA_STATUS_OK  Successful.
- * @retval ::CA_STATUS_INVALID_PARAM  Invalid input argumets.
- * @retval ::CA_STATUS_FAILED Operation failed.
- *
- */
-CAResult_t CAInitLEServerReceiverQueue();
-
-/**
- * This function will initalize the Receiver queue for GattClient. This
- * will initialize the queue to process the function
- * CABLEClientDataReceiverHandler() when ever the task is added to this queue.
+ * @return ::CA_STATUS_OK or Appropriate error code
+ * @retval ::CA_STATUS_OK  Successful
+ * @retval ::CA_STATUS_INVALID_PARAM  Invalid input argumets
+ * @retval ::CA_STATUS_FAILED Operation failed
  *
- * @return ::CA_STATUS_OK or Appropriate error code.
- * @retval ::CA_STATUS_OK  Successful.
- * @retval ::CA_STATUS_INVALID_PARAM  Invalid input argumets.
- * @retval ::CA_STATUS_FAILED Operation failed.
  */
-CAResult_t CAInitLEClientReceiverQueue();
+CAResult_t CAInitLEReceiverQueue();
 
 /**
  * This function will create the Data required to send it in the queue.
index ebf1f10..92e51ff 100644 (file)
@@ -203,6 +203,11 @@ static void CALEErrorHandler(const char *remoteAddress, const void *data, uint32
 static bool g_dataReceiverHandlerState = false;
 
 /**
+ * Sender informations to be stored here
+ */
+static u_arraylist_t *g_senderInfo = NULL;
+
+/**
  * Queue to process the outgoing packets from GATTClient.
  */
 static CAQueueingThread_t *g_bleClientSendQueueHandle = NULL;
@@ -210,7 +215,7 @@ static CAQueueingThread_t *g_bleClientSendQueueHandle = NULL;
 /**
  * Queue to process the incoming packets to GATT Client.
  */
-static CAQueueingThread_t *g_bleClientReceiverQueue = NULL;
+static CAQueueingThread_t *g_bleReceiverQueue = NULL;
 
 /**
  * Queue to process the outgoing packets from GATTServer.
@@ -218,9 +223,9 @@ static CAQueueingThread_t *g_bleClientReceiverQueue = NULL;
 static CAQueueingThread_t *g_bleServerSendQueueHandle = NULL;
 
 /**
- * Queue to process the incoming packets to GATTServer.
+ * Mutex to synchronize the incoming data packets to receiver
  */
-static CAQueueingThread_t *g_bleServerReceiverQueue = NULL;
+static ca_mutex g_bleReceiveDataMutex = NULL;
 
 /**
 * Used to free data.
@@ -263,7 +268,7 @@ CAResult_t CAInitLEServerQueues()
         return CA_STATUS_FAILED;
     }
 
-    result = CAInitLEServerReceiverQueue();
+    result = CAInitLEReceiverQueue();
     if (CA_STATUS_OK != result)
     {
         OIC_LOG(ERROR, CALEADAPTER_TAG, "CAInitBleServerReceiverQueue failed");
@@ -293,7 +298,7 @@ CAResult_t CAInitLEClientQueues()
         return CA_STATUS_FAILED;
     }
 
-    result = CAInitLEClientReceiverQueue();
+    result = CAInitLEReceiverQueue();
     if (CA_STATUS_OK != result)
     {
         OIC_LOG(ERROR, CALEADAPTER_TAG, "CAInitBleClientReceiverQueue failed");
@@ -309,39 +314,49 @@ CAResult_t CAInitLEClientQueues()
     return CA_STATUS_OK;
 }
 
-CAResult_t CAInitLEServerSenderQueue()
+CAResult_t CAInitLEReceiverQueue()
 {
     OIC_LOG(DEBUG, CALEADAPTER_TAG, "IN");
     // Check if the message queue is already initialized
-    if (g_bleServerSendQueueHandle)
+    if (g_bleReceiverQueue)
     {
-        OIC_LOG(DEBUG, CALEADAPTER_TAG, "Queue is already initialized!");
+        OIC_LOG(DEBUG, CALEADAPTER_TAG, "Already queue is initialized!");
         return CA_STATUS_OK;
     }
 
-    // Create send message queue
-    g_bleServerSendQueueHandle = (CAQueueingThread_t *) OICMalloc(sizeof(CAQueueingThread_t));
-    if (!g_bleServerSendQueueHandle)
+    // Create recv message queue
+    g_bleReceiverQueue = (CAQueueingThread_t *) OICMalloc(sizeof(CAQueueingThread_t));
+    if (!g_bleReceiverQueue)
     {
         OIC_LOG(ERROR, CALEADAPTER_TAG, "Memory allocation failed!");
         return CA_MEMORY_ALLOC_FAILED;
     }
 
-    if (CA_STATUS_OK != CAQueueingThreadInitialize(g_bleServerSendQueueHandle,
-                                                   g_bleAdapterThreadPool,
-                                                   CALEServerSendDataThread, CALEDataDestroyer))
+    g_senderInfo = u_arraylist_create();
+    if (!g_senderInfo)
+    {
+        OIC_LOG(ERROR, CALEADAPTER_TAG, "ClientInfo memory allcation failed!");
+        OICFree(g_bleReceiverQueue);
+        g_bleReceiverQueue = NULL;
+        return CA_MEMORY_ALLOC_FAILED;
+    }
+
+    if (CA_STATUS_OK != CAQueueingThreadInitialize(g_bleReceiverQueue, g_bleAdapterThreadPool,
+            CALEDataReceiverHandler, CALEDataDestroyer))
     {
         OIC_LOG(ERROR, CALEADAPTER_TAG, "Failed to Initialize send queue thread");
-        OICFree(g_bleServerSendQueueHandle);
-        g_bleServerSendQueueHandle = NULL;
+        OICFree(g_bleReceiverQueue);
+        g_bleReceiverQueue = NULL;
+        u_arraylist_free(&g_senderInfo);
         return CA_STATUS_FAILED;
     }
 
-    if (CA_STATUS_OK != CAQueueingThreadStart(g_bleServerSendQueueHandle))
+    if (CA_STATUS_OK != CAQueueingThreadStart(g_bleReceiverQueue))
     {
         OIC_LOG(ERROR, CALEADAPTER_TAG, "ca_thread_pool_add_task failed ");
-        OICFree(g_bleServerSendQueueHandle);
-        g_bleServerSendQueueHandle = NULL;
+        OICFree(g_bleReceiverQueue);
+        g_bleReceiverQueue = NULL;
+        u_arraylist_free(&g_senderInfo);
         return CA_STATUS_FAILED;
     }
 
@@ -349,39 +364,39 @@ CAResult_t CAInitLEServerSenderQueue()
     return CA_STATUS_OK;
 }
 
-CAResult_t CAInitLEClientSenderQueue()
+CAResult_t CAInitLEServerSenderQueue()
 {
     OIC_LOG(DEBUG, CALEADAPTER_TAG, "IN");
-
-    if (g_bleClientSendQueueHandle)
+    // Check if the message queue is already initialized
+    if (g_bleServerSendQueueHandle)
     {
-        OIC_LOG(DEBUG, CALEADAPTER_TAG, "Already queue is initialized!");
+        OIC_LOG(DEBUG, CALEADAPTER_TAG, "Queue is already initialized!");
         return CA_STATUS_OK;
     }
 
     // Create send message queue
-    g_bleClientSendQueueHandle = (CAQueueingThread_t *) OICMalloc(sizeof(CAQueueingThread_t));
-    if (!g_bleClientSendQueueHandle)
+    g_bleServerSendQueueHandle = (CAQueueingThread_t *) OICMalloc(sizeof(CAQueueingThread_t));
+    if (!g_bleServerSendQueueHandle)
     {
         OIC_LOG(ERROR, CALEADAPTER_TAG, "Memory allocation failed!");
         return CA_MEMORY_ALLOC_FAILED;
     }
 
-    if (CA_STATUS_OK != CAQueueingThreadInitialize(g_bleClientSendQueueHandle,
+    if (CA_STATUS_OK != CAQueueingThreadInitialize(g_bleServerSendQueueHandle,
                                                    g_bleAdapterThreadPool,
-                                                   CALEClientSendDataThread, CALEDataDestroyer))
+                                                   CALEServerSendDataThread, CALEDataDestroyer))
     {
         OIC_LOG(ERROR, CALEADAPTER_TAG, "Failed to Initialize send queue thread");
-        OICFree(g_bleClientSendQueueHandle);
-        g_bleClientSendQueueHandle = NULL;
+        OICFree(g_bleServerSendQueueHandle);
+        g_bleServerSendQueueHandle = NULL;
         return CA_STATUS_FAILED;
     }
 
-    if (CA_STATUS_OK != CAQueueingThreadStart(g_bleClientSendQueueHandle))
+    if (CA_STATUS_OK != CAQueueingThreadStart(g_bleServerSendQueueHandle))
     {
         OIC_LOG(ERROR, CALEADAPTER_TAG, "ca_thread_pool_add_task failed ");
-        OICFree(g_bleClientSendQueueHandle);
-        g_bleClientSendQueueHandle = NULL;
+        OICFree(g_bleServerSendQueueHandle);
+        g_bleServerSendQueueHandle = NULL;
         return CA_STATUS_FAILED;
     }
 
@@ -389,82 +404,61 @@ CAResult_t CAInitLEClientSenderQueue()
     return CA_STATUS_OK;
 }
 
-CAResult_t CAInitLEServerReceiverQueue()
+void CALEClearSenderInfo()
 {
     OIC_LOG(DEBUG, CALEADAPTER_TAG, "IN");
-    // Check if the message queue is already initialized
-    if (g_bleServerReceiverQueue)
-    {
-        OIC_LOG(DEBUG, CALEADAPTER_TAG, "Already queue is initialized!");
-        return CA_STATUS_OK;
-    }
 
-    // Create send message queue
-    g_bleServerReceiverQueue = (CAQueueingThread_t *) OICMalloc(sizeof(CAQueueingThread_t));
-    if (!g_bleServerReceiverQueue)
+    uint32_t listIndex = 0;
+    uint32_t listLength = u_arraylist_length(g_senderInfo);
+    for (listIndex = 0; listIndex < listLength; listIndex++)
     {
-        OIC_LOG(ERROR, CALEADAPTER_TAG, "Memory allocation failed!");
-        OICFree(g_bleServerSendQueueHandle);
-        return CA_MEMORY_ALLOC_FAILED;
-    }
-
-    if (CA_STATUS_OK != CAQueueingThreadInitialize(g_bleServerReceiverQueue, g_bleAdapterThreadPool,
-            CALEServerDataReceiverHandler, CALEDataDestroyer))
-    {
-        OIC_LOG(ERROR, CALEADAPTER_TAG, "Failed to Initialize send queue thread");
-        OICFree(g_bleServerReceiverQueue);
-        g_bleServerReceiverQueue = NULL;
-        return CA_STATUS_FAILED;
-    }
+        CABLESenderInfo_t *info = (CABLESenderInfo_t *) u_arraylist_get(g_senderInfo, listIndex);
+        if(!info)
+        {
+            continue;
+        }
 
-    if (CA_STATUS_OK != CAQueueingThreadStart(g_bleServerReceiverQueue))
-    {
-        OIC_LOG(ERROR, CALEADAPTER_TAG, "ca_thread_pool_add_task failed ");
-        OICFree(g_bleServerReceiverQueue);
-        g_bleServerReceiverQueue = NULL;
-        return CA_STATUS_FAILED;
+        OICFree(info->defragData);
+        CAFreeEndpoint(info->remoteEndpoint);
+        OICFree(info);
     }
-
+    u_arraylist_free(&g_senderInfo);
     OIC_LOG(DEBUG, CALEADAPTER_TAG, "OUT");
-    return CA_STATUS_OK;
 }
 
-CAResult_t CAInitLEClientReceiverQueue()
+CAResult_t CAInitLEClientSenderQueue()
 {
     OIC_LOG(DEBUG, CALEADAPTER_TAG, "IN");
 
-    // Check if the message queue is already initialized
-    if (g_bleClientReceiverQueue)
+    if (g_bleClientSendQueueHandle)
     {
         OIC_LOG(DEBUG, CALEADAPTER_TAG, "Already queue is initialized!");
+        return CA_STATUS_OK;
     }
-    else
+
+    // Create send message queue
+    g_bleClientSendQueueHandle = (CAQueueingThread_t *) OICMalloc(sizeof(CAQueueingThread_t));
+    if (!g_bleClientSendQueueHandle)
     {
-        // Create send message queue
-        g_bleClientReceiverQueue = (CAQueueingThread_t *) OICMalloc(sizeof(CAQueueingThread_t));
-        if (!g_bleClientReceiverQueue)
-        {
-            OIC_LOG(ERROR, CALEADAPTER_TAG, "Memory allocation failed!");
-            OICFree(g_bleClientSendQueueHandle);
-            return CA_MEMORY_ALLOC_FAILED;
-        }
+        OIC_LOG(ERROR, CALEADAPTER_TAG, "Memory allocation failed!");
+        return CA_MEMORY_ALLOC_FAILED;
+    }
 
-        if (CA_STATUS_OK != CAQueueingThreadInitialize(g_bleClientReceiverQueue,
-                                                       g_bleAdapterThreadPool,
-                                                       CALEClientDataReceiverHandler, NULL))
-        {
-            OIC_LOG(ERROR, CALEADAPTER_TAG, "Failed to Initialize send queue thread");
-            OICFree(g_bleClientSendQueueHandle);
-            OICFree(g_bleClientReceiverQueue);
-            g_bleClientReceiverQueue = NULL;
-            return CA_STATUS_FAILED;
-        }
+    if (CA_STATUS_OK != CAQueueingThreadInitialize(g_bleClientSendQueueHandle,
+                                                   g_bleAdapterThreadPool,
+                                                   CALEClientSendDataThread, CALEDataDestroyer))
+    {
+        OIC_LOG(ERROR, CALEADAPTER_TAG, "Failed to Initialize send queue thread");
+        OICFree(g_bleClientSendQueueHandle);
+        g_bleClientSendQueueHandle = NULL;
+        return CA_STATUS_FAILED;
     }
-    if (CA_STATUS_OK != CAQueueingThreadStart(g_bleClientReceiverQueue))
+
+    if (CA_STATUS_OK != CAQueueingThreadStart(g_bleClientSendQueueHandle))
     {
         OIC_LOG(ERROR, CALEADAPTER_TAG, "ca_thread_pool_add_task failed ");
-        OICFree(g_bleClientReceiverQueue);
-        g_bleClientReceiverQueue = NULL;
+        OICFree(g_bleClientSendQueueHandle);
+        g_bleClientSendQueueHandle = NULL;
         return CA_STATUS_FAILED;
     }
 
@@ -483,13 +477,6 @@ void CAStopLEQueues()
     }
     ca_mutex_unlock(g_bleClientSendDataMutex);
 
-    ca_mutex_lock(g_bleClientReceiveDataMutex);
-    if (NULL != g_bleClientReceiverQueue)
-    {
-        CAQueueingThreadStop(g_bleClientReceiverQueue);
-    }
-    ca_mutex_unlock(g_bleClientReceiveDataMutex);
-
     ca_mutex_lock(g_bleServerSendDataMutex);
     if (NULL != g_bleServerSendQueueHandle)
     {
@@ -497,12 +484,12 @@ void CAStopLEQueues()
     }
     ca_mutex_unlock(g_bleServerSendDataMutex);
 
-    ca_mutex_lock(g_bleServerReceiveDataMutex);
-    if (NULL != g_bleServerReceiverQueue)
+    ca_mutex_lock(g_bleReceiveDataMutex);
+    if (NULL != g_bleReceiverQueue)
     {
-        CAQueueingThreadStop(g_bleServerReceiverQueue);
+        CAQueueingThreadStop(g_bleReceiverQueue);
     }
-    ca_mutex_unlock(g_bleServerReceiveDataMutex);
+    ca_mutex_unlock(g_bleReceiveDataMutex);
 
     OIC_LOG(DEBUG, CALEADAPTER_TAG, "OUT");
 }
@@ -515,142 +502,54 @@ void CATerminateLEQueues()
     OICFree(g_bleClientSendQueueHandle);
     g_bleClientSendQueueHandle = NULL;
 
-
-    CAQueueingThreadDestroy(g_bleClientReceiverQueue);
-    OICFree(g_bleClientReceiverQueue);
-    g_bleClientReceiverQueue = NULL;
-
-
     CAQueueingThreadDestroy(g_bleServerSendQueueHandle);
     OICFree(g_bleServerSendQueueHandle);
     g_bleServerSendQueueHandle = NULL;
 
+    CAQueueingThreadDestroy(g_bleReceiverQueue);
+    OICFree(g_bleReceiverQueue);
+    g_bleReceiverQueue = NULL;
 
-    CAQueueingThreadDestroy(g_bleServerReceiverQueue);
-    OICFree(g_bleServerReceiverQueue);
-    g_bleServerReceiverQueue = NULL;
+    CALEClearSenderInfo();
 
     OIC_LOG(DEBUG, CALEADAPTER_TAG, "OUT");
 }
 
-void CALEServerDataReceiverHandler(void *threadData)
+CAResult_t CALEGetSenderInfo(char *leAddress, CABLESenderInfo_t **senderInfo,
+                                        uint32_t *senderIndex)
 {
-    OIC_LOG(DEBUG, CALEADAPTER_TAG, "IN");
-
-    static uint32_t recvDataLen = 0;
-    static uint32_t totalDataLen = 0;
-    static char *defragData = NULL;
-    static bool isHeaderAvailable = false;
-    static CAEndpoint_t *remoteEndpoint = NULL;
-
-    ca_mutex_lock(g_bleServerReceiveDataMutex);
+    VERIFY_NON_NULL_RET(leAddress, CALEADAPTER_TAG, "Ble-Address in-param NULL", CA_STATUS_FAILED);
+    VERIFY_NON_NULL_RET(senderIndex, CALEADAPTER_TAG, "Index in-param NULL", CA_STATUS_FAILED);
 
-    if (g_dataReceiverHandlerState)
+    uint32_t listLength = u_arraylist_length(g_senderInfo);
+    uint32_t addrLength = strlen(leAddress);
+    for (uint32_t index = 0; index < listLength; index++)
     {
-        OIC_LOG(DEBUG, CALEADAPTER_TAG, "checking for DE Fragmentation");
-
-        CALEData_t *bleData = (CALEData_t *) threadData;
-        if (!bleData)
+        CABLESenderInfo_t *info = (CABLESenderInfo_t *) u_arraylist_get(g_senderInfo, index);
+        if(!info || !(info->remoteEndpoint))
         {
-            OIC_LOG(DEBUG, CALEADAPTER_TAG, "Invalid bleData!");
-            ca_mutex_unlock(g_bleServerReceiveDataMutex);
-            return;
+            continue;
         }
 
-        OIC_LOG(DEBUG, CALEADAPTER_TAG, "checking for DE Fragmentation");
-
-        if (!isHeaderAvailable)
+        if(!strncmp(info->remoteEndpoint->addr, leAddress, addrLength))
         {
-            OIC_LOG(DEBUG, CALEADAPTER_TAG, "Parsing the header");
-            totalDataLen = CAParseHeader((char*)bleData->data);
-
-            OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "Total data to be accumulated [%d] bytes", totalDataLen);
-            OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "data received in the first packet [%d] bytes", bleData->dataLen);
-
-            defragData = (char *) OICCalloc(totalDataLen + 1, sizeof(char));
-            if (NULL == defragData)
+            *senderIndex = index;
+            if(senderInfo)
             {
-                OIC_LOG(ERROR, CALEADAPTER_TAG, "defragData is NULL!");
-                ca_mutex_unlock(g_bleServerReceiveDataMutex);
-                return;
+                *senderInfo = info;
             }
-
-            const char *remoteAddress = bleData->remoteEndpoint->addr;
-
-            remoteEndpoint = CACreateEndpointObject(CA_DEFAULT_FLAGS, CA_ADAPTER_GATT_BTLE,
-                                                    remoteAddress, 0);
-
-            memcpy(defragData + recvDataLen, bleData->data + CA_HEADER_LENGTH,
-                   bleData->dataLen - CA_HEADER_LENGTH);
-            recvDataLen += bleData->dataLen - CA_HEADER_LENGTH;
-            isHeaderAvailable = true;
-        }
-        else
-        {
-            OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "Copying the data of length [%d]", bleData->dataLen);
-            memcpy(defragData + recvDataLen, bleData->data, bleData->dataLen);
-            recvDataLen += bleData->dataLen ;
-            OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "totalDatalength  [%d] recveived Datalen [%d]",
-                      totalDataLen, recvDataLen);
-        }
-        if (totalDataLen == recvDataLen)
-        {
-            ca_mutex_lock(g_bleAdapterReqRespCbMutex);
-            if (NULL == g_networkPacketReceivedCallback)
-            {
-                OIC_LOG(ERROR, CALEADAPTER_TAG, "gReqRespCallback is NULL!");
-                OICFree(defragData);
-                CAFreeEndpoint(remoteEndpoint);
-                remoteEndpoint = NULL;
-                defragData = NULL;
-                ca_mutex_unlock(g_bleAdapterReqRespCbMutex);
-                ca_mutex_unlock(g_bleServerReceiveDataMutex);
-                return;
-            }
-            OIC_LOG(DEBUG, CALEADAPTER_TAG, "Sending data up !");
-            g_networkPacketReceivedCallback(remoteEndpoint, defragData, recvDataLen);
-
-            OICFree(defragData);
-            CAFreeEndpoint(remoteEndpoint);
-
-            recvDataLen = 0;
-            totalDataLen = 0;
-            isHeaderAvailable = false;
-            remoteEndpoint = NULL;
-            defragData = NULL;
-            ca_mutex_unlock(g_bleAdapterReqRespCbMutex);
-        }
-
-        if (false == g_dataReceiverHandlerState)
-        {
-            OIC_LOG(DEBUG, CALEADAPTER_TAG, "GATTClient is terminating. Cleaning up");
-            recvDataLen = 0;
-            totalDataLen = 0;
-            isHeaderAvailable = false;
-            OICFree(defragData);
-            CAFreeEndpoint(remoteEndpoint);
-            remoteEndpoint = NULL;
-            defragData = NULL;
-            ca_mutex_unlock(g_bleServerReceiveDataMutex);
-            return;
+            return CA_STATUS_OK;
         }
     }
-    ca_mutex_unlock(g_bleServerReceiveDataMutex);
-    OIC_LOG(DEBUG, CALEADAPTER_TAG, "OUT");
+
+    return CA_STATUS_FAILED;
 }
 
-void CALEClientDataReceiverHandler(void *threadData)
+void CALEDataReceiverHandler(void *threadData)
 {
     OIC_LOG(DEBUG, CALEADAPTER_TAG, "IN");
 
-    static const char *remoteAddress = NULL;
-    static uint32_t recvDataLen = 0;
-    static uint32_t totalDataLen = 0;
-    static char *defragData = NULL;
-    static bool isHeaderAvailable = false;
-    static CAEndpoint_t *remoteEndpoint = NULL;
-
-    ca_mutex_lock(g_bleClientReceiveDataMutex);
+    ca_mutex_lock(g_bleReceiveDataMutex);
 
     if (g_dataReceiverHandlerState)
     {
@@ -659,87 +558,141 @@ void CALEClientDataReceiverHandler(void *threadData)
         CALEData_t *bleData = (CALEData_t *) threadData;
         if (!bleData)
         {
-            OIC_LOG(DEBUG, CALEADAPTER_TAG, "Invalid wifidata!");
-            ca_mutex_unlock(g_bleClientReceiveDataMutex);
+            OIC_LOG(DEBUG, CALEADAPTER_TAG, "Invalid bleData!");
+            ca_mutex_unlock(g_bleReceiveDataMutex);
             return;
         }
 
-        OIC_LOG(DEBUG, CALEADAPTER_TAG, "checking for DE Fragmentation");
+        if(!(bleData->remoteEndpoint))
+        {
+            OIC_LOG(ERROR, CALEADAPTER_TAG, "Client RemoteEndPoint NULL!!");
+            ca_mutex_unlock(g_bleReceiveDataMutex);
+            return;
+        }
 
-        if (!isHeaderAvailable)
+        CABLESenderInfo_t *senderInfo = NULL;
+        uint32_t senderIndex = 0;
+
+        if(CA_STATUS_OK != CALEGetSenderInfo(bleData->remoteEndpoint->addr,
+                                                &senderInfo, &senderIndex))
         {
-            OIC_LOG(DEBUG, CALEADAPTER_TAG, "Parsing the header");
+            OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "This is a new client [%s]",
+                                                bleData->remoteEndpoint->addr);
+        }
 
-            totalDataLen = CAParseHeader(bleData->data);
-            OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "Total data to be accumulated [%d] bytes",
-                      totalDataLen);
-            OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "Data received in the first packet [%d] bytes",
-                      bleData->dataLen);
+        if(!senderInfo)
+        {
+            CABLESenderInfo_t *newSender = (CABLESenderInfo_t*)OICMalloc(sizeof(CABLESenderInfo_t));
+            if(!newSender)
+            {
+                OIC_LOG(ERROR, CALEADAPTER_TAG, "Memory allocation failed for new sender");
+                ca_mutex_unlock(g_bleReceiveDataMutex);
+                return;
+            }
+            newSender->recvDataLen = 0;
+            newSender->totalDataLen = 0;
+            newSender->defragData = NULL;
+            newSender->remoteEndpoint = NULL;
 
-            defragData = (char *) OICMalloc(sizeof(char) * totalDataLen);
-            if (NULL == defragData)
+            OIC_LOG(DEBUG, CALEADAPTER_TAG, "Parsing the header");
+            newSender->totalDataLen = CAParseHeader((char*)bleData->data);
+            if(!(newSender->totalDataLen))
             {
-                OIC_LOG(ERROR, CALEADAPTER_TAG, "defragData is NULL!");
-                ca_mutex_unlock(g_bleClientReceiveDataMutex);
+                OIC_LOG(ERROR, CALEADAPTER_TAG, "Total Data Length is parsed as 0!!!");
+                OICFree(newSender);
+                ca_mutex_unlock(g_bleReceiveDataMutex);
                 return;
             }
 
-            remoteAddress = bleData->remoteEndpoint->addr;
+            OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "Total data to be accumulated [%u] bytes",
+                                                newSender->totalDataLen);
+            OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "data received in the first packet [%u] bytes",
+                                                bleData->dataLen);
 
-            remoteEndpoint = CACreateEndpointObject(CA_DEFAULT_FLAGS, CA_ADAPTER_GATT_BTLE,
-                                                    remoteAddress, 0);
+            newSender->defragData = (char *) OICCalloc(newSender->totalDataLen + 1, sizeof(char));
 
-            memcpy(defragData, bleData->data + CA_HEADER_LENGTH,
-                   bleData->dataLen - CA_HEADER_LENGTH);
-            recvDataLen += bleData->dataLen - CA_HEADER_LENGTH;
-            isHeaderAvailable = true;
+            if (NULL == newSender->defragData)
+            {
+                OIC_LOG(ERROR, CALEADAPTER_TAG, "defragData is NULL!");
+                OICFree(newSender);
+                ca_mutex_unlock(g_bleReceiveDataMutex);
+                return;
+            }
+
+            const char *remoteAddress = bleData->remoteEndpoint->addr;
+            newSender->remoteEndpoint = CACreateEndpointObject(CA_DEFAULT_FLAGS,
+                                                            CA_ADAPTER_GATT_BTLE, remoteAddress, 0);
+            if (NULL == newSender->remoteEndpoint)
+            {
+                OIC_LOG(ERROR, CALEADAPTER_TAG, "remoteEndpoint is NULL!");
+                OICFree(newSender->defragData);
+                OICFree(newSender);
+                ca_mutex_unlock(g_bleReceiveDataMutex);
+                return;
+            }
+            memcpy(newSender->defragData, bleData->data + CA_HEADER_LENGTH,
+                    bleData->dataLen - CA_HEADER_LENGTH);
+            newSender->recvDataLen += bleData->dataLen - CA_HEADER_LENGTH;
+            u_arraylist_add(g_senderInfo,(void *)newSender);
+
+            //Getting newSender index position in g_senderInfo array list
+            if(CA_STATUS_OK !=
+                CALEGetSenderInfo(newSender->remoteEndpoint->addr, NULL, &senderIndex))
+            {
+                OIC_LOG(ERROR, CALEADAPTER_TAG, "Existing sender index not found!!");
+                OICFree(senderInfo->defragData);
+                OICFree(senderInfo);
+                ca_mutex_unlock(g_bleReceiveDataMutex);
+                return;
+            }
+            senderInfo = newSender;
         }
         else
         {
+            if(senderInfo->recvDataLen + bleData->dataLen > senderInfo->totalDataLen)
+            {
+                OIC_LOG_V(ERROR, CALEADAPTER_TAG,
+                            "Data Length exceeding error!! Receiving [%d] total length [%d]",
+                            senderInfo->recvDataLen + bleData->dataLen, senderInfo->totalDataLen);
+                u_arraylist_remove(g_senderInfo, senderIndex);
+                OICFree(senderInfo->defragData);
+                OICFree(senderInfo);
+                ca_mutex_unlock(g_bleReceiveDataMutex);
+                return;
+            }
             OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "Copying the data of length [%d]", bleData->dataLen);
-            memcpy(defragData + recvDataLen, bleData->data, bleData->dataLen);
-            recvDataLen += bleData->dataLen ;
+            memcpy(senderInfo->defragData + senderInfo->recvDataLen, bleData->data,
+                    bleData->dataLen);
+            senderInfo->recvDataLen += bleData->dataLen ;
             OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "totalDatalength  [%d] recveived Datalen [%d]",
-                      totalDataLen, recvDataLen);
+                                                senderInfo->totalDataLen, senderInfo->recvDataLen);
         }
-        if (totalDataLen == recvDataLen)
+
+        if (senderInfo->totalDataLen == senderInfo->recvDataLen)
         {
             ca_mutex_lock(g_bleAdapterReqRespCbMutex);
             if (NULL == g_networkPacketReceivedCallback)
             {
                 OIC_LOG(ERROR, CALEADAPTER_TAG, "gReqRespCallback is NULL!");
-                OICFree(defragData);
-                CAFreeEndpoint(remoteEndpoint);
-                remoteEndpoint = NULL;
-                defragData = NULL;
+
+                u_arraylist_remove(g_senderInfo, senderIndex);
+                OICFree(senderInfo->defragData);
+                OICFree(senderInfo);
                 ca_mutex_unlock(g_bleAdapterReqRespCbMutex);
-                ca_mutex_unlock(g_bleClientReceiveDataMutex);
+                ca_mutex_unlock(g_bleReceiveDataMutex);
                 return;
             }
             OIC_LOG(DEBUG, CALEADAPTER_TAG, "Sending data up !");
-            g_networkPacketReceivedCallback(remoteEndpoint, defragData, recvDataLen);
-            recvDataLen = 0;
-            totalDataLen = 0;
-            isHeaderAvailable = false;
-            OICFree(defragData);
-            CAFreeEndpoint(remoteEndpoint);
-            remoteEndpoint = NULL;
-            defragData = NULL;
+            g_networkPacketReceivedCallback(senderInfo->remoteEndpoint,
+                                                senderInfo->defragData, senderInfo->recvDataLen);
             ca_mutex_unlock(g_bleAdapterReqRespCbMutex);
-        }
-
-        if (false == g_dataReceiverHandlerState)
-        {
-            OIC_LOG(DEBUG, CALEADAPTER_TAG, "GATTClient is terminating. Cleaning up");
-            OICFree(defragData);
-            CAFreeEndpoint(remoteEndpoint);
-            remoteEndpoint = NULL;
-            defragData = NULL;
-            ca_mutex_unlock(g_bleClientReceiveDataMutex);
-            return;
+            u_arraylist_remove(g_senderInfo, senderIndex);
+            senderInfo->remoteEndpoint = NULL;
+            senderInfo->defragData = NULL;
+            OICFree(senderInfo);
         }
     }
-    ca_mutex_unlock(g_bleClientReceiveDataMutex);
+    ca_mutex_unlock(g_bleReceiveDataMutex);
     OIC_LOG(DEBUG, CALEADAPTER_TAG, "OUT");
 }
 
@@ -1218,6 +1171,16 @@ CAResult_t CAInitLEAdapterMutex()
         }
     }
 
+    if (NULL == g_bleReceiveDataMutex)
+    {
+        g_bleReceiveDataMutex = ca_mutex_new();
+        if (NULL == g_bleReceiveDataMutex)
+        {
+            OIC_LOG(ERROR, CALEADAPTER_TAG, "ca_mutex_new failed");
+            return CA_STATUS_FAILED;
+        }
+    }
+
     OIC_LOG(DEBUG, CALEADAPTER_TAG, "OUT");
     return CA_STATUS_OK;
 }
@@ -1253,6 +1216,9 @@ void CATerminateLEAdapterMutex()
     ca_mutex_free(g_bleAdapterReqRespCbMutex);
     g_bleAdapterReqRespCbMutex = NULL;
 
+    ca_mutex_free(g_bleReceiveDataMutex);
+    g_bleReceiveDataMutex = NULL;
+
     OIC_LOG(DEBUG, CALEADAPTER_TAG, "OUT");
 }
 
@@ -1834,7 +1800,7 @@ CAResult_t CALEAdapterServerReceivedData(const char *remoteAddress, const char *
         g_networkPacketReceivedCallback(&endPoint, data, dataLength);
     }
 #else
-    VERIFY_NON_NULL_RET(g_bleServerReceiverQueue, CALEADAPTER_TAG, "g_bleServerReceiverQueue",
+    VERIFY_NON_NULL_RET(g_bleReceiverQueue, CALEADAPTER_TAG, "g_bleReceiverQueue",
                         CA_STATUS_FAILED);
 
     //Add message to data queue
@@ -1860,7 +1826,7 @@ CAResult_t CALEAdapterServerReceivedData(const char *remoteAddress, const char *
 
     CAFreeEndpoint(remoteEndpoint);
     // Add message to send queue
-    CAQueueingThreadAddData(g_bleServerReceiverQueue, bleData, sizeof(CALEData_t));
+    CAQueueingThreadAddData(g_bleReceiverQueue, bleData, sizeof(CALEData_t));
 
     *sentLength = dataLength;
 #endif
@@ -1879,7 +1845,7 @@ CAResult_t CALEAdapterClientReceivedData(const char *remoteAddress, const char *
     VERIFY_NON_NULL(data, CALEADAPTER_TAG, "Data is null");
     VERIFY_NON_NULL(sentLength, CALEADAPTER_TAG, "Sent data length holder is null");
 #ifndef SINGLE_THREAD
-    VERIFY_NON_NULL_RET(g_bleClientReceiverQueue, CALEADAPTER_TAG, "g_bleClientReceiverQueue",
+    VERIFY_NON_NULL_RET(g_bleReceiverQueue, CALEADAPTER_TAG, "g_bleReceiverQueue",
                         CA_STATUS_FAILED);
 
     //Add message to data queue
@@ -1905,7 +1871,7 @@ CAResult_t CALEAdapterClientReceivedData(const char *remoteAddress, const char *
 
     CAFreeEndpoint(remoteEndpoint);
     // Add message to send queue
-    CAQueueingThreadAddData(g_bleClientReceiverQueue, bleData, sizeof(CALEData_t));
+    CAQueueingThreadAddData(g_bleReceiverQueue, bleData, sizeof(CALEData_t));
 
     *sentLength = dataLength;
 #endif