Avoid adding elements to QueueingThread if it's stopped. (#596) 42/217942/1
authorsenthil.gs@samsung.com <senthil.gs@samsung.com>
Wed, 6 Nov 2019 09:56:00 +0000 (15:26 +0530)
committerSudipto Bal <sudipto.bal@samsung.com>
Mon, 18 Nov 2019 02:22:27 +0000 (11:22 +0900)
* Avoid adding elements to QueueingThread if it's stopped.

Background:-
-> If a thread has called CAQueueingThreadStop(), it signals the condition variable
to wake up the CAQueueingThreadBaseRoutine.

-> After signaling, it waits on the same condition variable
to wait for the completion of CAQueueingThreadBaseRoutine.

-> CAQueueingThreadBaseRoutine will finish its current task and signals the condition variable.

-> CAQueueingThreadStop should wake up upon signal and return back to caller.

Issue:-
-> CAQueueingThreadAddData also signals the condition variable which can wake up the thread that called CAQueueingThreadStop.
-> CAQueueingThreadStop returns back to caller assuming that the CAQueueingThreadBaseRoutine has stopped. But it could still be running which might
result in race condition/unwanted behavior.

This will have a positive impact for CONPRO-1515 in which IP send thread was not stopped during OCStop() and new requests were getting added.

https://github.sec.samsung.net/RS7-IOTIVITY/IoTivity/pull/596/commits/a9cc86c208e1cf19d2725681d9e45dcb9ec8f663
(cherry-picked from a9cc86c208e1cf19d2725681d9e45dcb9ec8f663)

Signed-off-by: Senthil Kumar G S <senthil.gs@samsung.com>
* Indentation fix.

Change-Id: Iad6b385b0bf77bc1a9fabc1525a9acbcecc5bef3
Signed-off-by: Sudipto Bal <sudipto.bal@samsung.com>
resource/csdk/connectivity/inc/caqueueingthread.h
resource/csdk/connectivity/src/bt_edr_adapter/caedradapter.c
resource/csdk/connectivity/src/bt_le_adapter/caleadapter.c
resource/csdk/connectivity/src/camessagehandler.c
resource/csdk/connectivity/src/caqueueingthread.c
resource/csdk/connectivity/src/ip_adapter/caipadapter.c
resource/csdk/connectivity/src/nfc_adapter/canfcadapter.c
resource/csdk/connectivity/src/tcp_adapter/catcpadapter.c

index 91deeda..cfa017c 100644 (file)
@@ -91,9 +91,11 @@ CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread, const char *thread_
  * @param[in]   thread       thread data for new thread control.
  * @param[in]   data         data that needs to be given for each thread.
  * @param[in]   size         length of the data.
+ * @param[in]   chkThread         Option to ensure whether thread is running or not before adding data.
  * @return  CA_STATUS_OK or ERROR CODES (CAResult_t error codes in cacommon.h).
  */
-CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size);
+CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data,
+                                            uint32_t size, bool chkThread);
 
 /**
  * Clears queue thread data.
index 4481499..f5cf685 100644 (file)
@@ -660,7 +660,15 @@ CAResult_t CAAdapterRecvData(const char *remoteAddress, const uint8_t *data,
 
     // Add message to data queue
     CAEDRData *edrData =  CACreateEDRData(remoteEndpoint, data, dataLength);
-    CAQueueingThreadAddData(g_recvQueueHandle, edrData, sizeof(CAEDRData));
+    CAResult_t result = CAQueueingThreadAddData(g_recvQueueHandle, edrData, sizeof(CAEDRData), true);
+    if (result != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+        CAFreeEDRData(edrData);
+        CAFreeEndpoint(remoteEndpoint);
+        return result;
+    }
+
     *sentLength = dataLength;
 
     // Free remote endpoint
@@ -718,7 +726,15 @@ CAResult_t CAAdapterSendData(const char *remoteAddress, const char *serviceUUID,
 
     // Add message to data queue
     CAEDRData *edrData =  CACreateEDRData(remoteEndpoint, data, dataLength);
-    CAQueueingThreadAddData(g_sendQueueHandle, edrData, sizeof (CAEDRData));
+    CAResult_t result = CAQueueingThreadAddData(g_sendQueueHandle, edrData, sizeof (CAEDRData), true);
+    if (result != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+        CAFreeEDRData(edrData);
+        CAFreeEndpoint(remoteEndpoint);
+        return result;
+    }
+
     *sentLength = dataLength;
 
     // Free remote endpoint
index 8321ca6..0a99507 100644 (file)
@@ -3417,7 +3417,7 @@ static CAResult_t CALEAdapterClientSendData(const CAEndpoint_t *remoteEndpoint,
     // Add message to send queue
 
     CAResult_t res = CAQueueingThreadAddData(g_bleClientSendQueueHandle, bleData,
-                                             sizeof(CALEData_t));
+                                             sizeof(CALEData_t), true);
     if (CA_STATUS_OK != res)
     {
         CALEDataDestroyer(bleData, sizeof(CALEData_t));
@@ -3467,7 +3467,7 @@ static CAResult_t CALEAdapterServerSendData(const CAEndpoint_t *remoteEndpoint,
 
     CAResult_t res = CAQueueingThreadAddData(g_bleServerSendQueueHandle,
                                            bleData,
-                                           sizeof(CALEData_t));
+                                           sizeof(CALEData_t), true);
     if (CA_STATUS_OK != res)
     {
         CALEDataDestroyer(bleData, sizeof(CALEData_t));
@@ -3546,7 +3546,13 @@ static CAResult_t CALEAdapterServerReceivedData(const char *remoteAddress,
 
     CAFreeEndpoint(remoteEndpoint);
     // Add message to receiver queue
-    CAQueueingThreadAddData(g_bleServerReceiverQueue, bleData, sizeof(CALEData_t));
+    CAResult_t result = CAQueueingThreadAddData(g_bleServerReceiverQueue, bleData, sizeof(CALEData_t), true);
+    if (result != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, CALEADAPTER_TAG, "Failed to add message to data queue!");
+        CAFreeLEData(bleData);
+        return result;
+    }
 
     *sentLength = dataLength;
 #endif
@@ -3595,7 +3601,13 @@ static CAResult_t CALEAdapterClientReceivedData(const char *remoteAddress,
 
     CAFreeEndpoint(remoteEndpoint);
     // Add message to receiver queue
-    CAQueueingThreadAddData(g_bleClientReceiverQueue, bleData, sizeof(CALEData_t));
+    CAResult_t result = CAQueueingThreadAddData(g_bleClientReceiverQueue, bleData, sizeof(CALEData_t), true);
+    if (result != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, CALEADAPTER_TAG, "Failed to add message to data queue!");
+        CAFreeLEData(bleData);
+        return result;
+    }
 
     *sentLength = dataLength;
 #endif
index c91f820..ef3e3e7 100644 (file)
@@ -116,7 +116,12 @@ void CAAddDataToSendThread(CAData_t *data)
     VERIFY_NON_NULL_VOID(data, TAG, "data");
 
     // add thread
-    CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
+    CAResult_t result = CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t), true);
+    if (result != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+        CADestroyData(data, sizeof(CAData_t));
+    }
 }
 
 void CAAddDataToReceiveThread(CAData_t *data)
@@ -124,7 +129,13 @@ void CAAddDataToReceiveThread(CAData_t *data)
     VERIFY_NON_NULL_VOID(data, TAG, "data");
 
     // add thread
-    CAQueueingThreadAddData(&g_receiveThread, data, sizeof(CAData_t));
+    CAResult_t result = CAQueueingThreadAddData(&g_receiveThread, data, sizeof(CAData_t), false);
+    if (result != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+        CADestroyData(data, sizeof(CAData_t));
+        return;
+    }
 
 #ifdef WITH_PROCESS_EVENT
     if (g_processEvent)
@@ -346,7 +357,13 @@ static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uin
 #ifdef SINGLE_THREAD
     CAProcessReceivedData(cadata);
 #else
-    CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+    CAResult_t result = CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t), false);
+    if (result != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+        CADestroyData(cadata, sizeof(CAData_t));
+        return;
+    }
 
 #ifdef WITH_PROCESS_EVENT
     if (g_processEvent)
@@ -894,7 +911,14 @@ static CAResult_t CAReceivedPacketCallback(const CASecureEndpoint_t *sep,
     else
 #endif
     {
-        CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+        CAResult_t result = CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t), false);
+        if (result != CA_STATUS_OK)
+        {
+            OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+            CADestroyData(cadata, sizeof(CAData_t));
+            coap_delete_pdu(pdu);
+            goto exit;
+        }
 
 #ifdef WITH_PROCESS_EVENT
         if (g_processEvent)
@@ -1127,7 +1151,13 @@ CAResult_t CADetachSendNetworkReqMessage(const CAEndpoint_t *endpoint,
     cadata->eventInfo = event;
     cadata->dataType = dataType;
 
-    CAQueueingThreadAddData(&g_sendThread, cadata, sizeof(CAData_t));
+    CAResult_t result = CAQueueingThreadAddData(&g_sendThread, cadata, sizeof(CAData_t), true);
+    if (result != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+        CADestroyData(cadata, sizeof(CAData_t));
+        return result;
+    }
 #endif
 
     return CA_STATUS_OK;
@@ -1183,7 +1213,13 @@ CAResult_t CADetachSendMessage(const CAEndpoint_t *endpoint, const void *sendMsg
         if (CA_NOT_SUPPORTED == res)
         {
             OIC_LOG(DEBUG, TAG, "normal msg will be sent");
-            CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
+            CAResult_t result = CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t), true);
+            if (result != CA_STATUS_OK)
+            {
+                OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+                CADestroyData(data, sizeof(CAData_t));
+                return result;
+            }
             return CA_STATUS_OK;
         }
         else
@@ -1196,7 +1232,13 @@ CAResult_t CADetachSendMessage(const CAEndpoint_t *endpoint, const void *sendMsg
     else
 #endif // WITH_BWT
     {
-        CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t));
+        CAResult_t result = CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t), true);
+        if (result != CA_STATUS_OK)
+        {
+            OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+            CADestroyData(data, sizeof(CAData_t));
+            return result;
+        }
     }
 #endif // SINGLE_THREAD
 
@@ -1501,7 +1543,14 @@ void CAErrorHandler(const CAEndpoint_t *endpoint,
 
     cadata->errorInfo->result = result;
 
-    CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+    CAResult_t res = CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t), false);
+    if (res != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+        CADestroyData(cadata, sizeof(CAData_t));
+        coap_delete_pdu(pdu);
+        return;
+    }
 
 #ifdef WITH_PROCESS_EVENT
     if (g_processEvent)
@@ -1561,7 +1610,13 @@ static void CASendErrorInfo(const CAEndpoint_t *endpoint, const CAInfo_t *info,
     cadata->errorInfo = errorInfo;
     cadata->dataType = CA_ERROR_DATA;
 
-    CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t));
+    res = CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t), false);
+    if (res != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+        CADestroyData(cadata, sizeof(CAData_t));
+        return;
+    }
 
 #ifdef WITH_PROCESS_EVENT
     if (g_processEvent)
index 05baf0f..018e53b 100755 (executable)
@@ -214,7 +214,8 @@ CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread, const char *thread_
     return res;
 }
 
-CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size)
+CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data,
+                                   uint32_t size, bool chkThread)
 {
     if (NULL == thread)
     {
@@ -243,6 +244,17 @@ CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint3
     // mutex lock
     oc_mutex_lock(thread->threadMutex);
 
+    // thread stop
+    if (chkThread && thread->isStop)
+    {
+        // mutex unlock
+        oc_mutex_unlock(thread->threadMutex);
+
+        OIC_LOG(ERROR, TAG, "thread is stopped..");
+        OICFree(message);
+        return CA_STATUS_FAILED;
+    }
+
     // add thread data into list
     u_queue_add_element(thread->dataQueue, message);
 
index 313584c..5b50636 100644 (file)
@@ -405,7 +405,13 @@ static int32_t CAQueueIPData(bool isMulticast, const CAEndpoint_t *endpoint,
         return -1;
     }
     // Add message to send queue
-    CAQueueingThreadAddData(g_sendQueueHandle, ipData, sizeof(CAIPData_t));
+    CAResult_t result = CAQueueingThreadAddData(g_sendQueueHandle, ipData, sizeof(CAIPData_t), true);
+    if (result != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+        CAFreeIPData(ipData);
+        return -1;
+    }
 
 #endif // SINGLE_THREAD
 
index 7f77e69..32f2e70 100644 (file)
@@ -296,7 +296,13 @@ static int32_t CAQueueNFCData(const CAEndpoint_t *endpoint, const void *data,
         return -1;
     }
     // Add message to send queue
-    CAQueueingThreadAddData(g_sendQueueHandle, nfcData, sizeof(CANFCData));
+    CAResult_t result = CAQueueingThreadAddData(g_sendQueueHandle, nfcData, sizeof(CANFCData), true);
+    if (result != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+        CAFreeNFCData(nfcData);
+        return -1;
+    }
 
     OIC_LOG(DEBUG, TAG, "OUT");
     return dataLength;
index 83493d3..19bea61 100755 (executable)
@@ -629,7 +629,13 @@ static size_t CAQueueTCPData(bool isMulticast, const CAEndpoint_t *endpoint,
         return -1;
     }
     // Add message to send queue
-    CAQueueingThreadAddData(g_sendQueueHandle, tcpData, sizeof(CATCPData));
+    CAResult_t result = CAQueueingThreadAddData(g_sendQueueHandle, tcpData, sizeof(CATCPData), true);
+    if (result != CA_STATUS_OK)
+    {
+        OIC_LOG(ERROR, TAG, "Failed to add message to data queue!");
+        CAFreeTCPData(tcpData);
+        return -1;
+    }
 
     return dataLength;
 }