Changed tcp adapter logic to process the received data directly
authorhyuna0213.jo <hyuna0213.jo@samsung.com>
Tue, 8 Mar 2016 22:45:51 +0000 (07:45 +0900)
committerJon A. Cruz <jon@joncruz.org>
Fri, 11 Mar 2016 08:19:34 +0000 (08:19 +0000)
Currently If new connection is created with remote device, received
first data from remote device can be processed after select timeout.
because select() is blocking function. so I added connection file
descriptor to update read file descriptor list after receiving
connection event.

Change-Id: I29f83447a6f5d814a1491e06cee886f9f95b5fd0
Signed-off-by: hyuna0213.jo <hyuna0213.jo@samsung.com>
Reviewed-on: https://gerrit.iotivity.org/gerrit/5569
Tested-by: jenkins-iotivity <jenkins-iotivity@opendaylight.org>
Reviewed-by: Jon A. Cruz <jon@joncruz.org>
resource/csdk/connectivity/api/cacommon.h
resource/csdk/connectivity/src/tcp_adapter/catcpadapter.c
resource/csdk/connectivity/src/tcp_adapter/catcpserver.c

index 3ed6002..464f24b 100644 (file)
@@ -518,6 +518,7 @@ typedef struct
         int selectTimeout;      /**< in seconds */
         int listenBacklog;      /**< backlog counts*/
         int shutdownFds[2];     /**< shutdown pipe */
+        int connectionFds[2];   /**< connection pipe */
         int maxfd;              /**< highest fd (for select) */
         bool started;           /**< the TCP adapter has started */
         bool terminate;         /**< the TCP adapter needs to stop */
index 5c4c581..22ce787 100644 (file)
@@ -240,7 +240,7 @@ CAResult_t CAInitializeTCP(CARegisterConnectivityCallback registerCallback,
     CATCPSetPacketReceiveCallback(CATCPPacketReceivedCB);
     CATCPSetErrorHandler(CATCPErrorHandler);
 
-    CAConnectivityHandler_t TCPHandler = {
+    CAConnectivityHandler_t tcpHandler = {
         .startAdapter = CAStartTCP,
         .startListenServer = CAStartTCPListeningServer,
         .stopListenServer = CAStopTCPListeningServer,
@@ -253,7 +253,7 @@ CAResult_t CAInitializeTCP(CARegisterConnectivityCallback registerCallback,
         .terminate = CATerminateTCP,
         .cType = CA_ADAPTER_TCP};
 
-    registerCallback(TCPHandler);
+    registerCallback(tcpHandler);
 
     OIC_LOG(INFO, TAG, "OUT IntializeTCP is Success");
     return CA_STATUS_OK;
index 90f6821..4fe652f 100644 (file)
@@ -186,6 +186,10 @@ static void CAFindReadyMessage()
     {
         FD_SET(caglobals.tcp.shutdownFds[0], &readFds);
     }
+    if (-1 != caglobals.tcp.connectionFds[0])
+    {
+        FD_SET(caglobals.tcp.connectionFds[0], &readFds);
+    }
 
     uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
     for (size_t i = 0; i < length; i++)
@@ -226,6 +230,21 @@ static void CASelectReturned(fd_set *readFds, int ret)
         CAAcceptConnection();
         return;
     }
+    else if (-1 != caglobals.tcp.connectionFds[0] &&
+            FD_ISSET(caglobals.tcp.connectionFds[0], readFds))
+    {
+        // new connection was created from remote device.
+        // exit the function to update read file descriptor.
+        char buf[MAX_ADDR_STR_SIZE_CA] = {0};
+        ssize_t len = read(caglobals.tcp.connectionFds[0], buf, sizeof (buf));
+        if (-1 == len)
+        {
+            return;
+        }
+        OIC_LOG_V(DEBUG, TAG, "Received new connection event with [%s]", buf);
+        FD_CLR(caglobals.tcp.connectionFds[0], readFds);
+        return;
+    }
     else
     {
         uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
@@ -361,6 +380,23 @@ static void CAReceiveMessage(int fd)
     return;
 }
 
+static void CAWakeUpForReadFdsUpdate(const char *host)
+{
+    if (caglobals.tcp.connectionFds[1] != -1)
+    {
+        ssize_t len = 0;
+        do
+        {
+            len = write(caglobals.tcp.connectionFds[1], host, strlen(host));
+        } while ((len == -1) && (errno == EINTR));
+
+        if ((len == -1) && (errno != EINTR) && (errno != EPIPE))
+        {
+            OIC_LOG_V(DEBUG, TAG, "write failed: %s", strerror(errno));
+        }
+    }
+}
+
 static int CATCPCreateSocket(int family, CATCPSessionInfo_t *svritem)
 {
     // create tcp socket
@@ -380,29 +416,7 @@ static int CATCPCreateSocket(int family, CATCPSessionInfo_t *svritem)
     if (0 == ret)
     {
         OIC_LOG(DEBUG, TAG, "connect socket success");
-    }
-    else if (EINPROGRESS == errno)
-    {
-        OIC_LOG(DEBUG, TAG, "EINPROGRESS");
-        int error = 0;
-        socklen_t len = sizeof(error);
-        if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0)
-        {
-            OIC_LOG(ERROR, TAG, "getsockopt() error");
-            goto exit;
-        }
-
-        if (error)
-        {
-            if (ECONNREFUSED == error)
-            {
-                OIC_LOG(ERROR, TAG, "connection refused");
-                goto exit;
-            }
-            OIC_LOG(ERROR, TAG, "failed to connect socket");
-            goto exit;
-        }
-        OIC_LOG(DEBUG, TAG, "connect socket success");
+        CAWakeUpForReadFdsUpdate(svritem->sep.endpoint.addr);
     }
     else
     {
@@ -467,31 +481,31 @@ exit:
     return CA_STATUS_FAILED;
 }
 
-static void CAInitializePipe()
+static void CAInitializePipe(int *fds)
 {
-    int ret = pipe(caglobals.tcp.shutdownFds);
+    int ret = pipe(fds);
     if (-1 != ret)
     {
-        ret = fcntl(caglobals.tcp.shutdownFds[0], F_GETFD);
+        ret = fcntl(fds[0], F_GETFD);
         if (-1 != ret)
         {
-            ret = fcntl(caglobals.tcp.shutdownFds[0], F_SETFD, ret|FD_CLOEXEC);
+            ret = fcntl(fds[0], F_SETFD, ret|FD_CLOEXEC);
         }
         if (-1 != ret)
         {
-            ret = fcntl(caglobals.tcp.shutdownFds[1], F_GETFD);
+            ret = fcntl(fds[1], F_GETFD);
         }
         if (-1 != ret)
         {
-            ret = fcntl(caglobals.tcp.shutdownFds[1], F_SETFD, ret|FD_CLOEXEC);
+            ret = fcntl(fds[1], F_SETFD, ret|FD_CLOEXEC);
         }
         if (-1 == ret)
         {
-            close(caglobals.tcp.shutdownFds[1]);
-            close(caglobals.tcp.shutdownFds[0]);
+            close(fds[1]);
+            close(fds[0]);
 
-            caglobals.tcp.shutdownFds[0] = -1;
-            caglobals.tcp.shutdownFds[1] = -1;
+            fds[0] = -1;
+            fds[1] = -1;
 
             OIC_LOG_V(ERROR, TAG, "pipe failed: %s", strerror(errno));
         }
@@ -536,10 +550,15 @@ CAResult_t CATCPStartServer(const ca_thread_pool_t threadPool)
     }
 
     // create pipe for fast shutdown
-    CAInitializePipe();
+    CAInitializePipe(caglobals.tcp.shutdownFds);
     CHECKFD(caglobals.tcp.shutdownFds[0]);
     CHECKFD(caglobals.tcp.shutdownFds[1]);
 
+    // create pipe for connection event
+    CAInitializePipe(caglobals.tcp.connectionFds);
+    CHECKFD(caglobals.tcp.connectionFds[0]);
+    CHECKFD(caglobals.tcp.connectionFds[1]);
+
     caglobals.tcp.terminate = false;
     res = ca_thread_pool_add_task(threadPool, CAReceiveHandler, NULL);
     if (CA_STATUS_OK != res)
@@ -567,6 +586,11 @@ void CATCPStopServer()
         // receive thread will stop immediately
     }
 
+    if (caglobals.tcp.connectionFds[1] != -1)
+    {
+        close(caglobals.tcp.connectionFds[1]);
+    }
+
     if (caglobals.tcp.started)
     {
         ca_cond_wait(g_condObjectList, g_mutexObjectList);