Set the queue handle to null before stop & destroy. (#525)
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / tcp_adapter / catcpadapter.c
index 67905f6..ea55c74 100644 (file)
@@ -64,6 +64,16 @@ typedef struct
 #define CA_TCP_SELECT_TIMEOUT 10
 
 /**
+ * Mutex to synchronize TCP adapter access.
+ */
+static oc_mutex g_mutexAdapter = NULL;
+
+/**
+ * State to control closing of TCP servers on interface down event.
+ */
+static bool g_skipCloseOnIFDown;
+
+/**
  * Queue handle for Send Data.
  */
 static CAQueueingThread_t *g_sendQueueHandle = NULL;
@@ -101,7 +111,7 @@ static void CATCPErrorHandler(const CAEndpoint_t *endpoint, const void *data,
 
 static CAResult_t CATCPInitializeQueueHandles();
 
-static void CATCPDeinitializeQueueHandles();
+static void CATCPDeinitializeQueueHandles(CAQueueingThread_t *queueHandle);
 
 static void CATCPSendDataThread(void *threadData);
 
@@ -142,11 +152,10 @@ CAResult_t CATCPInitializeQueueHandles()
     return CA_STATUS_OK;
 }
 
-void CATCPDeinitializeQueueHandles()
+void CATCPDeinitializeQueueHandles(CAQueueingThread_t *queueHandle)
 {
-    CAQueueingThreadDestroy(g_sendQueueHandle);
-    OICFree(g_sendQueueHandle);
-    g_sendQueueHandle = NULL;
+    CAQueueingThreadDestroy(queueHandle);
+    OICFree(queueHandle);
 }
 
 void CATCPConnectionStateCB(const char *ipAddress, CANetworkStatus_t status)
@@ -217,7 +226,7 @@ CAResult_t CATCPPacketReceivedCB(const CASecureEndpoint_t *sep, const void *data
         }
         else
         {
-            OIC_LOG_V(DEBUG, TAG, "%u bytes required for complete CoAP",
+            OIC_LOG_V(DEBUG, TAG, "%zd bytes required for complete CoAP",
                                 svritem->totalLen - svritem->len);
         }
     }
@@ -236,7 +245,7 @@ static ssize_t CATCPPacketSendCB(CAEndpoint_t *endpoint, const void *data, size_
     OIC_LOG_BUFFER(DEBUG, TAG, data, dataLength);
 
     ssize_t ret = CATCPSendData(endpoint, data, dataLength);
-    OIC_LOG_V(DEBUG, TAG, "Out %s : %d bytes sent", __func__, ret);
+    OIC_LOG_V(DEBUG, TAG, "Out %s : %zd bytes sent", __func__, ret);
     return ret;
 }
 #endif
@@ -290,7 +299,16 @@ void CATCPAdapterHandler(CATransportAdapter_t adapter, CANetworkStatus_t status)
             OIC_LOG_V(ERROR, TAG, "CAQueueingThreadClearData failed[%d]", res);
         }
 
-        CATCPStopServer();
+        oc_mutex_lock(g_mutexAdapter);
+        if (!g_skipCloseOnIFDown)
+        {
+            CATCPStopServer();
+        }
+        else
+        {
+            OIC_LOG(INFO, TAG, "Skip closing servers!");
+        }
+        oc_mutex_unlock(g_mutexAdapter);
     }
     else if (CA_INTERFACE_UP == status)
     {
@@ -331,10 +349,13 @@ static void CAInitializeTCPGlobals()
     {
         flags |= caglobals.clientFlags;
     }
+
+#ifndef DISABLE_TCP_SERVER
     if (caglobals.server)
     {
         flags |= caglobals.serverFlags;
     }
+#endif
 
     caglobals.tcp.ipv4tcpenabled = flags & CA_IPV4;
     caglobals.tcp.ipv6tcpenabled = flags & CA_IPV6;
@@ -361,6 +382,19 @@ CAResult_t CAInitializeTCP(CARegisterConnectivityCallback registerCallback,
 
     CAInitializeTCPGlobals();
 
+    // Create Mutex for synchronize access at adapter level
+    if (!g_mutexAdapter)
+    {
+        g_mutexAdapter = oc_mutex_new();
+        if (!g_mutexAdapter)
+        {
+            OIC_LOG(ERROR, TAG, "Failed to create mutex!");
+            return CA_STATUS_FAILED;
+        }
+    }
+
+    g_skipCloseOnIFDown = false;
+
     CAResult_t res = CATCPCreateMutex();
     if (CA_STATUS_OK == res)
     {
@@ -374,6 +408,21 @@ CAResult_t CAInitializeTCP(CARegisterConnectivityCallback registerCallback,
         return res;
     }
 
+    res = CATCPCreateSendMutex();
+    if (CA_STATUS_OK == res)
+    {
+        res = CATCPCreateSendCond();
+    }
+    if (CA_STATUS_OK != res)
+    {
+        OIC_LOG(ERROR, TAG, "failed to create send data mutex/cond");
+        CATCPDestroyMutex();
+        CATCPDestroyCond();
+        CATCPDestroySendMutex();
+        CATCPDestroySendCond();
+        return res;
+    }
+
 #ifndef SINGLE_THREAD
     caglobals.tcp.threadpool = handle;
 #endif
@@ -444,6 +493,8 @@ CAResult_t CAStartTCP()
 
 static bool CAClearQueueEndpointDataContext(void *data, uint32_t size, void *ctx)
 {
+    (void)size;
+
     if (NULL == data || NULL == ctx)
     {
         return false;
@@ -465,29 +516,41 @@ static bool CAClearQueueEndpointDataContext(void *data, uint32_t size, void *ctx
 
 CAResult_t CATCPDisconnectSession(const CAEndpoint_t *endpoint)
 {
-    CAResult_t res = CA_STATUS_OK;
+    CAResult_t res = CAQueueingThreadClearContextData(g_sendQueueHandle,
+                                                      CAClearQueueEndpointDataContext,
+                                                      endpoint);
+    if (CA_STATUS_OK != res)
+    {
+        OIC_LOG(ERROR, TAG, "failed to clear context data");
+    }
+
 #ifdef __WITH_TLS__
     res = CAcloseSslConnection(endpoint);
     if (CA_STATUS_OK != res)
     {
         OIC_LOG(ERROR, TAG, "failed to close TLS session");
-        return res;
     }
 #endif
+
     res = CASearchAndDeleteTCPSession(endpoint);
     if (CA_STATUS_OK != res)
     {
         OIC_LOG(ERROR, TAG, "failed to close TCP session");
     }
-    res = CAQueueingThreadClearContextData(g_sendQueueHandle,
-                                           CAClearQueueEndpointDataContext,
-                                           endpoint);
+
     return res;
 }
 
+void CATCPSkipCloseOnInterfaceDown(bool state)
+{
+    oc_mutex_lock(g_mutexAdapter);
+    g_skipCloseOnIFDown = state;
+    oc_mutex_unlock(g_mutexAdapter);
+}
+
 CAResult_t CAStartTCPListeningServer()
 {
-#ifndef SINGLE_THREAD
+#if !defined(SINGLE_THREAD) && !defined(DISABLE_TCP_SERVER)
     if (!caglobals.server)
     {
         caglobals.server = true;    // only needed to run CA tests
@@ -570,8 +633,13 @@ int32_t CASendTCPMulticastData(const CAEndpoint_t *endpoint,
                                const void *data, uint32_t dataLength,
                                CADataType_t dataType)
 {
+    (void)endpoint;
+    (void)data;
+    (void)dataLength;
     (void)dataType;
-    return CAQueueTCPData(true, endpoint, data, dataLength);
+
+    OIC_LOG(ERROR, TAG, "TCP adapter does not support multicast sending!");
+    return 0;
 }
 
 CAResult_t CAReadTCPData()
@@ -587,17 +655,33 @@ CAResult_t CAStopTCP()
 {
     CAIPStopNetworkMonitor(CA_ADAPTER_TCP);
 
+    /* Some times send queue thread fails to terminate as it's worker
+       thread gets blocked at TCP session's socket connect operation.
+       So closing sockets which are in connect operation at the time
+       of termination of adapter would save send queue thread from
+       getting blocked. */
+    CATCPCloseInProgressConnections();
+
 #ifndef SINGLE_THREAD
-    if (g_sendQueueHandle && g_sendQueueHandle->threadMutex)
+    // Stop send queue thread.
+    if (g_sendQueueHandle != NULL)
     {
-        CAQueueingThreadStop(g_sendQueueHandle);
+        // g_sendQueueHandle is set to NULL to prevent new requests from being enqueued.
+        CAQueueingThread_t *queueHandle = g_sendQueueHandle;
+        g_sendQueueHandle = NULL;
+
+        if (queueHandle->threadMutex)
+        {
+            CAQueueingThreadStop(queueHandle);
+        }
+        CATCPDeinitializeQueueHandles(queueHandle);
     }
-    CATCPDeinitializeQueueHandles();
 #endif
 
+    // Close TCP servers and established connections.
     CATCPStopServer();
 
-    //Re-initializing the Globals to start them again
+    // Re-initializing the Globals to start them again.
     CAInitializeTCPGlobals();
 
     return CA_STATUS_OK;
@@ -609,6 +693,18 @@ void CATerminateTCP()
 
     CATCPDestroyMutex();
     CATCPDestroyCond();
+
+    CATCPDestroySendMutex();
+    CATCPDestroySendCond();
+
+    g_skipCloseOnIFDown = false;
+
+    // Free adapter mutex
+    if (g_mutexAdapter)
+    {
+        oc_mutex_free(g_mutexAdapter);
+        g_mutexAdapter = NULL;
+    }
 }
 
 void CATCPSendDataThread(void *threadData)