From 9143d3d12d93f571913433b732682f03a7c851e2 Mon Sep 17 00:00:00 2001 From: "hyuna0213.jo" Date: Wed, 9 Mar 2016 07:45:51 +0900 Subject: [PATCH] Changed tcp adapter logic to process the received data directly Currently If new connection is created with remote device, received first data from remote device can be processed after select timeout. because select() is blocking function. so I added connection file descriptor to update read file descriptor list after receiving connection event. Change-Id: I29f83447a6f5d814a1491e06cee886f9f95b5fd0 Signed-off-by: hyuna0213.jo Reviewed-on: https://gerrit.iotivity.org/gerrit/5569 Tested-by: jenkins-iotivity Reviewed-by: Jon A. Cruz --- resource/csdk/connectivity/api/cacommon.h | 1 + .../connectivity/src/tcp_adapter/catcpadapter.c | 4 +- .../connectivity/src/tcp_adapter/catcpserver.c | 92 ++++++++++++++-------- 3 files changed, 61 insertions(+), 36 deletions(-) diff --git a/resource/csdk/connectivity/api/cacommon.h b/resource/csdk/connectivity/api/cacommon.h index 3ed6002..464f24b 100644 --- a/resource/csdk/connectivity/api/cacommon.h +++ b/resource/csdk/connectivity/api/cacommon.h @@ -518,6 +518,7 @@ typedef struct int selectTimeout; /**< in seconds */ int listenBacklog; /**< backlog counts*/ int shutdownFds[2]; /**< shutdown pipe */ + int connectionFds[2]; /**< connection pipe */ int maxfd; /**< highest fd (for select) */ bool started; /**< the TCP adapter has started */ bool terminate; /**< the TCP adapter needs to stop */ diff --git a/resource/csdk/connectivity/src/tcp_adapter/catcpadapter.c b/resource/csdk/connectivity/src/tcp_adapter/catcpadapter.c index 5c4c581..22ce787 100644 --- a/resource/csdk/connectivity/src/tcp_adapter/catcpadapter.c +++ b/resource/csdk/connectivity/src/tcp_adapter/catcpadapter.c @@ -240,7 +240,7 @@ CAResult_t CAInitializeTCP(CARegisterConnectivityCallback registerCallback, CATCPSetPacketReceiveCallback(CATCPPacketReceivedCB); CATCPSetErrorHandler(CATCPErrorHandler); - CAConnectivityHandler_t TCPHandler = { + CAConnectivityHandler_t tcpHandler = { .startAdapter = CAStartTCP, .startListenServer = CAStartTCPListeningServer, .stopListenServer = CAStopTCPListeningServer, @@ -253,7 +253,7 @@ CAResult_t CAInitializeTCP(CARegisterConnectivityCallback registerCallback, .terminate = CATerminateTCP, .cType = CA_ADAPTER_TCP}; - registerCallback(TCPHandler); + registerCallback(tcpHandler); OIC_LOG(INFO, TAG, "OUT IntializeTCP is Success"); return CA_STATUS_OK; diff --git a/resource/csdk/connectivity/src/tcp_adapter/catcpserver.c b/resource/csdk/connectivity/src/tcp_adapter/catcpserver.c index 90f6821..4fe652f 100644 --- a/resource/csdk/connectivity/src/tcp_adapter/catcpserver.c +++ b/resource/csdk/connectivity/src/tcp_adapter/catcpserver.c @@ -186,6 +186,10 @@ static void CAFindReadyMessage() { FD_SET(caglobals.tcp.shutdownFds[0], &readFds); } + if (-1 != caglobals.tcp.connectionFds[0]) + { + FD_SET(caglobals.tcp.connectionFds[0], &readFds); + } uint32_t length = u_arraylist_length(caglobals.tcp.svrlist); for (size_t i = 0; i < length; i++) @@ -226,6 +230,21 @@ static void CASelectReturned(fd_set *readFds, int ret) CAAcceptConnection(); return; } + else if (-1 != caglobals.tcp.connectionFds[0] && + FD_ISSET(caglobals.tcp.connectionFds[0], readFds)) + { + // new connection was created from remote device. + // exit the function to update read file descriptor. + char buf[MAX_ADDR_STR_SIZE_CA] = {0}; + ssize_t len = read(caglobals.tcp.connectionFds[0], buf, sizeof (buf)); + if (-1 == len) + { + return; + } + OIC_LOG_V(DEBUG, TAG, "Received new connection event with [%s]", buf); + FD_CLR(caglobals.tcp.connectionFds[0], readFds); + return; + } else { uint32_t length = u_arraylist_length(caglobals.tcp.svrlist); @@ -361,6 +380,23 @@ static void CAReceiveMessage(int fd) return; } +static void CAWakeUpForReadFdsUpdate(const char *host) +{ + if (caglobals.tcp.connectionFds[1] != -1) + { + ssize_t len = 0; + do + { + len = write(caglobals.tcp.connectionFds[1], host, strlen(host)); + } while ((len == -1) && (errno == EINTR)); + + if ((len == -1) && (errno != EINTR) && (errno != EPIPE)) + { + OIC_LOG_V(DEBUG, TAG, "write failed: %s", strerror(errno)); + } + } +} + static int CATCPCreateSocket(int family, CATCPSessionInfo_t *svritem) { // create tcp socket @@ -380,29 +416,7 @@ static int CATCPCreateSocket(int family, CATCPSessionInfo_t *svritem) if (0 == ret) { OIC_LOG(DEBUG, TAG, "connect socket success"); - } - else if (EINPROGRESS == errno) - { - OIC_LOG(DEBUG, TAG, "EINPROGRESS"); - int error = 0; - socklen_t len = sizeof(error); - if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0) - { - OIC_LOG(ERROR, TAG, "getsockopt() error"); - goto exit; - } - - if (error) - { - if (ECONNREFUSED == error) - { - OIC_LOG(ERROR, TAG, "connection refused"); - goto exit; - } - OIC_LOG(ERROR, TAG, "failed to connect socket"); - goto exit; - } - OIC_LOG(DEBUG, TAG, "connect socket success"); + CAWakeUpForReadFdsUpdate(svritem->sep.endpoint.addr); } else { @@ -467,31 +481,31 @@ exit: return CA_STATUS_FAILED; } -static void CAInitializePipe() +static void CAInitializePipe(int *fds) { - int ret = pipe(caglobals.tcp.shutdownFds); + int ret = pipe(fds); if (-1 != ret) { - ret = fcntl(caglobals.tcp.shutdownFds[0], F_GETFD); + ret = fcntl(fds[0], F_GETFD); if (-1 != ret) { - ret = fcntl(caglobals.tcp.shutdownFds[0], F_SETFD, ret|FD_CLOEXEC); + ret = fcntl(fds[0], F_SETFD, ret|FD_CLOEXEC); } if (-1 != ret) { - ret = fcntl(caglobals.tcp.shutdownFds[1], F_GETFD); + ret = fcntl(fds[1], F_GETFD); } if (-1 != ret) { - ret = fcntl(caglobals.tcp.shutdownFds[1], F_SETFD, ret|FD_CLOEXEC); + ret = fcntl(fds[1], F_SETFD, ret|FD_CLOEXEC); } if (-1 == ret) { - close(caglobals.tcp.shutdownFds[1]); - close(caglobals.tcp.shutdownFds[0]); + close(fds[1]); + close(fds[0]); - caglobals.tcp.shutdownFds[0] = -1; - caglobals.tcp.shutdownFds[1] = -1; + fds[0] = -1; + fds[1] = -1; OIC_LOG_V(ERROR, TAG, "pipe failed: %s", strerror(errno)); } @@ -536,10 +550,15 @@ CAResult_t CATCPStartServer(const ca_thread_pool_t threadPool) } // create pipe for fast shutdown - CAInitializePipe(); + CAInitializePipe(caglobals.tcp.shutdownFds); CHECKFD(caglobals.tcp.shutdownFds[0]); CHECKFD(caglobals.tcp.shutdownFds[1]); + // create pipe for connection event + CAInitializePipe(caglobals.tcp.connectionFds); + CHECKFD(caglobals.tcp.connectionFds[0]); + CHECKFD(caglobals.tcp.connectionFds[1]); + caglobals.tcp.terminate = false; res = ca_thread_pool_add_task(threadPool, CAReceiveHandler, NULL); if (CA_STATUS_OK != res) @@ -567,6 +586,11 @@ void CATCPStopServer() // receive thread will stop immediately } + if (caglobals.tcp.connectionFds[1] != -1) + { + close(caglobals.tcp.connectionFds[1]); + } + if (caglobals.tcp.started) { ca_cond_wait(g_condObjectList, g_mutexObjectList); -- 2.7.4