From bf7c1453bb2758293c8f544fee1c9897438be5d3 Mon Sep 17 00:00:00 2001 From: "senthil.gs@samsung.com" Date: Wed, 6 Nov 2019 15:26:00 +0530 Subject: [PATCH] Avoid adding elements to QueueingThread if it's stopped. (#596) * Avoid adding elements to QueueingThread if it's stopped. Background:- -> If a thread has called CAQueueingThreadStop(), it signals the condition variable to wake up the CAQueueingThreadBaseRoutine. -> After signaling, it waits on the same condition variable to wait for the completion of CAQueueingThreadBaseRoutine. -> CAQueueingThreadBaseRoutine will finish its current task and signals the condition variable. -> CAQueueingThreadStop should wake up upon signal and return back to caller. Issue:- -> CAQueueingThreadAddData also signals the condition variable which can wake up the thread that called CAQueueingThreadStop. -> CAQueueingThreadStop returns back to caller assuming that the CAQueueingThreadBaseRoutine has stopped. But it could still be running which might result in race condition/unwanted behavior. This will have a positive impact for CONPRO-1515 in which IP send thread was not stopped during OCStop() and new requests were getting added. https://github.sec.samsung.net/RS7-IOTIVITY/IoTivity/pull/596/commits/a9cc86c208e1cf19d2725681d9e45dcb9ec8f663 (cherry-picked from a9cc86c208e1cf19d2725681d9e45dcb9ec8f663) Signed-off-by: Senthil Kumar G S * Indentation fix. Change-Id: Iad6b385b0bf77bc1a9fabc1525a9acbcecc5bef3 Signed-off-by: Sudipto Bal --- resource/csdk/connectivity/inc/caqueueingthread.h | 4 +- .../connectivity/src/bt_edr_adapter/caedradapter.c | 20 +++++- .../connectivity/src/bt_le_adapter/caleadapter.c | 20 ++++-- resource/csdk/connectivity/src/camessagehandler.c | 73 +++++++++++++++++++--- resource/csdk/connectivity/src/caqueueingthread.c | 14 ++++- .../csdk/connectivity/src/ip_adapter/caipadapter.c | 8 ++- .../connectivity/src/nfc_adapter/canfcadapter.c | 8 ++- .../connectivity/src/tcp_adapter/catcpadapter.c | 8 ++- 8 files changed, 135 insertions(+), 20 deletions(-) diff --git a/resource/csdk/connectivity/inc/caqueueingthread.h b/resource/csdk/connectivity/inc/caqueueingthread.h index 91deeda..cfa017c 100644 --- a/resource/csdk/connectivity/inc/caqueueingthread.h +++ b/resource/csdk/connectivity/inc/caqueueingthread.h @@ -91,9 +91,11 @@ CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread, const char *thread_ * @param[in] thread thread data for new thread control. * @param[in] data data that needs to be given for each thread. * @param[in] size length of the data. + * @param[in] chkThread Option to ensure whether thread is running or not before adding data. * @return CA_STATUS_OK or ERROR CODES (CAResult_t error codes in cacommon.h). */ -CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size); +CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, + uint32_t size, bool chkThread); /** * Clears queue thread data. diff --git a/resource/csdk/connectivity/src/bt_edr_adapter/caedradapter.c b/resource/csdk/connectivity/src/bt_edr_adapter/caedradapter.c index 4481499..f5cf685 100644 --- a/resource/csdk/connectivity/src/bt_edr_adapter/caedradapter.c +++ b/resource/csdk/connectivity/src/bt_edr_adapter/caedradapter.c @@ -660,7 +660,15 @@ CAResult_t CAAdapterRecvData(const char *remoteAddress, const uint8_t *data, // Add message to data queue CAEDRData *edrData = CACreateEDRData(remoteEndpoint, data, dataLength); - CAQueueingThreadAddData(g_recvQueueHandle, edrData, sizeof(CAEDRData)); + CAResult_t result = CAQueueingThreadAddData(g_recvQueueHandle, edrData, sizeof(CAEDRData), true); + if (result != CA_STATUS_OK) + { + OIC_LOG(ERROR, TAG, "Failed to add message to data queue!"); + CAFreeEDRData(edrData); + CAFreeEndpoint(remoteEndpoint); + return result; + } + *sentLength = dataLength; // Free remote endpoint @@ -718,7 +726,15 @@ CAResult_t CAAdapterSendData(const char *remoteAddress, const char *serviceUUID, // Add message to data queue CAEDRData *edrData = CACreateEDRData(remoteEndpoint, data, dataLength); - CAQueueingThreadAddData(g_sendQueueHandle, edrData, sizeof (CAEDRData)); + CAResult_t result = CAQueueingThreadAddData(g_sendQueueHandle, edrData, sizeof (CAEDRData), true); + if (result != CA_STATUS_OK) + { + OIC_LOG(ERROR, TAG, "Failed to add message to data queue!"); + CAFreeEDRData(edrData); + CAFreeEndpoint(remoteEndpoint); + return result; + } + *sentLength = dataLength; // Free remote endpoint diff --git a/resource/csdk/connectivity/src/bt_le_adapter/caleadapter.c b/resource/csdk/connectivity/src/bt_le_adapter/caleadapter.c index 8321ca6..0a99507 100644 --- a/resource/csdk/connectivity/src/bt_le_adapter/caleadapter.c +++ b/resource/csdk/connectivity/src/bt_le_adapter/caleadapter.c @@ -3417,7 +3417,7 @@ static CAResult_t CALEAdapterClientSendData(const CAEndpoint_t *remoteEndpoint, // Add message to send queue CAResult_t res = CAQueueingThreadAddData(g_bleClientSendQueueHandle, bleData, - sizeof(CALEData_t)); + sizeof(CALEData_t), true); if (CA_STATUS_OK != res) { CALEDataDestroyer(bleData, sizeof(CALEData_t)); @@ -3467,7 +3467,7 @@ static CAResult_t CALEAdapterServerSendData(const CAEndpoint_t *remoteEndpoint, CAResult_t res = CAQueueingThreadAddData(g_bleServerSendQueueHandle, bleData, - sizeof(CALEData_t)); + sizeof(CALEData_t), true); if (CA_STATUS_OK != res) { CALEDataDestroyer(bleData, sizeof(CALEData_t)); @@ -3546,7 +3546,13 @@ static CAResult_t CALEAdapterServerReceivedData(const char *remoteAddress, CAFreeEndpoint(remoteEndpoint); // Add message to receiver queue - CAQueueingThreadAddData(g_bleServerReceiverQueue, bleData, sizeof(CALEData_t)); + CAResult_t result = CAQueueingThreadAddData(g_bleServerReceiverQueue, bleData, sizeof(CALEData_t), true); + if (result != CA_STATUS_OK) + { + OIC_LOG(ERROR, CALEADAPTER_TAG, "Failed to add message to data queue!"); + CAFreeLEData(bleData); + return result; + } *sentLength = dataLength; #endif @@ -3595,7 +3601,13 @@ static CAResult_t CALEAdapterClientReceivedData(const char *remoteAddress, CAFreeEndpoint(remoteEndpoint); // Add message to receiver queue - CAQueueingThreadAddData(g_bleClientReceiverQueue, bleData, sizeof(CALEData_t)); + CAResult_t result = CAQueueingThreadAddData(g_bleClientReceiverQueue, bleData, sizeof(CALEData_t), true); + if (result != CA_STATUS_OK) + { + OIC_LOG(ERROR, CALEADAPTER_TAG, "Failed to add message to data queue!"); + CAFreeLEData(bleData); + return result; + } *sentLength = dataLength; #endif diff --git a/resource/csdk/connectivity/src/camessagehandler.c b/resource/csdk/connectivity/src/camessagehandler.c index c91f820..ef3e3e7 100644 --- a/resource/csdk/connectivity/src/camessagehandler.c +++ b/resource/csdk/connectivity/src/camessagehandler.c @@ -116,7 +116,12 @@ void CAAddDataToSendThread(CAData_t *data) VERIFY_NON_NULL_VOID(data, TAG, "data"); // add thread - CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t)); + CAResult_t result = CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t), true); + if (result != CA_STATUS_OK) + { + OIC_LOG(ERROR, TAG, "Failed to add message to data queue!"); + CADestroyData(data, sizeof(CAData_t)); + } } void CAAddDataToReceiveThread(CAData_t *data) @@ -124,7 +129,13 @@ void CAAddDataToReceiveThread(CAData_t *data) VERIFY_NON_NULL_VOID(data, TAG, "data"); // add thread - CAQueueingThreadAddData(&g_receiveThread, data, sizeof(CAData_t)); + CAResult_t result = CAQueueingThreadAddData(&g_receiveThread, data, sizeof(CAData_t), false); + if (result != CA_STATUS_OK) + { + OIC_LOG(ERROR, TAG, "Failed to add message to data queue!"); + CADestroyData(data, sizeof(CAData_t)); + return; + } #ifdef WITH_PROCESS_EVENT if (g_processEvent) @@ -346,7 +357,13 @@ static void CATimeoutCallback(const CAEndpoint_t *endpoint, const void *pdu, uin #ifdef SINGLE_THREAD CAProcessReceivedData(cadata); #else - CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); + CAResult_t result = CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t), false); + if (result != CA_STATUS_OK) + { + OIC_LOG(ERROR, TAG, "Failed to add message to data queue!"); + CADestroyData(cadata, sizeof(CAData_t)); + return; + } #ifdef WITH_PROCESS_EVENT if (g_processEvent) @@ -894,7 +911,14 @@ static CAResult_t CAReceivedPacketCallback(const CASecureEndpoint_t *sep, else #endif { - CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); + CAResult_t result = CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t), false); + if (result != CA_STATUS_OK) + { + OIC_LOG(ERROR, TAG, "Failed to add message to data queue!"); + CADestroyData(cadata, sizeof(CAData_t)); + coap_delete_pdu(pdu); + goto exit; + } #ifdef WITH_PROCESS_EVENT if (g_processEvent) @@ -1127,7 +1151,13 @@ CAResult_t CADetachSendNetworkReqMessage(const CAEndpoint_t *endpoint, cadata->eventInfo = event; cadata->dataType = dataType; - CAQueueingThreadAddData(&g_sendThread, cadata, sizeof(CAData_t)); + CAResult_t result = CAQueueingThreadAddData(&g_sendThread, cadata, sizeof(CAData_t), true); + if (result != CA_STATUS_OK) + { + OIC_LOG(ERROR, TAG, "Failed to add message to data queue!"); + CADestroyData(cadata, sizeof(CAData_t)); + return result; + } #endif return CA_STATUS_OK; @@ -1183,7 +1213,13 @@ CAResult_t CADetachSendMessage(const CAEndpoint_t *endpoint, const void *sendMsg if (CA_NOT_SUPPORTED == res) { OIC_LOG(DEBUG, TAG, "normal msg will be sent"); - CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t)); + CAResult_t result = CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t), true); + if (result != CA_STATUS_OK) + { + OIC_LOG(ERROR, TAG, "Failed to add message to data queue!"); + CADestroyData(data, sizeof(CAData_t)); + return result; + } return CA_STATUS_OK; } else @@ -1196,7 +1232,13 @@ CAResult_t CADetachSendMessage(const CAEndpoint_t *endpoint, const void *sendMsg else #endif // WITH_BWT { - CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t)); + CAResult_t result = CAQueueingThreadAddData(&g_sendThread, data, sizeof(CAData_t), true); + if (result != CA_STATUS_OK) + { + OIC_LOG(ERROR, TAG, "Failed to add message to data queue!"); + CADestroyData(data, sizeof(CAData_t)); + return result; + } } #endif // SINGLE_THREAD @@ -1501,7 +1543,14 @@ void CAErrorHandler(const CAEndpoint_t *endpoint, cadata->errorInfo->result = result; - CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); + CAResult_t res = CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t), false); + if (res != CA_STATUS_OK) + { + OIC_LOG(ERROR, TAG, "Failed to add message to data queue!"); + CADestroyData(cadata, sizeof(CAData_t)); + coap_delete_pdu(pdu); + return; + } #ifdef WITH_PROCESS_EVENT if (g_processEvent) @@ -1561,7 +1610,13 @@ static void CASendErrorInfo(const CAEndpoint_t *endpoint, const CAInfo_t *info, cadata->errorInfo = errorInfo; cadata->dataType = CA_ERROR_DATA; - CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t)); + res = CAQueueingThreadAddData(&g_receiveThread, cadata, sizeof(CAData_t), false); + if (res != CA_STATUS_OK) + { + OIC_LOG(ERROR, TAG, "Failed to add message to data queue!"); + CADestroyData(cadata, sizeof(CAData_t)); + return; + } #ifdef WITH_PROCESS_EVENT if (g_processEvent) diff --git a/resource/csdk/connectivity/src/caqueueingthread.c b/resource/csdk/connectivity/src/caqueueingthread.c index 05baf0f..018e53b 100755 --- a/resource/csdk/connectivity/src/caqueueingthread.c +++ b/resource/csdk/connectivity/src/caqueueingthread.c @@ -214,7 +214,8 @@ CAResult_t CAQueueingThreadStart(CAQueueingThread_t *thread, const char *thread_ return res; } -CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint32_t size) +CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, + uint32_t size, bool chkThread) { if (NULL == thread) { @@ -243,6 +244,17 @@ CAResult_t CAQueueingThreadAddData(CAQueueingThread_t *thread, void *data, uint3 // mutex lock oc_mutex_lock(thread->threadMutex); + // thread stop + if (chkThread && thread->isStop) + { + // mutex unlock + oc_mutex_unlock(thread->threadMutex); + + OIC_LOG(ERROR, TAG, "thread is stopped.."); + OICFree(message); + return CA_STATUS_FAILED; + } + // add thread data into list u_queue_add_element(thread->dataQueue, message); diff --git a/resource/csdk/connectivity/src/ip_adapter/caipadapter.c b/resource/csdk/connectivity/src/ip_adapter/caipadapter.c index 313584c..5b50636 100644 --- a/resource/csdk/connectivity/src/ip_adapter/caipadapter.c +++ b/resource/csdk/connectivity/src/ip_adapter/caipadapter.c @@ -405,7 +405,13 @@ static int32_t CAQueueIPData(bool isMulticast, const CAEndpoint_t *endpoint, return -1; } // Add message to send queue - CAQueueingThreadAddData(g_sendQueueHandle, ipData, sizeof(CAIPData_t)); + CAResult_t result = CAQueueingThreadAddData(g_sendQueueHandle, ipData, sizeof(CAIPData_t), true); + if (result != CA_STATUS_OK) + { + OIC_LOG(ERROR, TAG, "Failed to add message to data queue!"); + CAFreeIPData(ipData); + return -1; + } #endif // SINGLE_THREAD diff --git a/resource/csdk/connectivity/src/nfc_adapter/canfcadapter.c b/resource/csdk/connectivity/src/nfc_adapter/canfcadapter.c index 7f77e69..32f2e70 100644 --- a/resource/csdk/connectivity/src/nfc_adapter/canfcadapter.c +++ b/resource/csdk/connectivity/src/nfc_adapter/canfcadapter.c @@ -296,7 +296,13 @@ static int32_t CAQueueNFCData(const CAEndpoint_t *endpoint, const void *data, return -1; } // Add message to send queue - CAQueueingThreadAddData(g_sendQueueHandle, nfcData, sizeof(CANFCData)); + CAResult_t result = CAQueueingThreadAddData(g_sendQueueHandle, nfcData, sizeof(CANFCData), true); + if (result != CA_STATUS_OK) + { + OIC_LOG(ERROR, TAG, "Failed to add message to data queue!"); + CAFreeNFCData(nfcData); + return -1; + } OIC_LOG(DEBUG, TAG, "OUT"); return dataLength; diff --git a/resource/csdk/connectivity/src/tcp_adapter/catcpadapter.c b/resource/csdk/connectivity/src/tcp_adapter/catcpadapter.c index 83493d3..19bea61 100755 --- a/resource/csdk/connectivity/src/tcp_adapter/catcpadapter.c +++ b/resource/csdk/connectivity/src/tcp_adapter/catcpadapter.c @@ -629,7 +629,13 @@ static size_t CAQueueTCPData(bool isMulticast, const CAEndpoint_t *endpoint, return -1; } // Add message to send queue - CAQueueingThreadAddData(g_sendQueueHandle, tcpData, sizeof(CATCPData)); + CAResult_t result = CAQueueingThreadAddData(g_sendQueueHandle, tcpData, sizeof(CATCPData), true); + if (result != CA_STATUS_OK) + { + OIC_LOG(ERROR, TAG, "Failed to add message to data queue!"); + CAFreeTCPData(tcpData); + return -1; + } return dataLength; } -- 2.7.4