wrap CAcloseSslConnectionFreeEndpoint method with __TIZEN__ flag
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / bt_le_adapter / caleadapter.c
old mode 100755 (executable)
new mode 100644 (file)
index 5f4904c..01c80b2
@@ -59,7 +59,7 @@ typedef struct
     uint32_t totalDataLen;
     uint8_t *defragData;
     CAEndpoint_t *remoteEndpoint;
- } CABLESenderInfo_t;
+} CABLESenderInfo_t;
 
 typedef enum
 {
@@ -111,12 +111,6 @@ static CADataType_t g_dataType = CA_REQUEST_DATA;
 static oc_mutex g_bleIsServerMutex = NULL;
 
 /**
- * Mutex to synchronize the callback to be called for the network
- * changes.
- */
-static oc_mutex g_bleNetworkCbMutex = NULL;
-
-/**
  * Mutex to synchronize the updates of the local LE address of the
  * adapter.
  */
@@ -128,35 +122,15 @@ static oc_mutex g_bleLocalAddressMutex = NULL;
 static ca_thread_pool_t g_bleAdapterThreadPool = NULL;
 
 /**
- * Mutex to synchronize the task to be pushed to thread pool.
- */
-static oc_mutex g_bleAdapterThreadPoolMutex = NULL;
-
-/**
- * Mutex to synchronize the queing of the data from SenderQueue.
- */
-static oc_mutex g_bleClientSendDataMutex = NULL;
-
-/**
  * Mutex to synchronize the queing of the data from ReceiverQueue.
  */
 static oc_mutex g_bleClientReceiveDataMutex = NULL;
 
 /**
- * Mutex to synchronize the queing of the data from SenderQueue.
- */
-static oc_mutex g_bleServerSendDataMutex = NULL;
-
-/**
  * Mutex to synchronize the queing of the data from ReceiverQueue.
  */
 static oc_mutex g_bleServerReceiveDataMutex = NULL;
 
-/**
- * Mutex to synchronize the callback to be called for the
- * adapterReqResponse.
- */
-static oc_mutex g_bleAdapterReqRespCbMutex = NULL;
 
 /**
  * Callback to be called when network packet received from either
@@ -523,7 +497,6 @@ static void CALEDataDestroyer(void *data, uint32_t size);
  * @param[in] address        target address to remove data in queue.
  */
 static void CALERemoveSendQueueData(CAQueueingThread_t *queueHandle,
-                                    oc_mutex mutex,
                                     const char* address);
 
 /**
@@ -540,7 +513,7 @@ static void CALERemoveReceiveQueueData(u_arraylist_t *dataInfoList,
  * for client / server which is matched same leAddress and port.
  *
  * @param[in]  leAddress       target address to get serderInfo.
- * @param[in]  port            target port to get serderInfo.
+ * @param[in]  port            target port to get senderInfo.
  * @param[in]  senderInfoList  received data list for client / server.
  * @param[out] senderInfo      Pointer to contain matched(leAddress and port)
  *                             received data info.
@@ -569,13 +542,12 @@ static CAResult_t CALEGetPortsFromSenderInfo(const char *leAddress,
 
 static CAResult_t CAInitLEServerQueues()
 {
-    oc_mutex_lock(g_bleAdapterThreadPoolMutex);
-
+    
     CAResult_t result = CAInitLEServerSenderQueue();
     if (CA_STATUS_OK != result)
     {
         OIC_LOG(ERROR, CALEADAPTER_TAG, "CAInitBleServerSenderQueue failed");
-        oc_mutex_unlock(g_bleAdapterThreadPoolMutex);
+        
         return CA_STATUS_FAILED;
     }
 
@@ -583,7 +555,6 @@ static CAResult_t CAInitLEServerQueues()
     if (!g_bleServerSenderInfo)
     {
         OIC_LOG(ERROR, CALEADAPTER_TAG, "memory allocation failed!");
-        oc_mutex_unlock(g_bleAdapterThreadPoolMutex);
         return CA_MEMORY_ALLOC_FAILED;
     }
 
@@ -592,25 +563,19 @@ static CAResult_t CAInitLEServerQueues()
     {
         OIC_LOG(ERROR, CALEADAPTER_TAG, "CAInitLEServerReceiverQueue failed");
         u_arraylist_free(&g_bleServerSenderInfo);
-        oc_mutex_unlock(g_bleAdapterThreadPoolMutex);
         return CA_STATUS_FAILED;
     }
 
     g_dataBleServerReceiverHandlerState = true;
-
-    oc_mutex_unlock(g_bleAdapterThreadPoolMutex);
     return CA_STATUS_OK;
 }
 
 static CAResult_t CAInitLEClientQueues()
 {
-    oc_mutex_lock(g_bleAdapterThreadPoolMutex);
-
     CAResult_t result = CAInitLEClientSenderQueue();
     if (CA_STATUS_OK != result)
     {
         OIC_LOG(ERROR, CALEADAPTER_TAG, "CAInitBleClientSenderQueue failed");
-        oc_mutex_unlock(g_bleAdapterThreadPoolMutex);
         return CA_STATUS_FAILED;
     }
 
@@ -618,7 +583,6 @@ static CAResult_t CAInitLEClientQueues()
     if (!g_bleClientSenderInfo)
     {
         OIC_LOG(ERROR, CALEADAPTER_TAG, "memory allocation failed!");
-        oc_mutex_unlock(g_bleAdapterThreadPoolMutex);
         return CA_MEMORY_ALLOC_FAILED;
     }
 
@@ -627,13 +591,11 @@ static CAResult_t CAInitLEClientQueues()
     {
         OIC_LOG(ERROR, CALEADAPTER_TAG, "CAInitLEClientReceiverQueue failed");
         u_arraylist_free(&g_bleClientSenderInfo);
-        oc_mutex_unlock(g_bleAdapterThreadPoolMutex);
         return CA_STATUS_FAILED;
     }
 
     g_dataBleClientReceiverHandlerState = true;
 
-    oc_mutex_unlock(g_bleAdapterThreadPoolMutex);
     return CA_STATUS_OK;
 }
 
@@ -749,7 +711,7 @@ static CAResult_t CAInitLEServerSenderQueue()
     return CA_STATUS_OK;
 }
 
-static void CALEClearSenderInfoImpl(u_arraylist_t ** list)
+static void CALEClearSenderInfoImpl(u_arraylist_t **list)
 {
     const size_t length = u_arraylist_length(*list);
     for (size_t i = 0; i < length; ++i)
@@ -1117,7 +1079,6 @@ static void CALEDataReceiverHandler(void *threadData, CABLEAdapter_t receiverTyp
 
         if (senderInfo->totalDataLen == senderInfo->recvDataLen)
         {
-            oc_mutex_lock(g_bleAdapterReqRespCbMutex);
             if (NULL == g_networkPacketReceivedCallback)
             {
                 OIC_LOG(ERROR, CALEADAPTER_TAG, "gReqRespCallback is NULL!");
@@ -1125,7 +1086,6 @@ static void CALEDataReceiverHandler(void *threadData, CABLEAdapter_t receiverTyp
                 u_arraylist_remove(bleData->senderInfo, senderIndex);
                 OICFree(senderInfo->defragData);
                 OICFree(senderInfo);
-                oc_mutex_unlock(g_bleAdapterReqRespCbMutex);
                 oc_mutex_unlock(bleReceiveDataMutex);
                 return;
             }
@@ -1152,7 +1112,6 @@ static void CALEDataReceiverHandler(void *threadData, CABLEAdapter_t receiverTyp
                         break;
                     default:
                         OIC_LOG_V(ERROR, CALEADAPTER_TAG, "Unsupported rcvr type:%d",receiverType);
-                        oc_mutex_unlock(g_bleAdapterReqRespCbMutex);
                         u_arraylist_remove(bleData->senderInfo, senderIndex);
                         senderInfo->remoteEndpoint = NULL;
                         senderInfo->defragData = NULL;
@@ -1184,7 +1143,6 @@ static void CALEDataReceiverHandler(void *threadData, CABLEAdapter_t receiverTyp
             }
 #endif
 
-            oc_mutex_unlock(g_bleAdapterReqRespCbMutex);
             u_arraylist_remove(bleData->senderInfo, senderIndex);
             senderInfo->remoteEndpoint = NULL;
             senderInfo->defragData = NULL;
@@ -1315,7 +1273,7 @@ static void CALEServerSendDataThread(void *threadData)
     uint32_t dataLen = 0;
     if (g_mtuSize > totalLength)
     {
-        length = totalLength;
+        length = (uint32_t)totalLength;
         dataLen = bleData->dataLen;
     }
     else
@@ -1368,7 +1326,7 @@ static void CALEServerSendDataThread(void *threadData)
 
         OIC_LOG_V(DEBUG,
                   CALEADAPTER_TAG,
-                  "Server Sent Unicast First Data - data length [%zu]",
+                  "Server Sent Unicast First Data - data length [%u]",
                   length);
 
         result = CAGenerateHeader(dataHeader,
@@ -1505,7 +1463,7 @@ static void CALEClientSendDataThread(void *threadData)
         return;
     }
 
-#if defined(__TIZEN__) || defined(__ANDROID__)
+#if defined(__ANDROID__)
     // get MTU size
     if (false == CALEClientIsConnected(bleData->remoteEndpoint->addr))
     {
@@ -1529,6 +1487,8 @@ static void CALEClientSendDataThread(void *threadData)
             return;
         }
     }
+#endif
+#if defined(__TIZEN__) || defined(__ANDROID__)
     g_mtuSize = CALEClientGetMtuSize(bleData->remoteEndpoint->addr);
 #endif
     OIC_LOG_V(INFO, CALEADAPTER_TAG, "MTU size [%d]", g_mtuSize);
@@ -1616,7 +1576,7 @@ static void CALEClientSendDataThread(void *threadData)
     uint32_t dataLen = 0;
     if (g_mtuSize > totalLength)
     {
-        length = totalLength;
+        length = (uint32_t)totalLength;
         dataLen = bleData->dataLen;
     }
     else
@@ -1671,7 +1631,7 @@ static void CALEClientSendDataThread(void *threadData)
         }
         OIC_LOG_V(DEBUG,
                   CALEADAPTER_TAG,
-                  "Client Sent Unicast First Data - data length [%zu]",
+                  "Client Sent Unicast First Data - data length [%u]",
                   length);
 
         result = CAGenerateHeader(dataHeader,
@@ -1733,7 +1693,7 @@ static void CALEClientSendDataThread(void *threadData)
             }
             OIC_LOG_V(DEBUG,
                       CALEADAPTER_TAG,
-                      "Client Sent Unicast %d Data - data(mtu) length [%zu]",
+                      "Client Sent Unicast %d Data - data(mtu) length [%hu]",
                       index + 1,
                       g_mtuSize);
         }
@@ -2161,17 +2121,6 @@ static CAResult_t CAInitLEAdapterMutex()
         }
     }
 
-    if (NULL == g_bleNetworkCbMutex)
-    {
-        g_bleNetworkCbMutex = oc_mutex_new();
-        if (NULL == g_bleNetworkCbMutex)
-        {
-            OIC_LOG(ERROR, CALEADAPTER_TAG, "oc_mutex_new failed");
-            CATerminateLEAdapterMutex();
-            return CA_STATUS_FAILED;
-        }
-    }
-
     if (NULL == g_bleLocalAddressMutex)
     {
         g_bleLocalAddressMutex = oc_mutex_new();
@@ -2183,49 +2132,7 @@ static CAResult_t CAInitLEAdapterMutex()
         }
     }
 
-    if (NULL == g_bleAdapterThreadPoolMutex)
-    {
-        g_bleAdapterThreadPoolMutex = oc_mutex_new();
-        if (NULL == g_bleAdapterThreadPoolMutex)
-        {
-            OIC_LOG(ERROR, CALEADAPTER_TAG, "oc_mutex_new failed");
-            CATerminateLEAdapterMutex();
-            return CA_STATUS_FAILED;
-        }
-    }
-
-    if (NULL == g_bleClientSendDataMutex)
-    {
-        g_bleClientSendDataMutex = oc_mutex_new();
-        if (NULL == g_bleClientSendDataMutex)
-        {
-            OIC_LOG(ERROR, CALEADAPTER_TAG, "oc_mutex_new failed");
-            CATerminateLEAdapterMutex();
-            return CA_STATUS_FAILED;
-        }
-    }
-
-    if (NULL == g_bleServerSendDataMutex)
-    {
-        g_bleServerSendDataMutex = oc_mutex_new();
-        if (NULL == g_bleServerSendDataMutex)
-        {
-            OIC_LOG(ERROR, CALEADAPTER_TAG, "oc_mutex_new failed");
-            CATerminateLEAdapterMutex();
-            return CA_STATUS_FAILED;
-        }
-    }
-
-    if (NULL == g_bleAdapterReqRespCbMutex)
-    {
-        g_bleAdapterReqRespCbMutex = oc_mutex_new();
-        if (NULL == g_bleAdapterReqRespCbMutex)
-        {
-            OIC_LOG(ERROR, CALEADAPTER_TAG, "oc_mutex_new failed");
-            CATerminateLEAdapterMutex();
-            return CA_STATUS_FAILED;
-        }
-    }
+    
 
     if (NULL == g_bleServerReceiveDataMutex)
     {
@@ -2257,23 +2164,11 @@ static void CATerminateLEAdapterMutex()
     oc_mutex_free(g_bleIsServerMutex);
     g_bleIsServerMutex = NULL;
 
-    oc_mutex_free(g_bleNetworkCbMutex);
-    g_bleNetworkCbMutex = NULL;
-
+    
     oc_mutex_free(g_bleLocalAddressMutex);
     g_bleLocalAddressMutex = NULL;
 
-    oc_mutex_free(g_bleAdapterThreadPoolMutex);
-    g_bleAdapterThreadPoolMutex = NULL;
-
-    oc_mutex_free(g_bleClientSendDataMutex);
-    g_bleClientSendDataMutex = NULL;
-
-    oc_mutex_free(g_bleServerSendDataMutex);
-    g_bleServerSendDataMutex = NULL;
-
-    oc_mutex_free(g_bleAdapterReqRespCbMutex);
-    g_bleAdapterReqRespCbMutex = NULL;
+    
 
     oc_mutex_free(g_bleServerReceiveDataMutex);
     g_bleServerReceiveDataMutex = NULL;
@@ -2484,6 +2379,7 @@ static CAResult_t CALEAdapterClientSendData(const CAEndpoint_t *remoteEndpoint,
 static CAResult_t CALEAdapterGattServerStart()
 {
     OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "%s", __func__);
+    CAResult_t result = CA_STATUS_FAILED;
 
     if (caglobals.bleFlags & CA_LE_SERVER_DISABLE)
     {
@@ -2492,7 +2388,10 @@ static CAResult_t CALEAdapterGattServerStart()
         return CA_STATUS_OK;
     }
 
-    CAResult_t result = CAStartLEGattServer();
+#ifndef DISABLE_BLE_SERVER
+    OIC_LOG_V(INFO, CALEADAPTER_TAG, "Starting LE GATT Server");
+    result = CAStartLEGattServer();
+#endif
 
 #ifndef SINGLE_THREAD
     /*
@@ -2501,10 +2400,7 @@ static CAResult_t CALEAdapterGattServerStart()
     */
     if (CA_STATUS_OK == result)
     {
-        oc_mutex_lock(g_bleServerSendDataMutex);
         result = CAQueueingThreadStart(g_bleServerSendQueueHandle);
-        oc_mutex_unlock(g_bleServerSendDataMutex);
-
         if (CA_STATUS_OK != result)
         {
             OIC_LOG_V(ERROR,
@@ -2531,14 +2427,11 @@ static CAResult_t CALEAdapterGattServerStop()
 
 #ifndef SINGLE_THREAD
 
-    oc_mutex_lock(g_bleServerSendDataMutex);
     CAResult_t res = CAQueueingThreadStop(g_bleServerSendQueueHandle);
     if (CA_STATUS_OK != res)
     {
         OIC_LOG(ERROR, CALEADAPTER_TAG, "CAQueueingThreadStop has failed");
     }
-    oc_mutex_unlock(g_bleServerSendDataMutex);
-
     res = CAStopLEGattServer();
     if (CA_STATUS_OK != res)
     {
@@ -2564,10 +2457,7 @@ static CAResult_t CALEAdapterGattClientStart()
     */
     if (CA_STATUS_OK == result)
     {
-        oc_mutex_lock(g_bleClientSendDataMutex);
         result = CAQueueingThreadStart(g_bleClientSendQueueHandle);
-        oc_mutex_unlock(g_bleClientSendDataMutex);
-
         if (CA_STATUS_OK != result)
         {
             OIC_LOG(ERROR,
@@ -2586,10 +2476,8 @@ static CAResult_t CALEAdapterGattClientStop()
     OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "%s", __func__);
     CAStopLEGattClient();
 
-    oc_mutex_lock(g_bleClientSendDataMutex);
     CAResult_t result = CAQueueingThreadStop(g_bleClientSendQueueHandle);
-    oc_mutex_unlock(g_bleClientSendDataMutex);
-
+    
     return result;
 #else
     CAStopLEGattClient();
@@ -2604,7 +2492,7 @@ static ssize_t CALESecureSendDataCB(CAEndpoint_t *endpoint, const void *data, si
     VERIFY_NON_NULL(endpoint, CALEADAPTER_TAG, "endpoint is NULL");
     VERIFY_NON_NULL(data, CALEADAPTER_TAG, "data is NULL");
     OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "IN %s", __func__);
-    OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "encrypted datalen = %d", dataLen);
+    OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "encrypted datalen = %zd", dataLen);
 
     CAResult_t result;
     CADataType_t dataType = g_dataType;
@@ -2662,7 +2550,7 @@ CAResult_t CALESecureReceiveDataCB(const CASecureEndpoint_t *sep, const void *da
     VERIFY_NON_NULL(data, CALEADAPTER_TAG, "data is NULL");
 
     OIC_LOG_V(DEBUG, CALEADAPTER_TAG,
-              "Secure Data Receive - decrypted datalen = %d", dataLen);
+              "Secure Data Receive - decrypted datalen = %zd", dataLen);
 
     if (dataLen <= 0)
     {
@@ -2787,9 +2675,6 @@ static CAResult_t CAStartLE()
 
 static CAResult_t CAStopLE()
 {
-#ifdef __WITH_DTLS__
-    CAdeinitSslAdapter();
-#endif
 
 #ifndef SINGLE_THREAD
     CAStopLEQueues();
@@ -3211,10 +3096,8 @@ static CAResult_t CALERegisterNetworkNotifications(CAAdapterChangeCallback netCa
 {
     OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "IN %s", __func__);
 
-    oc_mutex_lock(g_bleNetworkCbMutex);
     g_networkCallback = netCallback;
     g_connectionCallback = connCallback;
-    oc_mutex_unlock(g_bleNetworkCbMutex);
     CAResult_t res = CA_STATUS_OK;
     if (netCallback)
     {
@@ -3301,22 +3184,64 @@ static void CALEConnectionStateChangedCb(CATransportAdapter_t adapter, const cha
         // remove data of send queue.
         if (g_bleClientSendQueueHandle)
         {
-            CALERemoveSendQueueData(g_bleClientSendQueueHandle,
-                                    g_bleClientSendDataMutex,
-                                    address);
+            CALERemoveSendQueueData(g_bleClientSendQueueHandle, address);
         }
 
         if (g_bleServerSendQueueHandle)
         {
-            CALERemoveSendQueueData(g_bleServerSendQueueHandle,
-                                    g_bleServerSendDataMutex,
-                                    address);
+            CALERemoveSendQueueData(g_bleServerSendQueueHandle, address);
         }
 #endif
 
 #ifdef __WITH_DTLS__
+#if defined(__TIZEN__) && !defined(SINGLE_THREAD)
+        // CAcloseSslConnection returns CAResult_t instead of void*, but the size is the same and crash shouldn't occur
+        pthread_t ccThread;
+        pthread_attr_t attr;
+        int initAttrRes = -1;
+        int pthreadCreateRes = -1;
+        int detachStatusRes = -1;
+        int memoryAllocationRes = -1;
+
+        do
+        {
+            initAttrRes = pthread_attr_init(&attr);
+            if (initAttrRes != 0)
+            {
+                break;
+            }
+            CAEndpoint_t *localEndpointCpyPtr = OICMalloc(sizeof(CAEndpoint_t));
+
+            if(NULL == localEndpointCpyPtr)
+            {
+                memoryAllocationRes = -1;
+                break;
+            }
+            else
+            {
+                memoryAllocationRes = 0;
+            }
+
+            (*localEndpointCpyPtr) = localEndpoint;
+            // this piece of code is reached on the main thread
+            // CAcloseSslConnection might wait for too long (network + mutexes) and watchdog might kill it
+            // Asynchronous call protects this function from watchdog
+            pthreadCreateRes = pthread_create(&ccThread, &attr, (void *(*)(void*))&CAcloseSslConnectionFreeEndpoint, (void*)localEndpointCpyPtr);
+
+            if (pthreadCreateRes != 0)
+            {
+                break;
+            }
+            detachStatusRes = pthread_detach(ccThread);
+        }while (0);
+
+        // regardless of CAcloseSslConnection result, the function will continue and g_connectionCallback will be called
+        OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "CAcloseSslConnection pthread_init [%d], mem_alloc [%d] pthread_create [%d], pthread_detach [%d]",
+                initAttrRes, memoryAllocationRes, pthreadCreateRes, detachStatusRes);
+#else
         CAcloseSslConnection(&localEndpoint);
 #endif
+#endif
     }
 
     if (g_connectionCallback)
@@ -3393,10 +3318,7 @@ static CAResult_t CALEAdapterClientSendData(const CAEndpoint_t *remoteEndpoint,
     VERIFY_NON_NULL_RET(g_bleClientSendQueueHandle, CALEADAPTER_TAG,
                         "g_bleClientSendQueueHandle is  NULL",
                         CA_STATUS_FAILED);
-    VERIFY_NON_NULL_RET(g_bleClientSendDataMutex, CALEADAPTER_TAG,
-                        "g_bleClientSendDataMutex is NULL",
-                        CA_STATUS_FAILED);
-
+    
     OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "Data Sending to LE layer [%u]", dataLen);
 
     CALEData_t *bleData = CACreateLEData(remoteEndpoint, data, dataLen, NULL);
@@ -3406,9 +3328,13 @@ static CAResult_t CALEAdapterClientSendData(const CAEndpoint_t *remoteEndpoint,
         return CA_MEMORY_ALLOC_FAILED;
     }
     // Add message to send queue
-    oc_mutex_lock(g_bleClientSendDataMutex);
-    CAQueueingThreadAddData(g_bleClientSendQueueHandle, bleData, sizeof(CALEData_t));
-    oc_mutex_unlock(g_bleClientSendDataMutex);
+    
+    CAResult_t res = CAQueueingThreadAddData(g_bleClientSendQueueHandle, bleData,
+                                             sizeof(CALEData_t));
+    if (CA_STATUS_OK != res)
+    {
+        CALEDataDestroyer(bleData, sizeof(CALEData_t));
+    }
 #endif
     return CA_STATUS_OK;
 }
@@ -3436,13 +3362,7 @@ static CAResult_t CALEAdapterServerSendData(const CAEndpoint_t *remoteEndpoint,
     }
 #else
     VERIFY_NON_NULL_RET(g_bleServerSendQueueHandle, CALEADAPTER_TAG,
-                        "BleClientReceiverQueue is NULL",
-                        CA_STATUS_FAILED);
-    VERIFY_NON_NULL_RET(g_bleServerSendDataMutex, CALEADAPTER_TAG,
-                        "BleClientSendDataMutex is NULL",
-                        CA_STATUS_FAILED);
-
-    VERIFY_NON_NULL_RET(g_bleServerSendQueueHandle, CALEADAPTER_TAG, "sendQueueHandle",
+                        "g_bleServerSendQueueHandle is NULL",
                         CA_STATUS_FAILED);
 
     OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "Data Sending to LE layer [%d]", dataLen);
@@ -3457,11 +3377,14 @@ static CAResult_t CALEAdapterServerSendData(const CAEndpoint_t *remoteEndpoint,
     }
 
     // Add message to send queue
-    oc_mutex_lock(g_bleServerSendDataMutex);
-    CAQueueingThreadAddData(g_bleServerSendQueueHandle,
-                            bleData,
-                            sizeof(CALEData_t));
-    oc_mutex_unlock(g_bleServerSendDataMutex);
+    
+    CAResult_t res = CAQueueingThreadAddData(g_bleServerSendQueueHandle,
+                                           bleData,
+                                           sizeof(CALEData_t));
+    if (CA_STATUS_OK != res)
+    {
+        CALEDataDestroyer(bleData, sizeof(CALEData_t));
+    }
 #endif
     OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "OUT %s", __func__);
     return CA_STATUS_OK;
@@ -3570,7 +3493,7 @@ static CAResult_t CALEAdapterClientReceivedData(const char *remoteAddress,
     // Create bleData to add to queue
     OIC_LOG_V(DEBUG,
               CALEADAPTER_TAG,
-              "Data received from LE Client layer [%zu]",
+              "Data received from LE Client layer [%u]",
               dataLength);
 
     CALEData_t * const bleData =
@@ -3594,18 +3517,14 @@ static CAResult_t CALEAdapterClientReceivedData(const char *remoteAddress,
 
 static void CASetLEAdapterThreadPoolHandle(ca_thread_pool_t handle)
 {
-    oc_mutex_lock(g_bleAdapterThreadPoolMutex);
     g_bleAdapterThreadPool = handle;
-    oc_mutex_unlock(g_bleAdapterThreadPoolMutex);
+    
 }
 
 static void CASetLEReqRespAdapterCallback(CANetworkPacketReceivedCallback callback)
 {
-    oc_mutex_lock(g_bleAdapterReqRespCbMutex);
-
     g_networkPacketReceivedCallback = callback;
 
-    oc_mutex_unlock(g_bleAdapterReqRespCbMutex);
 }
 
 static void CALEErrorHandler(const char *remoteAddress,
@@ -3631,42 +3550,42 @@ static void CALEErrorHandler(const char *remoteAddress,
 }
 
 #ifndef SINGLE_THREAD
-static void CALERemoveSendQueueData(CAQueueingThread_t *queueHandle, oc_mutex mutex,
-                                    const char* address)
+static bool CALEClearQueueAddressDataContext(void *data, uint32_t size, void *ctx)
+{
+    if (NULL == data || NULL == ctx)
+    {
+        return false;
+    }
+
+    CALEData_t *caLeData = (CALEData_t *)data;
+    const char *address = (const char *)ctx;
+
+    if (NULL != caLeData && NULL != caLeData->remoteEndpoint)
+    {
+        if (!strcasecmp(caLeData->remoteEndpoint->addr, address))
+        {
+            return true;
+        }
+    }
+    return false;
+}
+
+static void CALERemoveSendQueueData(CAQueueingThread_t *queueHandle, const char* address)
 {
     OIC_LOG_V(DEBUG, CALEADAPTER_TAG, "%s", __func__);
 
     VERIFY_NON_NULL_VOID(queueHandle, CALEADAPTER_TAG, "queueHandle");
     VERIFY_NON_NULL_VOID(address, CALEADAPTER_TAG, "address");
 
-    oc_mutex_lock(mutex);
-    while (u_queue_get_size(queueHandle->dataQueue) > 0)
+    CAResult_t res = CAQueueingThreadClearContextData(queueHandle,
+                                                      CALEClearQueueAddressDataContext,
+                                                      address);
+    if (CA_STATUS_OK != res)
     {
-        OIC_LOG(DEBUG, CALEADAPTER_TAG, "get data from queue");
-        u_queue_message_t *message = u_queue_get_element(queueHandle->dataQueue);
-        if (NULL != message)
-        {
-            CALEData_t *bleData = (CALEData_t *) message->msg;
-            if (bleData && bleData->remoteEndpoint)
-            {
-                if (!strcasecmp(bleData->remoteEndpoint->addr, address))
-                {
-                    OIC_LOG(DEBUG, CALEADAPTER_TAG, "found the message of disconnected device");
-                    if (NULL != queueHandle->destroy)
-                    {
-                        queueHandle->destroy(message->msg, message->size);
-                    }
-                    else
-                    {
-                        OICFree(message->msg);
-                    }
-
-                    OICFree(message);
-                }
-            }
-        }
+        
+        OIC_LOG(ERROR, CALEADAPTER_TAG, "Could not clear the send queue");
     }
-    oc_mutex_unlock(mutex);
+    
 }
 
 static void CALERemoveReceiveQueueData(u_arraylist_t *dataInfoList, const char* address)