Update snapshot(2018-01-31)
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / tcp_adapter / catcpserver.c
index 16ed703..661612e 100644 (file)
  */
 #define MAX_CONNECTION_COUNTS   500
 
+#define COAP_TCP_MAX_BUFFER_CHUNK_SIZE 65530 //64kb - 6 (coap+tcp max header size)
+
 /**
  * Thread pool.
  */
 static ca_thread_pool_t g_threadPool = NULL;
 
 /**
- * An unique identifier of receive thread.
- */
-static uint32_t g_recvThreadId = 0;
-
-/**
  * Mutex to synchronize device object list.
  */
 static oc_mutex g_mutexObjectList = NULL;
@@ -118,6 +115,7 @@ static void CAReceiveMessage(int fd);
 static void CAReceiveHandler(void *data);
 static CAResult_t CATCPCreateSocket(int family, CATCPSessionInfo_t *svritem);
 static void CATCPInitializeSocket();
+static CATCPSessionInfo_t *CAGetSessionInfoFromFDAsOwner(int fd, size_t *index);
 
 #define CHECKFD(FD) \
     if (FD > caglobals.tcp.maxfd) \
@@ -291,7 +289,20 @@ static void CASelectReturned(fd_set *readFds)
     }
     else
     {
+        int *readFDList = NULL;
+        size_t readFDListSize = 0;
+
+        oc_mutex_lock(g_mutexObjectList);
         uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
+
+        readFDList = (int*) OICCalloc(length, sizeof(int));
+        if (NULL == readFDList)
+        {
+            OIC_LOG_V(ERROR, TAG, "Failed to allocate memory!");
+            oc_mutex_unlock(g_mutexObjectList);
+            return;
+        }
+
         for (size_t i = 0; i < length; i++)
         {
             CATCPSessionInfo_t *svritem =
@@ -300,10 +311,19 @@ static void CASelectReturned(fd_set *readFds)
             {
                 if (FD_ISSET(svritem->fd, readFds))
                 {
-                    CAReceiveMessage(svritem->fd);
+                    readFDList[readFDListSize++] = svritem->fd;
                 }
             }
         }
+        oc_mutex_unlock(g_mutexObjectList);
+
+        // Read incomming messages from fds
+        for (size_t i = 0; i < readFDListSize; i++)
+        {
+            CAReceiveMessage(readFDList[i]);
+        }
+
+        OICFree(readFDList);
     }
 }
 
@@ -347,12 +367,23 @@ static void CAAcceptConnection(CATransportFlags_t flag, CASocket_t *sock)
         CAConvertAddrToName((struct sockaddr_storage *)&clientaddr, clientlen,
                             svritem->sep.endpoint.addr, &svritem->sep.endpoint.port);
 
+        // Allocate message buffer
+        svritem->tlsdata = (unsigned char*) OICCalloc(TLS_DATA_MAX_SIZE, sizeof(unsigned char));
+        if (!svritem->tlsdata)
+        {
+            OIC_LOG(ERROR, TAG, "Out of memory");
+            close(sockfd);
+            OICFree(svritem);
+            return;
+        }
+
         oc_mutex_lock(g_mutexObjectList);
         bool result = u_arraylist_add(caglobals.tcp.svrlist, svritem);
         if (!result)
         {
             OIC_LOG(ERROR, TAG, "u_arraylist_add failed.");
             close(sockfd);
+            OICFree(svritem->tlsdata);
             OICFree(svritem);
             oc_mutex_unlock(g_mutexObjectList);
             return;
@@ -386,6 +417,7 @@ void CACleanData(CATCPSessionInfo_t *svritem)
         svritem->tlsLen = 0;
 #endif
         svritem->totalLen = 0;
+        svritem->bufLen = 0;
         svritem->protocol = UNKNOWN;
     }
 }
@@ -426,6 +458,7 @@ CAResult_t CAConstructCoAP(CATCPSessionInfo_t *svritem, unsigned char **data,
         // copy 1 byte to parse coap header length
         memcpy(svritem->data, inBuffer, 1);
         svritem->len = 1;
+        svritem->bufLen = COAP_MAX_HEADER_SIZE;
         inBuffer++;
         inLen--;
     }
@@ -470,28 +503,41 @@ CAResult_t CAConstructCoAP(CATCPSessionInfo_t *svritem, unsigned char **data,
 
         //calculate CoAP message length
         svritem->totalLen = CAGetTotalLengthFromHeader(svritem->data);
-
-        // allocate required memory
-        unsigned char *buffer = OICRealloc(svritem->data, svritem->totalLen);
-        if (NULL == buffer)
-        {
-            OIC_LOG(ERROR, TAG, "OICRealloc - out of memory");
-            return CA_MEMORY_ALLOC_FAILED;
-        }
-        svritem->data = buffer;
     }
 
     // PAYLOAD
     if (inLen > 0)
     {
-        // read required bytes to have full CoAP payload
+        // Calculate length of data to be copied.
         copyLen = svritem->totalLen - svritem->len;
         if (inLen < copyLen)
         {
             copyLen = inLen;
         }
 
-        //read required bytes to have full CoAP header
+        // Is buffer not big enough for remaining data ?
+        if (svritem->len + copyLen > svritem->bufLen)
+        {
+            // Resize buffer to accommodate enough space
+            size_t extLen = svritem->totalLen - svritem->bufLen;
+            if (extLen > COAP_TCP_MAX_BUFFER_CHUNK_SIZE)
+            {
+                extLen = COAP_TCP_MAX_BUFFER_CHUNK_SIZE;
+            }
+
+            // Allocate required memory
+            unsigned char *buffer = OICRealloc(svritem->data, svritem->bufLen + extLen);
+            if (NULL == buffer)
+            {
+                OIC_LOG(ERROR, TAG, "OICRealloc - out of memory");
+                return CA_MEMORY_ALLOC_FAILED;
+            }
+
+            svritem->data = buffer;
+            svritem->bufLen += extLen;
+        }
+
+        // Read required bytes to have full CoAP payload
         memcpy(svritem->data + svritem->len, inBuffer, copyLen);
         svritem->len += copyLen;
         inBuffer += copyLen;
@@ -510,19 +556,21 @@ static void CAReceiveMessage(int fd)
 {
     CAResult_t res = CA_STATUS_OK;
 
+    oc_mutex_lock(g_mutexObjectList);
+
     //get remote device information from file descriptor.
     size_t index = 0;
-    CATCPSessionInfo_t *svritem = CAGetSessionInfoFromFD(fd, &index);
+    CATCPSessionInfo_t *svritem = CAGetSessionInfoFromFDAsOwner(fd, &index);
     if (!svritem)
     {
         OIC_LOG(ERROR, TAG, "there is no connection information in list");
+        oc_mutex_unlock(g_mutexObjectList);
         return;
     }
 
-    // read data
+    CASecureEndpoint_t peerEP = svritem->sep;
     int len = 0;
-
-    if (svritem->sep.endpoint.flags & CA_SECURE)
+    if (svritem->sep.endpoint.flags & CA_SECURE) // Secure connection
     {
         svritem->protocol = TLS;
 
@@ -540,15 +588,12 @@ static void CAReceiveMessage(int fd)
             tlsLength = TLS_HEADER_SIZE +
                             (size_t)((svritem->tlsdata[3] << 8) | svritem->tlsdata[4]);
             OIC_LOG_V(DEBUG, TAG, "total tls length = %u", tlsLength);
-            if (tlsLength > sizeof(svritem->tlsdata))
+            if (tlsLength > TLS_DATA_MAX_SIZE)
             {
                 OIC_LOG_V(ERROR, TAG, "total tls length is too big (buffer size : %u)",
-                                    sizeof(svritem->tlsdata));
-                if (CA_STATUS_OK != CAcloseSslConnection(&svritem->sep.endpoint))
-                {
-                    OIC_LOG(ERROR, TAG, "Failed to close TLS session");
-                }
-                CASearchAndDeleteTCPSession(&(svritem->sep.endpoint));
+                                    TLS_DATA_MAX_SIZE);
+                oc_mutex_unlock(g_mutexObjectList);
+                CATCPDisconnectSession(&peerEP.endpoint);
                 return;
             }
             nbRead = tlsLength - svritem->tlsLen;
@@ -573,21 +618,40 @@ static void CAReceiveMessage(int fd)
                                 nbRead, len, svritem->tlsLen);
             if (tlsLength > 0 && tlsLength == svritem->tlsLen)
             {
-                //when successfully read data - pass them to callback.
-                res = CAdecryptSsl(&svritem->sep, (uint8_t *)svritem->tlsdata, svritem->tlsLen);
-                svritem->tlsLen = 0;
+                // When successfully read data - pass them to callback.
+                // Dont invoke callback locking mutex
+                unsigned char *mesBuf = svritem->tlsdata;
+                size_t mesBufLen = svritem->tlsLen;
+                svritem->tlsdata = NULL;
+                oc_mutex_unlock(g_mutexObjectList);
+
+                res = CAdecryptSsl(&peerEP, (uint8_t *)mesBuf, mesBufLen);
                 OIC_LOG_V(INFO, TAG, "%s: CAdecryptSsl returned %d", __func__, res);
+
+                // Check for the svritem and reset buffer
+                oc_mutex_lock(g_mutexObjectList);
+                svritem = CAGetSessionInfoFromFDAsOwner(fd, &index);
+                if (svritem)
+                {
+                    svritem->tlsdata = mesBuf;
+                    svritem->tlsLen = 0;
+                }
+                else
+                {
+                    // svritem does not exist, thus free the message buffer
+                    OIC_LOG(ERROR, TAG, "svritem not found. Freeing message buffer!");
+                    OICFree(mesBuf);
+                }
             }
         }
 #endif
-
     }
-    else
+    else // Non-Secure connection
     {
         svritem->protocol = COAP;
 
         // svritem->tlsdata can also be used as receiving buffer in case of raw tcp
-        len = recv(fd, svritem->tlsdata, sizeof(svritem->tlsdata), 0);
+        len = recv(fd, svritem->tlsdata, TLS_DATA_MAX_SIZE, 0);
         if (len < 0)
         {
             OIC_LOG_V(ERROR, TAG, "recv failed %s", strerror(errno));
@@ -600,26 +664,40 @@ static void CAReceiveMessage(int fd)
         }
         else
         {
-            OIC_LOG_V(DEBUG, TAG, "recv() : %d bytes", len);
             //when successfully read data - pass them to callback.
+            OIC_LOG_V(DEBUG, TAG, "recv() : %d bytes", len);
             if (g_packetReceivedCallback)
             {
-                res = g_packetReceivedCallback(&svritem->sep, svritem->tlsdata, len);
+                // Dont invoke callback locking mutex
+                unsigned char *mesBuf = svritem->tlsdata;
+                svritem->tlsdata = NULL;
+                oc_mutex_unlock(g_mutexObjectList);
+
+                res = g_packetReceivedCallback(&peerEP, mesBuf, len);
+
+                // Check for the svritem and reset buffer
+                oc_mutex_lock(g_mutexObjectList);
+                svritem = CAGetSessionInfoFromFDAsOwner(fd, &index);
+                if (svritem)
+                {
+                    svritem->tlsdata = mesBuf;
+                    svritem->tlsLen = 0;
+                }
+                else
+                {
+                    // svritem does not exist, thus free the message buffer
+                    OIC_LOG(ERROR, TAG, "svritem not found. Freeing message buffer!");
+                    OICFree(mesBuf);
+                }
             }
         }
     }
 
-    //disconnect session and clean-up data if any error occurs
+    oc_mutex_unlock(g_mutexObjectList);
+
     if (res != CA_STATUS_OK)
     {
-#ifdef __WITH_TLS__
-        if (CA_STATUS_OK != CAcloseSslConnection(&svritem->sep.endpoint))
-        {
-            OIC_LOG(ERROR, TAG, "Failed to close TLS session");
-        }
-#endif
-        CASearchAndDeleteTCPSession(&(svritem->sep.endpoint));
-        return;
+        CATCPDisconnectSession(&peerEP.endpoint);
     }
 }
 
@@ -964,14 +1042,13 @@ CAResult_t CATCPStartServer(const ca_thread_pool_t threadPool)
 
     CAResult_t res = CA_STATUS_OK;
 #ifndef __TIZENRT__
-    res = ca_thread_pool_add_task(g_threadPool, CAReceiveHandler, NULL, &g_recvThreadId);
+    res = ca_thread_pool_add_task(g_threadPool, CAReceiveHandler, NULL, NULL);
 #else
-    res = ca_thread_pool_add_task(g_threadPool, CAReceiveHandler, NULL, &g_recvThreadId,
+    res = ca_thread_pool_add_task(g_threadPool, CAReceiveHandler, NULL, NULL,
                                  "IoT_TCPReceive", CONFIG_IOTIVITY_TCPRECEIVE_PTHREAD_STACKSIZE);
 #endif
     if (CA_STATUS_OK != res)
     {
-        g_recvThreadId = 0;
         oc_mutex_unlock(g_mutexObjectList);
         OIC_LOG(ERROR, TAG, "thread_pool_add_task failed");
         CATCPStopServer();
@@ -998,6 +1075,14 @@ void CATCPStopServer()
     // set terminate flag.
     caglobals.tcp.terminate = true;
 
+#ifdef __TIZENRT__
+    if (caglobals.tcp.started)
+    {
+        oc_cond_wait(g_condObjectList, g_mutexObjectList);
+        caglobals.tcp.started = false;
+    }
+#endif
+
     // close accept socket.
 #ifndef __WITH_TLS__
     CLOSE_SOCKET(ipv4);
@@ -1018,25 +1103,17 @@ void CATCPStopServer()
         caglobals.tcp.shutdownFds[1] = OC_INVALID_SOCKET;
         // receive thread will stop immediately
     }
-#endif
     if (caglobals.tcp.started)
     {
         oc_cond_wait(g_condObjectList, g_mutexObjectList);
         caglobals.tcp.started = false;
     }
-#ifndef __TIZENRT__
     if (caglobals.tcp.shutdownFds[0] != OC_INVALID_SOCKET)
     {
         close(caglobals.tcp.shutdownFds[0]);
         caglobals.tcp.shutdownFds[0] = OC_INVALID_SOCKET;
     }
 #endif
-    CAResult_t res = ca_thread_pool_remove_task(g_threadPool, g_recvThreadId);
-    if (CA_STATUS_OK != res)
-    {
-        OIC_LOG(ERROR, TAG, "ca_thread_pool_remove_task failed");
-    }
-    g_recvThreadId = 0;
     oc_mutex_unlock(g_mutexObjectList);
 
     CATCPDisconnectAll();
@@ -1066,6 +1143,7 @@ size_t CACheckPayloadLengthFromHeader(const void *data, size_t dlen)
     if (!pdu)
     {
         OIC_LOG(ERROR, TAG, "outpdu is null");
+        OIC_LOG_V(ERROR, TAG, "data length: %zu", dlen);
         return 0;
     }
 
@@ -1111,22 +1189,29 @@ static ssize_t sendData(const CAEndpoint_t *endpoint, const void *data,
 
     // #2. send data to remote device.
     ssize_t remainLen = dlen;
+    size_t sendCounter = 0;
     do
     {
 #ifdef MSG_NOSIGNAL
-        ssize_t len = send(sockFd, data, remainLen, MSG_NOSIGNAL);
+        ssize_t len = send(sockFd, data, remainLen, MSG_DONTWAIT | MSG_NOSIGNAL);
 #else
-        ssize_t len = send(sockFd, data, remainLen, 0);
+        ssize_t len = send(sockFd, data, remainLen, MSG_DONTWAIT);
 #endif
         if (-1 == len)
         {
-            if (EWOULDBLOCK != errno)
+            if (EWOULDBLOCK != errno && EAGAIN != errno)
             {
                 OIC_LOG_V(ERROR, TAG, "unicast ipv4tcp sendTo failed: %s", strerror(errno));
                 CALogSendStateInfo(endpoint->adapter, endpoint->addr, endpoint->port,
                                    len, false, strerror(errno));
                 return len;
             }
+            sendCounter++;
+            OIC_LOG_V(WARNING, TAG, "send blocked. trying %zu attempt from 25", sendCounter);
+            if(sendCounter >= 25)
+            {
+                return len;
+            }
             continue;
         }
         data += len;
@@ -1187,6 +1272,15 @@ CASocketFd_t CAConnectTCPSession(const CAEndpoint_t *endpoint)
     svritem->state = CONNECTING;
     svritem->isClient = true;
 
+    // Allocate message buffer
+    svritem->tlsdata = (unsigned char*) OICCalloc(TLS_DATA_MAX_SIZE, sizeof(unsigned char));
+    if (!svritem->tlsdata)
+    {
+        OIC_LOG(ERROR, TAG, "Out of memory");
+        OICFree(svritem);
+        return OC_INVALID_SOCKET;
+    }
+
     // #2. add TCP connection info to list
     oc_mutex_lock(g_mutexObjectList);
     if (caglobals.tcp.svrlist)
@@ -1196,6 +1290,7 @@ CASocketFd_t CAConnectTCPSession(const CAEndpoint_t *endpoint)
         {
             OIC_LOG(ERROR, TAG, "u_arraylist_add failed.");
             close(svritem->fd);
+            OICFree(svritem->tlsdata);
             OICFree(svritem);
             oc_mutex_unlock(g_mutexObjectList);
             return OC_INVALID_SOCKET;
@@ -1246,6 +1341,9 @@ CAResult_t CADisconnectTCPSession(size_t index)
     OICFree(removedData->data);
     removedData->data = NULL;
 
+    OICFree(removedData->tlsdata);
+    removedData->tlsdata = NULL;
+
     OICFree(removedData);
     removedData = NULL;
 
@@ -1260,6 +1358,8 @@ CAResult_t CADisconnectTCPSession(size_t index)
 
 void CATCPDisconnectAll()
 {
+    OIC_LOG(DEBUG, TAG, "IN - CATCPDisconnectAll");
+
     oc_mutex_lock(g_mutexObjectList);
 
     uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
@@ -1278,6 +1378,7 @@ void CATCPDisconnectAll()
     CAcloseSslConnectionAll(CA_ADAPTER_TCP);
 #endif
 
+    OIC_LOG(DEBUG, TAG, "OUT - CATCPDisconnectAll");
 }
 
 CATCPSessionInfo_t *CAGetTCPSessionInfoFromEndpoint(const CAEndpoint_t *endpoint, size_t *index)
@@ -1349,7 +1450,6 @@ CASocketFd_t CAGetSocketFDFromEndpoint(const CAEndpoint_t *endpoint)
 
 CATCPSessionInfo_t *CAGetSessionInfoFromFD(int fd, size_t *index)
 {
-    oc_mutex_lock(g_mutexObjectList);
 
     // check from the last item.
     CATCPSessionInfo_t *svritem = NULL;
@@ -1366,7 +1466,24 @@ CATCPSessionInfo_t *CAGetSessionInfoFromFD(int fd, size_t *index)
         }
     }
 
-    oc_mutex_unlock(g_mutexObjectList);
+
+    return NULL;
+}
+
+static CATCPSessionInfo_t *CAGetSessionInfoFromFDAsOwner(int fd, size_t *index)
+{
+    CATCPSessionInfo_t *svritem = NULL;
+    uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
+    for (size_t i = 0; i < length; i++)
+    {
+        svritem = (CATCPSessionInfo_t *) u_arraylist_get(caglobals.tcp.svrlist, i);
+
+        if (svritem && svritem->fd == fd)
+        {
+            *index = i;
+            return svritem;
+        }
+    }
 
     return NULL;
 }
@@ -1391,6 +1508,37 @@ CAResult_t CASearchAndDeleteTCPSession(const CAEndpoint_t *endpoint)
     return result;
 }
 
+void CATCPCloseInProgressConnections()
+{
+    OIC_LOG(INFO, TAG, "IN - CATCPCloseInProgressConnections");
+
+    oc_mutex_lock(g_mutexObjectList);
+
+    uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
+    for (size_t index = 0; index < length; index++)
+    {
+        CATCPSessionInfo_t *svritem = (CATCPSessionInfo_t *) u_arraylist_get(
+                caglobals.tcp.svrlist, index);
+        if (!svritem)
+        {
+            continue;
+        }
+
+        // Session which are connecting state
+        if (svritem->fd >= 0 && svritem->state == CONNECTING)
+        {
+            shutdown(svritem->fd, SHUT_RDWR);
+            close(svritem->fd);
+            svritem->fd = -1;
+            svritem->state = DISCONNECTED;
+        }
+    }
+
+    oc_mutex_unlock(g_mutexObjectList);
+
+    OIC_LOG(INFO, TAG, "OUT - CATCPCloseInProgressConnections");
+}
+
 size_t CAGetTotalLengthFromHeader(const unsigned char *recvBuffer)
 {
     OIC_LOG(DEBUG, TAG, "IN - CAGetTotalLengthFromHeader");