From: Jon A. Cruz Date: Thu, 2 Apr 2015 14:48:30 +0000 (-0700) Subject: Added interruption of select() to linux IP reading to allow for proper X-Git-Tag: 0.9.1-alpha1~38^2~45 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=8b10437e11dd12664008a12e62f7732716fa9569;p=contrib%2Fiotivity.git Added interruption of select() to linux IP reading to allow for proper stopping. Added standard mechanism to cleanly interrupt select() calls in linux network code so that stack shutdown is now immediate. Among other things this allows for a longer timeout to be used in select() calls which reduces log spam and increases performance. Another effect is to clean up unit test performance with a main dev box gaining more than two orders of magnitude in performance. Stack unit tests were taking 39 seconds to complete whereas with this change they complete in less than 1/3rd of a second (1/10th for release builds). Change-Id: I330b1d7507ab81c5b54677d1ef21434862f63508 Signed-off-by: Jon A. Cruz Reviewed-on: https://gerrit.iotivity.org/gerrit/635 Tested-by: jenkins-iotivity Reviewed-by: Thiago Macieira Reviewed-by: Joseph Morrow Reviewed-by: Erich Keane --- diff --git a/resource/csdk/connectivity/src/ethernet_adapter/linux/caethernetserver.c b/resource/csdk/connectivity/src/ethernet_adapter/linux/caethernetserver.c index 0b248f2..699334e 100644 --- a/resource/csdk/connectivity/src/ethernet_adapter/linux/caethernetserver.c +++ b/resource/csdk/connectivity/src/ethernet_adapter/linux/caethernetserver.c @@ -62,6 +62,13 @@ #include "umutex.h" #include "oic_malloc.h" +// TODO g_stopSecureUnicast is set but never used. The three groups of +// globals should be combined into three instances of a common struct. + +#define PIPE_READ_FD 0 + +#define PIPE_WRITE_FD 1 + /** * @def ETHERNET_SERVER_TAG * @brief Logging tag for module name @@ -99,6 +106,11 @@ static u_mutex g_mutexUnicastServer = NULL; static bool g_stopUnicast = false; /** + * Handle to interrupt unicast server for stopping, etc. + */ +static int g_unicastTriggerFD = -1; + +/** * @var g_multicastServerSocketFD * @brief socket descriptor for multicast server */ @@ -116,6 +128,11 @@ static u_mutex g_mutexMulticastServer = NULL; */ static bool g_stopMulticast = false; +/** + * Handle to interrupt multicast server for stopping, etc. + */ +static int g_multicastTriggerFD = -1; + #ifdef __WITH_DTLS__ /** * @var g_secureUnicastServerSocketFD @@ -172,10 +189,33 @@ static CAEthernetExceptionCallback g_exceptionCallback = NULL; typedef struct { bool *stopFlag; + int stopFd; int32_t socket_fd; CAAdapterServerType_t type; } CAAdapterReceiveThreadContext_t; +/** + * Creates a non-blocking pipe. + * + * Creates a pipe with two file descriptors then sets both to be + * non-blocking. If an error occurs setting flags, any created handles + * will be closed and set to -1. + * + * @param pipefd array of two ints to store the pipe handles in. + * + * @return 0 on success, -1 otherwise. + */ +static int createNonblockingPipe(int pipefd[2]); + +/** + * Sets the given file descriptor to be non-blocking. + * + * @param fd the file descriptor to make non-blocking. + * + * @return 0 on success, -1 otherwise. + */ +static int setNonblocking(int fd); + static void CAReceiveHandler(void *data) { OIC_LOG(DEBUG, ETHERNET_SERVER_TAG, "IN"); @@ -190,13 +230,19 @@ static void CAReceiveHandler(void *data) while (true != *(ctx->stopFlag)) { - timeout.tv_sec = 1; + // safe to set longer value as select can be interrupted. + timeout.tv_sec = 10; timeout.tv_usec = 0; FD_ZERO(&reads); FD_SET(ctx->socket_fd, &reads); + int highest = (ctx->stopFd > ctx->socket_fd) ? ctx->stopFd : ctx->socket_fd; + if (ctx->stopFd != -1) + { + FD_SET(ctx->stopFd, &reads); + } - int32_t ret = select(ctx->socket_fd + 1, &reads, NULL, NULL, &timeout); + int32_t ret = select(highest + 1, &reads, NULL, NULL, &timeout); if (*(ctx->stopFlag) == true) { OIC_LOG_V(DEBUG, ETHERNET_SERVER_TAG, @@ -210,6 +256,13 @@ static void CAReceiveHandler(void *data) continue; } + if ((ctx->stopFd != -1) && FD_ISSET(ctx->stopFd, &reads)) + { + // Doesn't matter at this point what happens (errors can be + // ignored). Just drain some data if this code ever gets hit. + recv(ctx->stopFd, recvBuffer, sizeof(recvBuffer), MSG_DONTWAIT); + } + if (!FD_ISSET(ctx->socket_fd, &reads)) { continue; @@ -240,6 +293,10 @@ static void CAReceiveHandler(void *data) { g_exceptionCallback(ctx->type); } + if (ctx->stopFd != -1) + { + close(ctx->stopFd); + } OICFree(ctx); ctx = NULL; return; @@ -299,11 +356,20 @@ static void CAReceiveHandler(void *data) default: // Should never occur OIC_LOG_V(DEBUG, ETHERNET_SERVER_TAG, "Invalid server type"); + if (ctx->stopFd != -1) + { + close(ctx->stopFd); + } OICFree(ctx); ctx = NULL; return; } } + + if (ctx->stopFd != -1) + { + close(ctx->stopFd); + } OICFree(ctx); ctx = NULL; @@ -325,11 +391,8 @@ static CAResult_t CACreateSocket(int32_t *socketFD, const char *localIp, uint16_ } // Make the socket non-blocking - if (-1 == fcntl(sock, F_SETFL, O_NONBLOCK)) + if (-1 == setNonblocking(sock)) { - OIC_LOG_V(ERROR, ETHERNET_SERVER_TAG, "Failed to set non-block mode, Error code: %s", - strerror(errno)); - close(sock); return CA_STATUS_FAILED; } @@ -462,12 +525,31 @@ static CAResult_t CAStartUnicastServer(const char *localAddress, uint16_t *port, return CA_MEMORY_ALLOC_FAILED; } + int pipefd[2] = {-1, -1}; + if (createNonblockingPipe(pipefd) != 0) + { + OIC_LOG(ERROR, ETHERNET_SERVER_TAG, "Failed to create pipe"); + OICFree(ctx); + close(*serverFD); + *serverFD = -1; + return CA_STATUS_FAILED; + } + + g_unicastTriggerFD = pipefd[PIPE_WRITE_FD]; // The write end of the pipe + ctx->stopFlag = &g_stopUnicast; + ctx->stopFd = pipefd[PIPE_READ_FD]; // The read end of the pipe ctx->socket_fd = *serverFD; ctx->type = isSecured ? CA_SECURED_UNICAST_SERVER : CA_UNICAST_SERVER; if (CA_STATUS_OK != u_thread_pool_add_task(g_threadPool, CAReceiveHandler, (void *)ctx)) { OIC_LOG(ERROR, ETHERNET_SERVER_TAG, "Failed to create read thread!"); + + close(g_unicastTriggerFD); + g_unicastTriggerFD = -1; + close(ctx->stopFd); + ctx->stopFd = -1; + OICFree(ctx); close(*serverFD); *serverFD = -1; @@ -722,7 +804,20 @@ CAResult_t CAEthernetStartMulticastServer(const char *localAddress, return CA_MEMORY_ALLOC_FAILED; } + int pipefd[2] = {-1, -1}; + if (createNonblockingPipe(pipefd) != 0) + { + OIC_LOG(ERROR, ETHERNET_SERVER_TAG, "Failed to create pipe"); + OICFree(ctx); + close(g_multicastServerSocketFD); + g_multicastServerSocketFD = -1; + return CA_STATUS_FAILED; + } + + g_multicastTriggerFD = pipefd[PIPE_WRITE_FD]; // The write end of the pipe + ctx->stopFlag = &g_stopMulticast; + ctx->stopFd = pipefd[PIPE_READ_FD]; // The read end of the pipe ctx->socket_fd = g_multicastServerSocketFD; ctx->type = CA_MULTICAST_SERVER; @@ -731,6 +826,11 @@ CAResult_t CAEthernetStartMulticastServer(const char *localAddress, { OIC_LOG(ERROR, ETHERNET_SERVER_TAG, "thread_pool_add_task failed!"); + close(g_multicastTriggerFD); + g_multicastTriggerFD = -1; + close(ctx->stopFd); + ctx->stopFd = -1; + close(g_multicastServerSocketFD); g_multicastServerSocketFD = -1; g_stopMulticast = true; @@ -752,7 +852,19 @@ CAResult_t CAEthernetStopUnicastServer() u_mutex_lock(g_mutexUnicastServer); g_stopUnicast = true; + if (g_unicastTriggerFD != -1) + { + if (write(g_unicastTriggerFD, "X", 1) == -1) + { + OIC_LOG_V(ERROR, ETHERNET_SERVER_TAG, + "Failed to write to trigger, Error code: %s", + strerror(errno)); + } + close(g_unicastTriggerFD); + g_unicastTriggerFD = -1; + } CAResult_t ret = CACloseSocket(&g_unicastServerSocketFD); + g_unicastServerSocketFD = -1; u_mutex_unlock(g_mutexUnicastServer); OIC_LOG_V(INFO, ETHERNET_SERVER_TAG, "Unicast server stopped [%d]", ret); @@ -766,6 +878,17 @@ CAResult_t CAEthernetStopSecureUnicastServer() u_mutex_lock(g_mutexSecureUnicastServer); g_stopSecureUnicast = true; + if (g_unicastTriggerFD != -1) + { + if (write(g_unicastTriggerFD, "X", 1) == -1) + { + OIC_LOG_V(ERROR, ETHERNET_SERVER_TAG, + "Failed to write to trigger, Error code: %s", + strerror(errno)); + } + close(g_unicastTriggerFD); + g_unicastTriggerFD = -1; + } CAResult_t ret = CACloseSocket(&g_secureUnicastServerSocketFD); u_mutex_unlock(g_mutexSecureUnicastServer); @@ -788,6 +911,17 @@ CAResult_t CAEthernetStopMulticastServer(void) } g_stopMulticast = true; + if (g_multicastTriggerFD != -1) + { + if (write(g_multicastTriggerFD, "X", 1) == -1) + { + OIC_LOG_V(ERROR, ETHERNET_SERVER_TAG, + "Failed to write to trigger, Error code: %s", + strerror(errno)); + } + close(g_multicastTriggerFD); + g_multicastTriggerFD = -1; + } // leave the group after you are done if (-1 == setsockopt(g_multicastServerSocketFD, IPPROTO_IP, IP_DROP_MEMBERSHIP, @@ -851,3 +985,61 @@ void CAEthernetSetExceptionCallback(CAEthernetExceptionCallback callback) OIC_LOG(DEBUG, ETHERNET_SERVER_TAG, "OUT"); } +int setNonblocking(int fd) +{ + int rc = fcntl(fd, F_GETFL); + if (rc == -1) + { + OIC_LOG_V(ERROR, ETHERNET_SERVER_TAG, + "Failed to get existing flags, Error code: %s", + strerror(errno)); + } + else + { + rc = fcntl(fd, F_SETFL, rc | O_NONBLOCK); + if (rc == -1) + { + OIC_LOG_V(ERROR, ETHERNET_SERVER_TAG, + "Failed to set non-blocking mode, Error code: %s", + strerror(errno)); + } + } + + return rc; +} + +int createNonblockingPipe(int pipefd[2]) +{ + int rc = -1; + if (pipefd) + { + pipefd[PIPE_READ_FD] = -1; + pipefd[PIPE_WRITE_FD] = -1; + rc = pipe(pipefd); + + if (rc != -1) + { + rc = setNonblocking(pipefd[PIPE_READ_FD]); + } + + if (rc != -1) + { + rc = setNonblocking(pipefd[PIPE_WRITE_FD]); + } + + if (rc == -1) + { + if (pipefd[PIPE_READ_FD] != -1) + { + close(pipefd[PIPE_READ_FD]); + pipefd[PIPE_READ_FD] = -1; + } + if (pipefd[PIPE_WRITE_FD] != -1) + { + close(pipefd[PIPE_WRITE_FD]); + pipefd[PIPE_WRITE_FD]= -1; + } + } + } + return rc; +} diff --git a/resource/csdk/connectivity/src/wifi_adapter/linux/cawifiserver.c b/resource/csdk/connectivity/src/wifi_adapter/linux/cawifiserver.c index af86401..24189ae 100644 --- a/resource/csdk/connectivity/src/wifi_adapter/linux/cawifiserver.c +++ b/resource/csdk/connectivity/src/wifi_adapter/linux/cawifiserver.c @@ -61,6 +61,13 @@ #include "umutex.h" #include "oic_malloc.h" +// TODO g_stopSecureUnicast is set but never used. The three groups of +// globals should be combined into three instances of a common struct. + +#define PIPE_READ_FD 0 + +#define PIPE_WRITE_FD 1 + /** * @def WIFI_SERVER_TAG * @brief Logging tag for module name @@ -98,6 +105,11 @@ static u_mutex g_mutexUnicastServer = NULL; static bool g_stopUnicast = false; /** + * Handle to interrupt unicast server for stopping, etc. + */ +static int g_unicastTriggerFD = -1; + +/** * @var g_multicastServerSocketFD * @brief socket descriptor for multicast server */ @@ -115,6 +127,11 @@ static u_mutex g_mutexMulticastServer = NULL; */ static bool g_stopMulticast = false; +/** + * Handle to interrupt multicast server for stopping, etc. + */ +static int g_multicastTriggerFD = -1; + #ifdef __WITH_DTLS__ /** * @var g_secureUnicastServerSocketFD @@ -171,10 +188,33 @@ static CAWiFiExceptionCallback g_exceptionCallback = NULL; typedef struct { bool *stopFlag; + int stopFd; int32_t socket_fd; CAAdapterServerType_t type; } CAAdapterReceiveThreadContext_t; +/** + * Creates a non-blocking pipe. + * + * Creates a pipe with two file descriptors then sets both to be + * non-blocking. If an error occurs setting flags, any created handles + * will be closed and set to -1. + * + * @param pipefd array of two ints to store the pipe handles in. + * + * @return 0 on success, -1 otherwise. + */ +static int createNonblockingPipe(int pipefd[2]); + +/** + * Sets the given file descriptor to be non-blocking. + * + * @param fd the file descriptor to make non-blocking. + * + * @return 0 on success, -1 otherwise. + */ +static int setNonblocking(int fd); + static void CAReceiveHandler(void *data) { OIC_LOG(DEBUG, WIFI_SERVER_TAG, "IN"); @@ -189,13 +229,19 @@ static void CAReceiveHandler(void *data) while (true != *(ctx->stopFlag)) { - timeout.tv_sec = 1; + // safe to set longer value as select can be interrupted. + timeout.tv_sec = 10; timeout.tv_usec = 0; FD_ZERO(&reads); FD_SET(ctx->socket_fd, &reads); + int highest = (ctx->stopFd > ctx->socket_fd) ? ctx->stopFd : ctx->socket_fd; + if (ctx->stopFd != -1) + { + FD_SET(ctx->stopFd, &reads); + } - int32_t ret = select(ctx->socket_fd + 1, &reads, NULL, NULL, &timeout); + int32_t ret = select(highest + 1, &reads, NULL, NULL, &timeout); if (*(ctx->stopFlag) == true) { OIC_LOG_V(DEBUG, WIFI_SERVER_TAG, "Stop request received for [%d] server", ctx->type); @@ -206,6 +252,14 @@ static void CAReceiveHandler(void *data) OIC_LOG_V(FATAL, WIFI_SERVER_TAG, "select returned error %s", strerror(errno)); continue; } + + if ((ctx->stopFd != -1) && FD_ISSET(ctx->stopFd, &reads)) + { + // Doesn't matter at this point what happens (errors can be + // ignored). Just drain some data if this code ever gets hit. + recv(ctx->stopFd, recvBuffer, sizeof(recvBuffer), MSG_DONTWAIT); + } + if (!FD_ISSET(ctx->socket_fd, &reads)) { continue; @@ -233,6 +287,10 @@ static void CAReceiveHandler(void *data) { g_exceptionCallback(ctx->type); } + if (ctx->stopFd != -1) + { + close(ctx->stopFd); + } OICFree(ctx); return; } @@ -287,13 +345,22 @@ static void CAReceiveHandler(void *data) break; #endif //__WITH_DTLS__ default: - // Should never occur OIC_LOG_V(DEBUG, WIFI_SERVER_TAG, "Invalid server type"); + // Should never occur + OIC_LOG_V(DEBUG, WIFI_SERVER_TAG, "Invalid server type"); + if (ctx->stopFd != -1) + { + close(ctx->stopFd); + } OICFree(ctx); return; } } // free context + if (ctx->stopFd != -1) + { + close(ctx->stopFd); + } OICFree(ctx); OIC_LOG(DEBUG, WIFI_SERVER_TAG, "OUT"); @@ -312,11 +379,8 @@ static CAResult_t CAWiFiCreateSocket(int32_t *socketFD, const char *localIp, uin } // Make the socket non-blocking - if (-1 == fcntl(sock, F_SETFL, O_NONBLOCK)) + if (-1 == setNonblocking(sock)) { - OIC_LOG_V(ERROR, WIFI_SERVER_TAG, "Failed to set non-block mode, Error code: %s", - strerror(errno)); - close(sock); return CA_STATUS_FAILED; } @@ -456,12 +520,31 @@ static CAResult_t CAStartUnicastServer(const char *localAddress, uint16_t *port, return CA_MEMORY_ALLOC_FAILED; } + int pipefd[2] = {-1, -1}; + if (createNonblockingPipe(pipefd) != 0) + { + OIC_LOG(ERROR, WIFI_SERVER_TAG, "Failed to create pipe"); + OICFree(ctx); + close(*serverFD); + *serverFD = -1; + return CA_STATUS_FAILED; + } + + g_unicastTriggerFD = pipefd[PIPE_WRITE_FD]; // The write end of the pipe + ctx->stopFlag = &g_stopUnicast; + ctx->stopFd = pipefd[PIPE_READ_FD]; // The read end of the pipe ctx->socket_fd = *serverFD; ctx->type = isSecured ? CA_SECURED_UNICAST_SERVER : CA_UNICAST_SERVER; if (CA_STATUS_OK != u_thread_pool_add_task(g_threadPool, CAReceiveHandler, (void *) ctx)) { OIC_LOG(ERROR, WIFI_SERVER_TAG, "Failed to create read thread!"); + + close(g_unicastTriggerFD); + g_unicastTriggerFD = -1; + close(ctx->stopFd); + ctx->stopFd = -1; + OICFree((void *) ctx); close(*serverFD); return CA_STATUS_FAILED; @@ -709,7 +792,20 @@ CAResult_t CAWiFiStartMulticastServer(const char *localAddress, const char *mult return CA_MEMORY_ALLOC_FAILED; } + int pipefd[2] = {-1, -1}; + if (createNonblockingPipe(pipefd) != 0) + { + OIC_LOG(ERROR, WIFI_SERVER_TAG, "Failed to create pipe"); + OICFree(ctx); + close(g_multicastServerSocketFD); + g_multicastServerSocketFD = -1; + return CA_STATUS_FAILED; + } + + g_multicastTriggerFD = pipefd[PIPE_WRITE_FD]; // The write end of the pipe + ctx->stopFlag = &g_stopMulticast; + ctx->stopFd = pipefd[PIPE_READ_FD]; // The read end of the pipe ctx->socket_fd = g_multicastServerSocketFD; ctx->type = CA_MULTICAST_SERVER; @@ -718,6 +814,11 @@ CAResult_t CAWiFiStartMulticastServer(const char *localAddress, const char *mult { OIC_LOG(ERROR, WIFI_SERVER_TAG, "thread_pool_add_task failed!"); + close(g_multicastTriggerFD); + g_multicastTriggerFD = -1; + close(ctx->stopFd); + ctx->stopFd = -1; + close(g_multicastServerSocketFD); g_multicastServerSocketFD = -1; g_stopMulticast = true; @@ -739,6 +840,17 @@ CAResult_t CAWiFiStopUnicastServer() u_mutex_lock(g_mutexUnicastServer); g_stopUnicast = true; + if (g_unicastTriggerFD != -1) + { + if (write(g_unicastTriggerFD, "X", 1) == -1) + { + OIC_LOG_V(ERROR, WIFI_SERVER_TAG, + "Failed to write to trigger, Error code: %s", + strerror(errno)); + } + close(g_unicastTriggerFD); + g_unicastTriggerFD = -1; + } CAResult_t ret = CAWiFiCloseSocket(&g_unicastServerSocketFD); u_mutex_unlock(g_mutexUnicastServer); @@ -753,6 +865,17 @@ CAResult_t CAWiFiStopSecureUnicastServer() u_mutex_lock(g_mutexSecureUnicastServer); g_stopSecureUnicast = true; + if (g_unicastTriggerFD != -1) + { + if (write(g_unicastTriggerFD, "X", 1) == -1) + { + OIC_LOG_V(ERROR, WIFI_SERVER_TAG, + "Failed to write to trigger, Error code: %s", + strerror(errno)); + } + close(g_unicastTriggerFD); + g_unicastTriggerFD = -1; + } CAResult_t ret = CAWiFiCloseSocket(&g_secureUnicastServerSocketFD); u_mutex_unlock(g_mutexSecureUnicastServer); @@ -775,6 +898,17 @@ CAResult_t CAWiFiStopMulticastServer(void) } g_stopMulticast = true; + if (g_multicastTriggerFD != -1) + { + if (write(g_multicastTriggerFD, "X", 1) == -1) + { + OIC_LOG_V(ERROR, WIFI_SERVER_TAG, + "Failed to write to trigger, Error code: %s", + strerror(errno)); + } + close(g_multicastTriggerFD); + g_multicastTriggerFD = -1; + } // leave the group after you are done if (-1 @@ -838,3 +972,61 @@ void CAWiFiSetExceptionCallback(CAWiFiExceptionCallback callback) g_exceptionCallback = callback; } +int setNonblocking(int fd) +{ + int rc = fcntl(fd, F_GETFL); + if (rc == -1) + { + OIC_LOG_V(ERROR, WIFI_SERVER_TAG, + "Failed to get existing flags, Error code: %s", + strerror(errno)); + } + else + { + rc = fcntl(fd, F_SETFL, rc | O_NONBLOCK); + if (rc == -1) + { + OIC_LOG_V(ERROR, WIFI_SERVER_TAG, + "Failed to set non-blocking mode, Error code: %s", + strerror(errno)); + } + } + + return rc; +} + +int createNonblockingPipe(int pipefd[2]) +{ + int rc = -1; + if (pipefd) + { + pipefd[PIPE_READ_FD] = -1; + pipefd[PIPE_WRITE_FD] = -1; + rc = pipe(pipefd); + + if (rc != -1) + { + rc = setNonblocking(pipefd[PIPE_READ_FD]); + } + + if (rc != -1) + { + rc = setNonblocking(pipefd[PIPE_WRITE_FD]); + } + + if (rc == -1) + { + if (pipefd[PIPE_READ_FD] != -1) + { + close(pipefd[PIPE_READ_FD]); + pipefd[PIPE_READ_FD] = -1; + } + if (pipefd[PIPE_WRITE_FD] != -1) + { + close(pipefd[PIPE_WRITE_FD]); + pipefd[PIPE_WRITE_FD]= -1; + } + } + } + return rc; +}