Replace select with poll.
[platform/upstream/iotivity.git] / resource / csdk / connectivity / src / tcp_adapter / catcpserver.c
index 7136fe0..40c076d 100644 (file)
@@ -20,7 +20,6 @@
 
 #include <sys/types.h>
 #include <sys/socket.h>
-#include <sys/select.h>
 #include <sys/ioctl.h>
 #ifdef __TIZENRT__
 #include <tinyara/config.h>
@@ -48,6 +47,7 @@
 #include "caadapterutils.h"
 #include "octhread.h"
 #include "oic_malloc.h"
+#include "oic_string.h"
 
 #ifdef __WITH_TLS__
 #include "ca_adapter_net_ssl.h"
 
 #define COAP_TCP_MAX_BUFFER_CHUNK_SIZE 65530 //64kb - 6 (coap+tcp max header size)
 
+#define MILLISECONDS_PER_SECOND   (1000)
+
 /**
  * Thread pool.
  */
 static ca_thread_pool_t g_threadPool = NULL;
 
 /**
+ * Thread task id.
+ */
+static uint32_t g_taskId = 0;
+
+/**
  * Mutex to synchronize device object list.
  */
 static oc_mutex g_mutexObjectList = NULL;
@@ -93,6 +100,16 @@ static oc_mutex g_mutexObjectList = NULL;
 static oc_cond g_condObjectList = NULL;
 
 /**
+ * Mutex to synchronize send.
+ */
+static oc_mutex g_mutexSend = NULL;
+
+/**
+ * Conditional mutex to synchronize send.
+ */
+static oc_cond g_condSend = NULL;
+
+/**
  * Maintains the callback to be notified when data received from remote device.
  */
 static CATCPPacketReceivedCallback g_packetReceivedCallback = NULL;
@@ -110,15 +127,34 @@ static CATCPConnectionHandleCallback g_connectionCallback = NULL;
 static CASocketFd_t CACreateAcceptSocket(int family, CASocket_t *sock);
 static void CAAcceptConnection(CATransportFlags_t flag, CASocket_t *sock);
 static void CAFindReadyMessage();
-static void CASelectReturned(fd_set *readFds);
+static void CAPollReturned(struct pollfd *readFds, size_t size);
 static void CAReceiveMessage(int fd);
 static void CAReceiveHandler(void *data);
 static CAResult_t CATCPCreateSocket(int family, CATCPSessionInfo_t *svritem);
 static void CATCPInitializeSocket();
+static CATCPSessionInfo_t *CAGetSessionInfoFromFDAsOwner(int fd, size_t *index);
 
-#define CHECKFD(FD) \
-    if (FD > caglobals.tcp.maxfd) \
-        caglobals.tcp.maxfd = FD;
+#if defined(__TIZEN__)
+static char g_cloudproxyUri[CA_MAX_URI_LENGTH];
+
+CAResult_t CASetCloudAddressForProxy(const char *uri)
+{
+    if (uri == NULL)
+        memset(g_cloudproxyUri, '\0', sizeof (g_cloudproxyUri));
+    else
+        OICStrcpy(g_cloudproxyUri, sizeof (g_cloudproxyUri), uri);
+    return CA_STATUS_OK;
+}
+
+const char *CAGetCloudAddressForProxy()
+{
+    if (g_cloudproxyUri[0] == '\0')
+        return NULL;
+    return g_cloudproxyUri;
+}
+#endif
+
+#define MAX_TCP_SOCK_COUNT 4
 
 #define CLOSE_SOCKET(TYPE) \
     if (caglobals.tcp.TYPE.fd != OC_INVALID_SOCKET) \
@@ -127,11 +163,9 @@ static void CATCPInitializeSocket();
         caglobals.tcp.TYPE.fd = OC_INVALID_SOCKET; \
     }
 
-#define CA_FD_SET(TYPE, FDS) \
-    if (caglobals.tcp.TYPE.fd != OC_INVALID_SOCKET) \
-    { \
-        FD_SET(caglobals.tcp.TYPE.fd, FDS); \
-    }
+#define CA_FD_SET(TYPE, FDS, COUNT) \
+        FDS[COUNT].fd = caglobals.tcp.TYPE.fd; \
+        FDS[COUNT].events = POLLIN;
 
 void CATCPDestroyMutex()
 {
@@ -149,7 +183,7 @@ CAResult_t CATCPCreateMutex()
         g_mutexObjectList = oc_mutex_new();
         if (!g_mutexObjectList)
         {
-            OIC_LOG(ERROR, TAG, "Failed to created mutex!");
+            OIC_LOG(ERROR, TAG, "Failed to create mutex!");
             return CA_STATUS_FAILED;
         }
     }
@@ -173,7 +207,54 @@ CAResult_t CATCPCreateCond()
         g_condObjectList = oc_cond_new();
         if (!g_condObjectList)
         {
-            OIC_LOG(ERROR, TAG, "Failed to created cond!");
+            OIC_LOG(ERROR, TAG, "Failed to create cond!");
+            return CA_STATUS_FAILED;
+        }
+    }
+    return CA_STATUS_OK;
+}
+
+void CATCPDestroySendMutex()
+{
+    if (g_mutexSend)
+    {
+        oc_mutex_free(g_mutexSend);
+        g_mutexSend = NULL;
+    }
+}
+
+CAResult_t CATCPCreateSendMutex()
+{
+    if (!g_mutexSend)
+    {
+        g_mutexSend = oc_mutex_new();
+        if (!g_mutexSend)
+        {
+            OIC_LOG(ERROR, TAG, "Failed to create send mutex!");
+            return CA_STATUS_FAILED;
+        }
+    }
+
+    return CA_STATUS_OK;
+}
+
+void CATCPDestroySendCond()
+{
+    if (g_condSend)
+    {
+        oc_cond_free(g_condSend);
+        g_condSend = NULL;
+    }
+}
+
+CAResult_t CATCPCreateSendCond()
+{
+    if (!g_condSend)
+    {
+        g_condSend = oc_cond_new();
+        if (!g_condSend)
+        {
+            OIC_LOG(ERROR, TAG, "Failed to create send cond!");
             return CA_STATUS_FAILED;
         }
     }
@@ -185,8 +266,15 @@ static void CAReceiveHandler(void *data)
     (void)data;
     OIC_LOG(DEBUG, TAG, "IN - CAReceiveHandler");
 
-    while (!caglobals.tcp.terminate)
+    while (true)
     {
+        oc_mutex_lock(g_mutexObjectList);
+        if (caglobals.tcp.terminate)
+        {
+            oc_mutex_unlock(g_mutexObjectList);
+            break;
+        }
+        oc_mutex_unlock(g_mutexObjectList);
         CAFindReadyMessage();
     }
 
@@ -199,108 +287,158 @@ static void CAReceiveHandler(void *data)
 
 static void CAFindReadyMessage()
 {
-    fd_set readFds;
-    struct timeval timeout = { .tv_sec = caglobals.tcp.selectTimeout };
-
-    FD_ZERO(&readFds);
-    CA_FD_SET(ipv4, &readFds);
-    CA_FD_SET(ipv4s, &readFds);
-    CA_FD_SET(ipv6, &readFds);
-    CA_FD_SET(ipv6s, &readFds);
-#ifndef __TIZENRT__
-    if (OC_INVALID_SOCKET != caglobals.tcp.shutdownFds[0])
-    {
-        FD_SET(caglobals.tcp.shutdownFds[0], &readFds);
-    }
-#endif
-    if (OC_INVALID_SOCKET != caglobals.tcp.connectionFds[0])
+    int timeout = (caglobals.tcp.selectTimeout * 1000);
+    size_t counter = 0;
+
+    oc_mutex_lock(g_mutexObjectList);
+    uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
+
+    // Consider 4 tcp sockets(ipv4, ipv4s, ipv6, ipv6s) + 1 connection fd + all sockets in svrlist
+    struct pollfd *readFds = (struct pollfd *)OICCalloc(MAX_TCP_SOCK_COUNT + 1 + length, sizeof(struct pollfd));
+    if (NULL == readFds)
     {
-        FD_SET(caglobals.tcp.connectionFds[0], &readFds);
+        OIC_LOG_V(ERROR, TAG, "Failed to allocate memory!");
+        oc_mutex_unlock(g_mutexObjectList);
+        return;
     }
 
-    uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
+    // 4 tcp sockets
+    CA_FD_SET(ipv4, readFds, counter);
+    counter++;
+    CA_FD_SET(ipv4s, readFds, counter);
+    counter++;
+    CA_FD_SET(ipv6, readFds, counter);
+    counter++;
+    CA_FD_SET(ipv6s, readFds, counter);
+    counter++;
+
+    // 1 connection fd
+    readFds[counter].fd = caglobals.tcp.connectionFds[0];
+    readFds[counter].events = POLLIN;
+    counter++;
+
+    // All sockets in svrlist
     for (size_t i = 0; i < length; i++)
     {
         CATCPSessionInfo_t *svritem =
                 (CATCPSessionInfo_t *) u_arraylist_get(caglobals.tcp.svrlist, i);
         if (svritem && 0 <= svritem->fd && CONNECTED == svritem->state)
         {
-            FD_SET(svritem->fd, &readFds);
+            readFds[counter].fd = svritem->fd;
+            readFds[counter].events = POLLIN;
+            counter++;
         }
     }
+    oc_mutex_unlock(g_mutexObjectList);
 
-    int ret = select(caglobals.tcp.maxfd + 1, &readFds, NULL, NULL, &timeout);
+    int ret = poll(readFds, counter, timeout);
 
+    oc_mutex_lock(g_mutexObjectList);
     if (caglobals.tcp.terminate)
     {
         OIC_LOG_V(INFO, TAG, "Packet receiver Stop request received.");
+        oc_mutex_unlock(g_mutexObjectList);
+        OICFree(readFds);
         return;
     }
-    if (0 >= ret)
+    oc_mutex_unlock(g_mutexObjectList);
+
+    if (ret > 0)
     {
-        if (0 > ret)
-        {
-            OIC_LOG_V(FATAL, TAG, "select error %s", strerror(errno));
-        }
-        return;
+        CAPollReturned(readFds, counter);
+    }
+    else if (ret < 0)
+    {
+        OIC_LOG_V(FATAL, TAG, "poll error %s", strerror(errno));
     }
 
-    CASelectReturned(&readFds);
+    OICFree(readFds);
 }
 
-static void CASelectReturned(fd_set *readFds)
+static void CAPollReturned(struct pollfd *readFds, size_t size)
 {
     VERIFY_NON_NULL_VOID(readFds, TAG, "readFds is NULL");
 
-    if (caglobals.tcp.ipv4.fd != -1 && FD_ISSET(caglobals.tcp.ipv4.fd, readFds))
+    if (caglobals.tcp.ipv4.fd != -1 && readFds[0].revents == POLLIN)
     {
         CAAcceptConnection(CA_IPV4, &caglobals.tcp.ipv4);
         return;
     }
-    else if (caglobals.tcp.ipv4s.fd != -1 && FD_ISSET(caglobals.tcp.ipv4s.fd, readFds))
+    else if (caglobals.tcp.ipv4s.fd != -1 && readFds[1].revents == POLLIN)
     {
         CAAcceptConnection(CA_IPV4 | CA_SECURE, &caglobals.tcp.ipv4s);
         return;
     }
-    else if (caglobals.tcp.ipv6.fd != -1 && FD_ISSET(caglobals.tcp.ipv6.fd, readFds))
+    else if (caglobals.tcp.ipv6.fd != -1 && readFds[2].revents == POLLIN)
     {
         CAAcceptConnection(CA_IPV6, &caglobals.tcp.ipv6);
         return;
     }
-    else if (caglobals.tcp.ipv6s.fd != -1 && FD_ISSET(caglobals.tcp.ipv6s.fd, readFds))
+    else if (caglobals.tcp.ipv6s.fd != -1 && readFds[3].revents == POLLIN)
     {
         CAAcceptConnection(CA_IPV6 | CA_SECURE, &caglobals.tcp.ipv6s);
         return;
     }
-    else if (-1 != caglobals.tcp.connectionFds[0] &&
-            FD_ISSET(caglobals.tcp.connectionFds[0], readFds))
+    else if (-1 != caglobals.tcp.connectionFds[0] && readFds[4].revents != 0)
     {
-        // new connection was created from remote device.
-        // exit the function to update read file descriptor.
-        char buf[MAX_ADDR_STR_SIZE_CA] = {0};
-        ssize_t len = read(caglobals.tcp.connectionFds[0], buf, sizeof (buf));
-        if (-1 == len)
-        {
+            // new connection was created from remote device.
+            // exit the function to update read file descriptor.
+            char buf[MAX_ADDR_STR_SIZE_CA] = {0};
+            ssize_t len = read(caglobals.tcp.connectionFds[0], buf, sizeof (buf));
+            if (-1 == len)
+            {
+                return;
+            }
+            OIC_LOG_V(DEBUG, TAG, "Received new connection event with [%s]", buf);
             return;
-        }
-        OIC_LOG_V(DEBUG, TAG, "Received new connection event with [%s]", buf);
-        return;
     }
     else
     {
+        int *readFDList = NULL;
+        size_t readFDListSize = 0;
+
+        oc_mutex_lock(g_mutexObjectList);
         uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
+
+        readFDList = (int*) OICCalloc(length, sizeof(int));
+        if (NULL == readFDList)
+        {
+            OIC_LOG_V(ERROR, TAG, "Failed to allocate memory!");
+            oc_mutex_unlock(g_mutexObjectList);
+            return;
+        }
+
         for (size_t i = 0; i < length; i++)
         {
             CATCPSessionInfo_t *svritem =
                     (CATCPSessionInfo_t *) u_arraylist_get(caglobals.tcp.svrlist, i);
             if (svritem && svritem->fd >= 0)
             {
-                if (FD_ISSET(svritem->fd, readFds))
+                size_t j = 0;
+                while (j < size)
                 {
-                    CAReceiveMessage(svritem->fd);
+                    if (svritem->fd == readFds[j].fd)
+                    {
+                        break;
+                    }
+                    j++;
+                }
+
+                if (j < size  && readFds[j].revents != 0)
+                {
+                    readFDList[readFDListSize++] = svritem->fd;
                 }
             }
         }
+        oc_mutex_unlock(g_mutexObjectList);
+
+        // Read incomming messages from fds
+        for (size_t i = 0; i < readFDListSize; i++)
+        {
+            CAReceiveMessage(readFDList[i]);
+        }
+
+        OICFree(readFDList);
     }
 }
 
@@ -344,20 +482,29 @@ static void CAAcceptConnection(CATransportFlags_t flag, CASocket_t *sock)
         CAConvertAddrToName((struct sockaddr_storage *)&clientaddr, clientlen,
                             svritem->sep.endpoint.addr, &svritem->sep.endpoint.port);
 
+        // Allocate message buffer
+        svritem->tlsdata = (unsigned char*) OICCalloc(TLS_DATA_MAX_SIZE, sizeof(unsigned char));
+        if (!svritem->tlsdata)
+        {
+            OIC_LOG(ERROR, TAG, "Out of memory");
+            close(sockfd);
+            OICFree(svritem);
+            return;
+        }
+
         oc_mutex_lock(g_mutexObjectList);
         bool result = u_arraylist_add(caglobals.tcp.svrlist, svritem);
         if (!result)
         {
             OIC_LOG(ERROR, TAG, "u_arraylist_add failed.");
             close(sockfd);
+            OICFree(svritem->tlsdata);
             OICFree(svritem);
             oc_mutex_unlock(g_mutexObjectList);
             return;
         }
         oc_mutex_unlock(g_mutexObjectList);
 
-        CHECKFD(sockfd);
-
         // pass the connection information to CA Common Layer.
         if (g_connectionCallback)
         {
@@ -409,7 +556,7 @@ CAResult_t CAConstructCoAP(CATCPSessionInfo_t *svritem, unsigned char **data,
 
     unsigned char *inBuffer = *data;
     size_t inLen = *dataLength;
-    OIC_LOG_V(DEBUG, TAG, "before-datalength : %u", *dataLength);
+    OIC_LOG_V(DEBUG, TAG, "before-datalength : %zd", *dataLength);
 
     if (NULL == svritem->data && inLen > 0)
     {
@@ -513,7 +660,7 @@ CAResult_t CAConstructCoAP(CATCPSessionInfo_t *svritem, unsigned char **data,
     *data = inBuffer;
     *dataLength = inLen;
 
-    OIC_LOG_V(DEBUG, TAG, "after-datalength : %u", *dataLength);
+    OIC_LOG_V(DEBUG, TAG, "after-datalength : %zd", *dataLength);
     OIC_LOG_V(DEBUG, TAG, "Out %s", __func__);
     return CA_STATUS_OK;
 }
@@ -522,19 +669,21 @@ static void CAReceiveMessage(int fd)
 {
     CAResult_t res = CA_STATUS_OK;
 
+    oc_mutex_lock(g_mutexObjectList);
+
     //get remote device information from file descriptor.
     size_t index = 0;
-    CATCPSessionInfo_t *svritem = CAGetSessionInfoFromFD(fd, &index);
+    CATCPSessionInfo_t *svritem = CAGetSessionInfoFromFDAsOwner(fd, &index);
     if (!svritem)
     {
         OIC_LOG(ERROR, TAG, "there is no connection information in list");
+        oc_mutex_unlock(g_mutexObjectList);
         return;
     }
 
-    // read data
+    CASecureEndpoint_t peerEP = svritem->sep;
     int len = 0;
-
-    if (svritem->sep.endpoint.flags & CA_SECURE)
+    if (svritem->sep.endpoint.flags & CA_SECURE) // Secure connection
     {
         svritem->protocol = TLS;
 
@@ -551,16 +700,13 @@ static void CAReceiveMessage(int fd)
             //[3][4] bytes in tls header are tls payload length
             tlsLength = TLS_HEADER_SIZE +
                             (size_t)((svritem->tlsdata[3] << 8) | svritem->tlsdata[4]);
-            OIC_LOG_V(DEBUG, TAG, "total tls length = %u", tlsLength);
-            if (tlsLength > sizeof(svritem->tlsdata))
+            OIC_LOG_V(DEBUG, TAG, "total tls length = %zd", tlsLength);
+            if (tlsLength > TLS_DATA_MAX_SIZE)
             {
                 OIC_LOG_V(ERROR, TAG, "total tls length is too big (buffer size : %u)",
-                                    sizeof(svritem->tlsdata));
-                if (CA_STATUS_OK != CAcloseSslConnection(&svritem->sep.endpoint))
-                {
-                    OIC_LOG(ERROR, TAG, "Failed to close TLS session");
-                }
-                CASearchAndDeleteTCPSession(&(svritem->sep.endpoint));
+                                    TLS_DATA_MAX_SIZE);
+                oc_mutex_unlock(g_mutexObjectList);
+                CATCPDisconnectSession(&peerEP.endpoint);
                 return;
             }
             nbRead = tlsLength - svritem->tlsLen;
@@ -581,25 +727,44 @@ static void CAReceiveMessage(int fd)
         else
         {
             svritem->tlsLen += len;
-            OIC_LOG_V(DEBUG, TAG, "nb_read : %u bytes , recv() : %d bytes, svritem->tlsLen : %u bytes",
+            OIC_LOG_V(DEBUG, TAG, "nb_read : %zd bytes , recv() : %d bytes, svritem->tlsLen : %zd bytes",
                                 nbRead, len, svritem->tlsLen);
             if (tlsLength > 0 && tlsLength == svritem->tlsLen)
             {
-                //when successfully read data - pass them to callback.
-                res = CAdecryptSsl(&svritem->sep, (uint8_t *)svritem->tlsdata, svritem->tlsLen);
-                svritem->tlsLen = 0;
+                // When successfully read data - pass them to callback.
+                // Dont invoke callback locking mutex
+                unsigned char *mesBuf = svritem->tlsdata;
+                size_t mesBufLen = svritem->tlsLen;
+                svritem->tlsdata = NULL;
+                oc_mutex_unlock(g_mutexObjectList);
+
+                res = CAdecryptSsl(&peerEP, (uint8_t *)mesBuf, mesBufLen);
                 OIC_LOG_V(INFO, TAG, "%s: CAdecryptSsl returned %d", __func__, res);
+
+                // Check for the svritem and reset buffer
+                oc_mutex_lock(g_mutexObjectList);
+                svritem = CAGetSessionInfoFromFDAsOwner(fd, &index);
+                if (svritem)
+                {
+                    svritem->tlsdata = mesBuf;
+                    svritem->tlsLen = 0;
+                }
+                else
+                {
+                    // svritem does not exist, thus free the message buffer
+                    OIC_LOG(ERROR, TAG, "svritem not found. Freeing message buffer!");
+                    OICFree(mesBuf);
+                }
             }
         }
 #endif
-
     }
-    else
+    else // Non-Secure connection
     {
         svritem->protocol = COAP;
 
         // svritem->tlsdata can also be used as receiving buffer in case of raw tcp
-        len = recv(fd, svritem->tlsdata, sizeof(svritem->tlsdata), 0);
+        len = recv(fd, svritem->tlsdata, TLS_DATA_MAX_SIZE, 0);
         if (len < 0)
         {
             OIC_LOG_V(ERROR, TAG, "recv failed %s", strerror(errno));
@@ -612,25 +777,40 @@ static void CAReceiveMessage(int fd)
         }
         else
         {
-            OIC_LOG_V(DEBUG, TAG, "recv() : %d bytes", len);
             //when successfully read data - pass them to callback.
+            OIC_LOG_V(DEBUG, TAG, "recv() : %d bytes", len);
             if (g_packetReceivedCallback)
             {
-                res = g_packetReceivedCallback(&svritem->sep, svritem->tlsdata, len);
+                // Dont invoke callback locking mutex
+                unsigned char *mesBuf = svritem->tlsdata;
+                svritem->tlsdata = NULL;
+                oc_mutex_unlock(g_mutexObjectList);
+
+                res = g_packetReceivedCallback(&peerEP, mesBuf, len);
+
+                // Check for the svritem and reset buffer
+                oc_mutex_lock(g_mutexObjectList);
+                svritem = CAGetSessionInfoFromFDAsOwner(fd, &index);
+                if (svritem)
+                {
+                    svritem->tlsdata = mesBuf;
+                    svritem->tlsLen = 0;
+                }
+                else
+                {
+                    // svritem does not exist, thus free the message buffer
+                    OIC_LOG(ERROR, TAG, "svritem not found. Freeing message buffer!");
+                    OICFree(mesBuf);
+                }
             }
         }
     }
 
-    //disconnect session and clean-up data if any error occurs
+    oc_mutex_unlock(g_mutexObjectList);
+
     if (res != CA_STATUS_OK)
     {
-        if (CA_STATUS_OK != CATCPDisconnectSession(&svritem->sep.endpoint))
-        {
-            CASearchAndDeleteTCPSession(&svritem->sep.endpoint);
-            OIC_LOG(ERROR, TAG, "Failed to disconnect TCP session");
-        }
-
-        return;
+        CATCPDisconnectSession(&peerEP.endpoint);
     }
 }
 
@@ -692,6 +872,21 @@ static CAResult_t CATCPConvertNameToAddr(int family, const char *host, uint16_t
     return CA_STATUS_OK;
 }
 
+#if defined(__TIZEN__)
+static int CAGetHTTPStatusCode(char * response) {
+    char *resp, *code_plus, *ptrSave;
+    int ret = -1;
+
+    resp = strdup(response);
+    strtok_r(resp, " ", &ptrSave);  /* skip HTTP version */
+    code_plus = strtok_r(NULL, " ", &ptrSave);
+
+    ret = code_plus ? atoi(code_plus) : -1;
+    free(resp);
+    return ret;
+}
+#endif
+
 static CAResult_t CATCPCreateSocket(int family, CATCPSessionInfo_t *svritem)
 {
     VERIFY_NON_NULL(svritem, TAG, "svritem is NULL");
@@ -745,13 +940,60 @@ static CAResult_t CATCPCreateSocket(int family, CATCPSessionInfo_t *svritem)
 
     OIC_LOG(INFO, TAG, "connect socket success");
     svritem->state = CONNECTED;
-    CHECKFD(svritem->fd);
     ssize_t len = CAWakeUpForReadFdsUpdate(svritem->sep.endpoint.addr);
     if (-1 == len)
     {
         OIC_LOG(ERROR, TAG, "wakeup receive thread failed");
         return CA_SOCKET_OPERATION_FAILED;
     }
+
+#if defined(__TIZEN__)
+    // #5. Send HTTP CONNECT to proxy if proxy
+
+    const char *cloud_address = CAGetCloudAddressForProxy();
+    OIC_LOG_V(INFO, TAG, "Proxy : '%s'", cloud_address ? cloud_address : "(nil)");
+
+    if(cloud_address && *cloud_address)
+    {
+        char message[4096];
+        int len = sprintf(message,
+                "CONNECT %s HTTP/1.1\r\n"
+                "Host: %s\r\n\r\n", cloud_address, cloud_address
+        );
+
+        ssize_t l = send(fd, message, len, 0);
+        if(l != len)
+       {
+            OIC_LOG_V(ERROR, TAG, "failed to send HTTP CONNECT data (expected %d bytes, ret %zd)", len, l);
+            close(fd);
+            svritem->fd = -1;
+            return CA_SOCKET_OPERATION_FAILED;
+        }
+
+        // maybe this should be called in other thread, it causes bottleneck.
+        OIC_LOG_V(INFO, TAG, "Message sent is : '%s'\n", message);
+
+        *message = '\0';
+        OIC_LOG_V(INFO, TAG, "Receiving response to CONNECT from proxy...");
+
+        l = recv(fd, message, 4096, 0);
+
+        OIC_LOG_V(INFO, TAG, "Received data : '%s'", message);
+        OIC_LOG_V(INFO, TAG, "Received len = %zd", l);
+
+        int status_code = CAGetHTTPStatusCode(message);
+
+        OIC_LOG_V(INFO, TAG, "HTTP status_code : %d", status_code);
+        if(status_code < 200 || status_code > 299)
+       {
+            OIC_LOG_V(ERROR, TAG, "Error, Wrong status code: %d", status_code);
+            close(fd);
+            svritem->fd = -1;
+            return CA_SOCKET_OPERATION_FAILED;
+        }
+    }
+#endif
+
     return CA_STATUS_OK;
 }
 
@@ -893,6 +1135,7 @@ static void CAInitializePipe(int *fds)
 #endif
 }
 
+#ifndef DISABLE_TCP_SERVER
 #define NEWSOCKET(FAMILY, NAME) \
     caglobals.tcp.NAME.fd = CACreateAcceptSocket(FAMILY, &caglobals.tcp.NAME); \
     if (caglobals.tcp.NAME.fd == -1) \
@@ -900,7 +1143,6 @@ static void CAInitializePipe(int *fds)
         caglobals.tcp.NAME.port = 0; \
         caglobals.tcp.NAME.fd = CACreateAcceptSocket(FAMILY, &caglobals.tcp.NAME); \
     } \
-    CHECKFD(caglobals.tcp.NAME.fd);
 
 void CATCPInitializeSocket()
 {
@@ -930,6 +1172,7 @@ void CATCPInitializeSocket()
                   caglobals.tcp.ipv6s.fd, caglobals.tcp.ipv6s.port);
 #endif
 }
+#endif // DISABLE_TCP_SERVER
 
 CAResult_t CATCPStartServer(const ca_thread_pool_t threadPool)
 {
@@ -958,24 +1201,23 @@ CAResult_t CATCPStartServer(const ca_thread_pool_t threadPool)
         caglobals.tcp.svrlist = u_arraylist_create();
     }
 
+#ifndef DISABLE_TCP_SERVER
     if (caglobals.server)
     {
         CATCPInitializeSocket();
     }
+#endif
+
 #ifndef __TIZENRT__
     // create pipe for fast shutdown
     CAInitializePipe(caglobals.tcp.shutdownFds);
-    CHECKFD(caglobals.tcp.shutdownFds[0]);
-    CHECKFD(caglobals.tcp.shutdownFds[1]);
 #endif
     // create pipe for connection event
     CAInitializePipe(caglobals.tcp.connectionFds);
-    CHECKFD(caglobals.tcp.connectionFds[0]);
-    CHECKFD(caglobals.tcp.connectionFds[1]);
 
     CAResult_t res = CA_STATUS_OK;
 #ifndef __TIZENRT__
-    res = ca_thread_pool_add_task(g_threadPool, CAReceiveHandler, NULL, NULL);
+    res = ca_thread_pool_add_task(g_threadPool, CAReceiveHandler, NULL, &g_taskId);
 #else
     res = ca_thread_pool_add_task(g_threadPool, CAReceiveHandler, NULL, NULL,
                                  "IoT_TCPReceive", CONFIG_IOTIVITY_TCPRECEIVE_PTHREAD_STACKSIZE);
@@ -1008,6 +1250,10 @@ void CATCPStopServer()
     // set terminate flag.
     caglobals.tcp.terminate = true;
 
+    oc_mutex_lock(g_mutexSend);
+    oc_cond_signal(g_condSend);
+    oc_mutex_unlock(g_mutexSend);
+
 #ifdef __TIZENRT__
     if (caglobals.tcp.started)
     {
@@ -1025,10 +1271,16 @@ void CATCPStopServer()
     CLOSE_SOCKET(ipv6s);
 #endif
 
-    close(caglobals.tcp.connectionFds[1]);
-    close(caglobals.tcp.connectionFds[0]);
-    caglobals.tcp.connectionFds[1] = OC_INVALID_SOCKET;
-    caglobals.tcp.connectionFds[0] = OC_INVALID_SOCKET;
+    if (caglobals.tcp.connectionFds[1] != OC_INVALID_SOCKET)
+    {
+       close(caglobals.tcp.connectionFds[1]);
+       caglobals.tcp.connectionFds[1] = OC_INVALID_SOCKET;
+    }
+    if (caglobals.tcp.connectionFds[0] != OC_INVALID_SOCKET)
+    {
+       close(caglobals.tcp.connectionFds[0]);
+       caglobals.tcp.connectionFds[0] = OC_INVALID_SOCKET;
+    }
 #ifndef __TIZENRT__
     if (caglobals.tcp.shutdownFds[1] != OC_INVALID_SOCKET)
     {
@@ -1049,9 +1301,12 @@ void CATCPStopServer()
 #endif
     oc_mutex_unlock(g_mutexObjectList);
 
+#ifndef __TIZENRT__
+    ca_thread_pool_remove_task(g_threadPool, g_taskId);
+#endif
+
     CATCPDisconnectAll();
     sleep(1);
-
     OIC_LOG(INFO, TAG, "Adapter terminated successfully");
 }
 
@@ -1122,7 +1377,7 @@ static ssize_t sendData(const CAEndpoint_t *endpoint, const void *data,
 
     // #2. send data to remote device.
     ssize_t remainLen = dlen;
-    size_t sendCounter = 0;
+    unsigned int sendRetryTime = 1;
     do
     {
 #ifdef MSG_NOSIGNAL
@@ -1139,14 +1394,33 @@ static ssize_t sendData(const CAEndpoint_t *endpoint, const void *data,
                                    len, false, strerror(errno));
                 return len;
             }
-            sendCounter++;
-            OIC_LOG_V(WARNING, TAG, "send blocked. trying %zu attempt from 25", sendCounter);
-            if(sendCounter >= 25)
+
+            // re-trying send after 10, 20, 40, 80, 160 and 320 milliseconds
+            if (sendRetryTime > 32)
             {
                 return len;
             }
+
+            unsigned int waitTime = sendRetryTime * 10 * MILLISECONDS_PER_SECOND;
+            OIC_LOG_V(WARNING, TAG, "send blocked. trying send after %u microseconds", waitTime);
+
+            oc_mutex_lock(g_mutexSend);
+            oc_cond_wait_for(g_condSend, g_mutexSend, waitTime);
+            oc_mutex_unlock(g_mutexSend);
+
+            oc_mutex_lock(g_mutexObjectList);
+            if (caglobals.tcp.terminate)
+            {
+                oc_mutex_unlock(g_mutexObjectList);
+                return len;
+            }
+            oc_mutex_unlock(g_mutexObjectList);
+
+            sendRetryTime = (sendRetryTime << 1);
+
             continue;
         }
+        sendRetryTime = 1;
         data += len;
         remainLen -= len;
     } while (remainLen > 0);
@@ -1205,6 +1479,15 @@ CASocketFd_t CAConnectTCPSession(const CAEndpoint_t *endpoint)
     svritem->state = CONNECTING;
     svritem->isClient = true;
 
+    // Allocate message buffer
+    svritem->tlsdata = (unsigned char*) OICCalloc(TLS_DATA_MAX_SIZE, sizeof(unsigned char));
+    if (!svritem->tlsdata)
+    {
+        OIC_LOG(ERROR, TAG, "Out of memory");
+        OICFree(svritem);
+        return OC_INVALID_SOCKET;
+    }
+
     // #2. add TCP connection info to list
     oc_mutex_lock(g_mutexObjectList);
     if (caglobals.tcp.svrlist)
@@ -1214,6 +1497,7 @@ CASocketFd_t CAConnectTCPSession(const CAEndpoint_t *endpoint)
         {
             OIC_LOG(ERROR, TAG, "u_arraylist_add failed.");
             close(svritem->fd);
+            OICFree(svritem->tlsdata);
             OICFree(svritem);
             oc_mutex_unlock(g_mutexObjectList);
             return OC_INVALID_SOCKET;
@@ -1264,15 +1548,21 @@ CAResult_t CADisconnectTCPSession(size_t index)
     OICFree(removedData->data);
     removedData->data = NULL;
 
+    OICFree(removedData->tlsdata);
+    removedData->tlsdata = NULL;
+
     OICFree(removedData);
     removedData = NULL;
 
     OIC_LOG(DEBUG, TAG, "data is removed from session list");
 
+#ifndef DISABLE_TCP_SERVER
     if (caglobals.server && MAX_CONNECTION_COUNTS == u_arraylist_length(caglobals.tcp.svrlist) + 1)
     {
         CATCPInitializeSocket();
     }
+#endif
+
     return CA_STATUS_OK;
 }
 
@@ -1370,7 +1660,6 @@ CASocketFd_t CAGetSocketFDFromEndpoint(const CAEndpoint_t *endpoint)
 
 CATCPSessionInfo_t *CAGetSessionInfoFromFD(int fd, size_t *index)
 {
-    oc_mutex_lock(g_mutexObjectList);
 
     // check from the last item.
     CATCPSessionInfo_t *svritem = NULL;
@@ -1387,7 +1676,24 @@ CATCPSessionInfo_t *CAGetSessionInfoFromFD(int fd, size_t *index)
         }
     }
 
-    oc_mutex_unlock(g_mutexObjectList);
+
+    return NULL;
+}
+
+static CATCPSessionInfo_t *CAGetSessionInfoFromFDAsOwner(int fd, size_t *index)
+{
+    CATCPSessionInfo_t *svritem = NULL;
+    uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
+    for (size_t i = 0; i < length; i++)
+    {
+        svritem = (CATCPSessionInfo_t *) u_arraylist_get(caglobals.tcp.svrlist, i);
+
+        if (svritem && svritem->fd == fd)
+        {
+            *index = i;
+            return svritem;
+        }
+    }
 
     return NULL;
 }