From: David Antler Date: Tue, 2 Feb 2016 00:50:39 +0000 (-0800) Subject: [Win32] Implement events shutdown in lieu of pipes X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=33a0e802c8c570c7b67902cdc8cee75a1f939e10;p=contrib%2Fiotivity.git [Win32] Implement events shutdown in lieu of pipes Windows is not able to select() for read on unnamed pipes. Move to an event-based scheme where signaling can come from a shutdownEvent or readable data on a socket. Included some clean-up of preprocessor macros. Change-Id: I7db3235a6c870c97c9c317f833f2d8682fd9ce70 Signed-off-by: David Antler Reviewed-on: https://gerrit.iotivity.org/gerrit/5521 Reviewed-by: Dave Thaler Tested-by: jenkins-iotivity --- diff --git a/resource/csdk/connectivity/api/cacommon.h b/resource/csdk/connectivity/api/cacommon.h index fcf8328..3fc01de 100644 --- a/resource/csdk/connectivity/api/cacommon.h +++ b/resource/csdk/connectivity/api/cacommon.h @@ -42,6 +42,7 @@ #if defined(_WIN32) #include +#include #endif #ifdef __cplusplus @@ -497,7 +498,11 @@ typedef struct CASocket_t m4; /**< multicast IPv4 */ CASocket_t m4s; /**< multicast IPv4 secure */ int netlinkFd; /**< netlink */ - int shutdownFds[2]; /**< shutdown pipe */ +#if defined(_WIN32) + WSAEVENT shutdownEvent; /**< Event used to signal threads to stop */ +#else + int shutdownFds[2]; /**< fds used to signal threads to stop */ +#endif int selectTimeout; /**< in seconds */ int maxfd; /**< highest fd (for select) */ bool started; /**< the IP adapter has started */ diff --git a/resource/csdk/connectivity/src/ip_adapter/caipserver.c b/resource/csdk/connectivity/src/ip_adapter/caipserver.c index cacf7b0..c4b713c 100644 --- a/resource/csdk/connectivity/src/ip_adapter/caipserver.c +++ b/resource/csdk/connectivity/src/ip_adapter/caipserver.c @@ -26,11 +26,12 @@ #endif #include -#if !defined(__msys_nt__) +#if !defined(_WIN32) #include #endif -#if defined(__msys_nt__) +#if defined(_WIN32) +#include #include #include #include @@ -41,7 +42,7 @@ #include #include #include -#if !defined(__msys_nt__) +#if !defined(_WIN32) #include #include #include @@ -135,22 +136,25 @@ static CAIPPacketReceivedCallback g_packetReceivedCallback; static void CAHandleNetlink(); static void CAFindReadyMessage(); +#if !defined(WSA_WAIT_EVENT_0) static void CASelectReturned(fd_set *readFds, int ret); +#else +static void CAEventReturned(HANDLE); +#endif static void CAProcessNewInterface(CAInterface_t *ifchanged); static CAResult_t CAReceiveMessage(int fd, CATransportFlags_t flags); -#define SET(TYPE, FDS) \ - if (caglobals.ip.TYPE.fd != -1) \ - { \ - FD_SET(caglobals.ip.TYPE.fd, FDS); \ - } +static void CAReceiveHandler(void *data) +{ + (void)data; -#define ISSET(TYPE, FDS, FLAGS) \ - if (caglobals.ip.TYPE.fd != -1 && FD_ISSET(caglobals.ip.TYPE.fd, FDS)) \ - { \ - fd = caglobals.ip.TYPE.fd; \ - flags = FLAGS; \ + while (!caglobals.ip.terminate) + { + CAFindReadyMessage(); } +} + +#if !defined(WSA_WAIT_EVENT_0) #define CLOSE_SOCKET(TYPE) \ if (caglobals.ip.TYPE.fd != -1) \ @@ -159,33 +163,19 @@ static CAResult_t CAReceiveMessage(int fd, CATransportFlags_t flags); caglobals.ip.TYPE.fd = -1; \ } -void CADeInitializeIPGlobals() -{ - CLOSE_SOCKET(u6); - CLOSE_SOCKET(u6s); - CLOSE_SOCKET(u4); - CLOSE_SOCKET(u4s); - CLOSE_SOCKET(m6); - CLOSE_SOCKET(m6s); - CLOSE_SOCKET(m4); - CLOSE_SOCKET(m4s); - - if (caglobals.ip.netlinkFd != -1) - { - close(caglobals.ip.netlinkFd); - caglobals.ip.netlinkFd = -1; +#define SET(TYPE, FDS) \ + if (caglobals.ip.TYPE.fd != -1) \ + { \ + FD_SET(caglobals.ip.TYPE.fd, FDS); \ } -} -static void CAReceiveHandler(void *data) -{ - (void)data; - - while (!caglobals.ip.terminate) - { - CAFindReadyMessage(); +#define ISSET(TYPE, FDS, FLAGS) \ + if (caglobals.ip.TYPE.fd != -1 && FD_ISSET(caglobals.ip.TYPE.fd, FDS)) \ + { \ + fd = caglobals.ip.TYPE.fd; \ + flags = FLAGS; \ } -} + static void CAFindReadyMessage() { @@ -206,7 +196,6 @@ static void CAFindReadyMessage() SET(m4, &readFds) SET(m4s, &readFds) -#if !defined(_WIN32) if (caglobals.ip.shutdownFds[0] != -1) { FD_SET(caglobals.ip.shutdownFds[0], &readFds); @@ -215,7 +204,7 @@ static void CAFindReadyMessage() { FD_SET(caglobals.ip.netlinkFd, &readFds); } -#endif + int ret = select(caglobals.ip.maxfd + 1, &readFds, NULL, NULL, tv); if (caglobals.ip.terminate) @@ -252,8 +241,7 @@ static void CASelectReturned(fd_set *readFds, int ret) else ISSET(m6s, readFds, CA_MULTICAST | CA_IPV6 | CA_SECURE) else ISSET(m4, readFds, CA_MULTICAST | CA_IPV4) else ISSET(m4s, readFds, CA_MULTICAST | CA_IPV4 | CA_SECURE) -#if !defined(_WIN32) - else if (FD_ISSET(caglobals.ip.netlinkFd, readFds)) + else if ((caglobals.ip.netlinkFd != -1) && FD_ISSET(caglobals.ip.netlinkFd, readFds)) { CAInterface_t *ifchanged = CAFindInterfaceChange(); if (ifchanged) @@ -277,14 +265,219 @@ static void CASelectReturned(fd_set *readFds, int ret) { break; } -#else + (void)CAReceiveMessage(fd, flags); + FD_CLR(fd, readFds); + } +} + +#else // if defined(WSA_WAIT_EVENT_0) + +#define CLOSE_SOCKET(TYPE) \ + if (caglobals.ip.TYPE.fd != -1) \ + { \ + closesocket(caglobals.ip.TYPE.fd); \ + caglobals.ip.TYPE.fd = -1; \ + } + +#define PUSH_HANDLE(HANDLE, ARRAY, INDEX) \ +{ \ + ARRAY[INDEX] = HANDLE; \ + INDEX++; \ +} + +// Turn handle into WSAEvent and push to ARRAY +#define PUSH_SOCKET(SOCKET, ARRAY, INDEX) \ + if (SOCKET != -1) \ + { \ + WSAEVENT NewEvent; \ + NewEvent = WSACreateEvent(); \ + if (WSA_INVALID_EVENT != NewEvent) \ + { \ + if (0 != WSAEventSelect(SOCKET, NewEvent, FD_READ)) \ + { \ + OIC_LOG_V(ERROR, TAG, "WSAEventSelect failed 0x%08x ", WSAGetLastError()); \ + if (!WSACloseEvent(NewEvent)) \ + { \ + OIC_LOG_V(ERROR, TAG, "WSACloseEvent(NewEvent) failed 0x%08x", WSAGetLastError()); \ + } \ + } \ + else \ + { \ + PUSH_HANDLE(NewEvent, ARRAY, INDEX); \ + } \ + } \ + else \ + { \ + OIC_LOG_V(ERROR, TAG, "WSACreateEvent(NewEvent) failed 0x%08x", WSAGetLastError()); \ + }\ + } + +#define INSERT_FD(FD, ARRAY, INDEX) \ + { \ + if (-1 != FD) \ + { \ + ARRAY[INDEX] = FD; \ + } \ + } + + +// Inserts the FD into the FD_ARRAY and pushes the socket event into ARRAY +#define PUSH_IP_SOCKET(TYPE, ARRAY, FD_ARRAY, INDEX) \ + { \ + if (-1 != caglobals.ip.TYPE.fd) \ + { \ + INSERT_FD(caglobals.ip.TYPE.fd, FD_ARRAY, INDEX); \ + PUSH_SOCKET(caglobals.ip.TYPE.fd, ARRAY, INDEX); \ + } \ + } + +#define IS_MATCHING_IP_HANDLE(TYPE, HANDLE, FLAGS) \ + if ((caglobals.ip.TYPE.fd != -1) && (caglobals.ip.TYPE.fd == HANDLE)) \ + { \ + fd = caglobals.ip.TYPE.fd; \ + flags = FLAGS; \ + } + +#define EVENT_ARRAY_SIZE 10 + +static void CAFindReadyMessage() +{ + int fdArray[EVENT_ARRAY_SIZE]; + HANDLE eventArray[EVENT_ARRAY_SIZE]; + int arraySize = 0; + int eventIndex; + + // fdArray and eventArray should have same number of elements + /** @todo: replace with OC_STATIC_ASSERT */ + _Static_assert(_countof(fdArray) == _countof(eventArray), "Arrays should have same number of elements"); + + PUSH_IP_SOCKET(u6, eventArray, fdArray, arraySize); + PUSH_IP_SOCKET(u6s, eventArray, fdArray, arraySize); + PUSH_IP_SOCKET(u4, eventArray, fdArray, arraySize); + PUSH_IP_SOCKET(u4s, eventArray, fdArray, arraySize); + PUSH_IP_SOCKET(m6, eventArray, fdArray, arraySize); + PUSH_IP_SOCKET(m6s, eventArray, fdArray, arraySize); + PUSH_IP_SOCKET(m4, eventArray, fdArray, arraySize); + PUSH_IP_SOCKET(m4s, eventArray, fdArray, arraySize); + + if (-1 != caglobals.ip.shutdownEvent) + { + INSERT_FD(caglobals.ip.shutdownEvent, fdArray, arraySize); + PUSH_HANDLE(caglobals.ip.shutdownEvent, eventArray, arraySize); + } + + /** @todo Support netlink events */ + + // Should not have overflowed buffer + assert(arraySize <= (_countof(fdArray))); + + // Timeout is unnecessary on Windows + assert(-1 == caglobals.ip.selectTimeout); + + while (!caglobals.ip.terminate) + { + int ret = WSAWaitForMultipleEvents(arraySize, eventArray, FALSE, WSA_INFINITE, FALSE); + + switch (ret) + { + case WSA_WAIT_FAILED: + OIC_LOG_V(ERROR, TAG, "WSAWaitForMultipleEvents returned WSA_WAIT_FAILED 0x%08x", WSAGetLastError()); + break; + case WSA_WAIT_IO_COMPLETION: + OIC_LOG_V(ERROR, TAG, "WSAWaitForMultipleEvents returned WSA_WAIT_IO_COMPLETION 0x%08x", WSAGetLastError()); + break; + case WSA_WAIT_TIMEOUT: + OIC_LOG_V(ERROR, TAG, "WSAWaitForMultipleEvents returned WSA_WAIT_TIMEOUT 0x%08x", WSAGetLastError()); + break; + default: + eventIndex = ret - WSA_WAIT_EVENT_0; + if ((eventIndex >= 0) && (eventIndex < arraySize)) + { + if (false == WSAResetEvent(eventArray[eventIndex])) + { + OIC_LOG_V(ERROR, TAG, "WSAResetEvent failed 0x%08x", WSAGetLastError()); + } + CAEventReturned(fdArray[eventIndex]); + } + else + { + OIC_LOG_V(ERROR, TAG, "WSAWaitForMultipleEvents failed 0x%08x", WSAGetLastError()); + } + break; + } + + } + + while (arraySize > 0) + { + arraySize--; + if (!WSACloseEvent(eventArray[arraySize])) + { + OIC_LOG_V(ERROR, TAG, "WSACloseEvent (Index %i) failed 0x%08x", arraySize, WSAGetLastError()); + } + } +} + +static void CAEventReturned(HANDLE handle) +{ + int fd = -1; + CATransportFlags_t flags = CA_DEFAULT_FLAGS; + + while (!caglobals.ip.terminate) + { + IS_MATCHING_IP_HANDLE(u6, handle, CA_IPV6) + else IS_MATCHING_IP_HANDLE(u6s, handle, CA_IPV6 | CA_SECURE) + else IS_MATCHING_IP_HANDLE(u4, handle, CA_IPV4) + else IS_MATCHING_IP_HANDLE(u4s, handle, CA_IPV4 | CA_SECURE) + else IS_MATCHING_IP_HANDLE(m6, handle, CA_MULTICAST | CA_IPV6) + else IS_MATCHING_IP_HANDLE(m6s, handle, CA_MULTICAST | CA_IPV6 | CA_SECURE) + else IS_MATCHING_IP_HANDLE(m4, handle, CA_MULTICAST | CA_IPV4) + else IS_MATCHING_IP_HANDLE(m4s, handle, CA_MULTICAST | CA_IPV4 | CA_SECURE) + else if ((caglobals.ip.shutdownEvent != -1) && (caglobals.ip.shutdownEvent == handle)) + { + break; + } else { break; } + (void)CAReceiveMessage(handle, flags); + // We will never get more than one match per handle, so always break. + break; + } + + if (caglobals.ip.terminate) + { + if (-1 != caglobals.ip.shutdownEvent) + { + // We presume the shutdownEvent will be closed in CAFindReadyMessage + caglobals.ip.shutdownEvent = -1; + WSACleanup(); + } + } +} + #endif - (void)CAReceiveMessage(fd, flags); - FD_CLR(fd, readFds); + +void CADeInitializeIPGlobals() +{ + CLOSE_SOCKET(u6); + CLOSE_SOCKET(u6s); + CLOSE_SOCKET(u4); + CLOSE_SOCKET(u4s); + CLOSE_SOCKET(m6); + CLOSE_SOCKET(m6s); + CLOSE_SOCKET(m4); + CLOSE_SOCKET(m4s); + + if (caglobals.ip.netlinkFd != -1) + { +#ifdef _WIN32 + closesocket(caglobals.ip.netlinkFd); +#else + close(caglobals.ip.netlinkFd); +#endif + caglobals.ip.netlinkFd = -1; } } @@ -521,7 +714,11 @@ static int CACreateSocket(int family, uint16_t *port) if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof (on))) { OIC_LOG_V(ERROR, TAG, "SO_REUSEADDR failed: %s", CAIPS_GET_ERROR); +#ifdef _WIN32 + closesocket(fd); +#else close(fd); +#endif return -1; } } @@ -529,7 +726,11 @@ static int CACreateSocket(int family, uint16_t *port) if (-1 == bind(fd, (struct sockaddr *)&sa, socklen)) { OIC_LOG_V(ERROR, TAG, "bind socket failed: %s", CAIPS_GET_ERROR); +#ifdef _WIN32 + closesocket(fd); +#else close(fd); +#endif return -1; } @@ -538,7 +739,11 @@ static int CACreateSocket(int family, uint16_t *port) if (-1 == getsockname(fd, (struct sockaddr *)&sa, &socklen)) { OIC_LOG_V(ERROR, TAG, "getsockname failed: %s", CAIPS_GET_ERROR); +#ifdef _WIN32 + closesocket(fd); +#else close(fd); +#endif return -1; } *port = ntohs(family == AF_INET6 ? @@ -558,6 +763,7 @@ static int CACreateSocket(int family, uint16_t *port) static void CAInitializeNetlink() { + caglobals.ip.netlinkFd = -1; #ifdef __linux__ // create NETLINK fd for interface change notifications struct sockaddr_nl sa = { AF_NETLINK, 0, 0, RTMGRP_LINK }; @@ -584,14 +790,28 @@ static void CAInitializeNetlink() #endif } -static void CAInitializePipe() +static void CAInitializeFastShutdownMechanism() { -#if !defined(_WIN32) - caglobals.ip.selectTimeout = -1; -#ifdef HAVE_PIPE2 - int ret = pipe2(caglobals.ip.shutdownFds, O_CLOEXEC); + caglobals.ip.selectTimeout = -1; // don't poll for shutdown + int ret = -1; +#if defined(WSA_WAIT_EVENT_0) + caglobals.ip.shutdownEvent = -1; + caglobals.ip.shutdownEvent = WSACreateEvent(); + + if (caglobals.ip.shutdownEvent == WSA_INVALID_EVENT) + { + caglobals.ip.shutdownEvent = -1; + } + else + { + ret = 0; + } +#elif defined(HAVE_PIPE2) + ret = pipe2(caglobals.ip.shutdownFds, O_CLOEXEC); + CHECKFD(caglobals.ip.shutdownFds[0]); + CHECKFD(caglobals.ip.shutdownFds[1]); #else - int ret = pipe(caglobals.ip.shutdownFds); + ret = pipe(caglobals.ip.shutdownFds); if (-1 != ret) { ret = fcntl(caglobals.ip.shutdownFds[0], F_GETFD); @@ -615,15 +835,14 @@ static void CAInitializePipe() caglobals.ip.shutdownFds[1] = -1; } } + CHECKFD(caglobals.ip.shutdownFds[0]); + CHECKFD(caglobals.ip.shutdownFds[1]); #endif if (-1 == ret) { - OIC_LOG_V(ERROR, TAG, "pipe failed: %s", strerror(errno)); + OIC_LOG_V(ERROR, TAG, "fast shutdown mechanism init failed: %s", CAIPS_GET_ERROR); caglobals.ip.selectTimeout = SELECT_TIMEOUT; //poll needed for shutdown } -#else - /** @todo Refactor to support Windows-specific inter-thread communication code. */ -#endif } CAResult_t CAIPStartServer(const ca_thread_pool_t threadPool) @@ -690,7 +909,7 @@ CAResult_t CAIPStartServer(const ca_thread_pool_t threadPool) caglobals.ip.u6.port, caglobals.ip.u6s.port, caglobals.ip.u4.port, caglobals.ip.u4s.port, caglobals.ip.m6.port, caglobals.ip.m6s.port, caglobals.ip.m4.port, caglobals.ip.m4s.port); -#if defined (_WIN32) +#if defined (SIO_GET_EXTENSION_FUNCTION_POINTER) caglobals.ip.wsaRecvMsg = NULL; GUID GuidWSARecvMsg = WSAID_WSARECVMSG; DWORD copied = 0; @@ -701,10 +920,8 @@ CAResult_t CAIPStartServer(const ca_thread_pool_t threadPool) return CA_STATUS_FAILED; } #endif - // create pipe for fast shutdown - CAInitializePipe(); - CHECKFD(caglobals.ip.shutdownFds[0]); - CHECKFD(caglobals.ip.shutdownFds[1]); + // set up appropriate FD mechanism for fast shutdown + CAInitializeFastShutdownMechanism(); // create source of network interface change notifications CAInitializeNetlink(); @@ -735,8 +952,8 @@ void CAIPStopServer() { caglobals.ip.started = false; caglobals.ip.terminate = true; -#if !defined(_WIN32) +#if !defined(WSA_WAIT_EVENT_0) if (caglobals.ip.shutdownFds[1] != -1) { close(caglobals.ip.shutdownFds[1]); @@ -747,13 +964,16 @@ void CAIPStopServer() // receive thread will stop in SELECT_TIMEOUT seconds. } #else - /** @todo Refactor to support Windows-specific inter-thread communication code. */ + if (!WSASetEvent(caglobals.ip.shutdownEvent)) + { + OIC_LOG_V(DEBUG, TAG, "set shutdown event failed: %#08X", GetLastError()); + } #endif } void CAWakeUpForChange() { -#if !defined(_WIN32) +#if !defined(WSA_WAIT_EVENT_0) if (caglobals.ip.shutdownFds[1] != -1) { ssize_t len = 0; @@ -767,7 +987,10 @@ void CAWakeUpForChange() } } #else - /** @todo Refactor to support Windows-specific inter-thread communication code. */ + if (!WSASetEvent(caglobals.ip.shutdownEvent)) + { + OIC_LOG_V(DEBUG, TAG, "set shutdown event failed: %#08X", GetLastError()); + } #endif } @@ -920,18 +1143,14 @@ CAResult_t CAIPStopListenServer() } if (ifitem->family == AF_INET) { - close(caglobals.ip.m4.fd); - close(caglobals.ip.m4s.fd); - caglobals.ip.m4.fd = -1; - caglobals.ip.m4s.fd = -1; + CLOSE_SOCKET(m4); + CLOSE_SOCKET(m4s); OIC_LOG_V(DEBUG, TAG, "IPv4 network interface: %s cloed", ifitem->name); } if (ifitem->family == AF_INET6) { - close(caglobals.ip.m6.fd); - close(caglobals.ip.m6s.fd); - caglobals.ip.m6.fd = -1; - caglobals.ip.m6s.fd = -1; + CLOSE_SOCKET(m6); + CLOSE_SOCKET(m6s); OIC_LOG_V(DEBUG, TAG, "IPv6 network interface: %s", ifitem->name); } }