[Win32] Implement events shutdown in lieu of pipes
authorDavid Antler <david.a.antler@intel.com>
Tue, 2 Feb 2016 00:50:39 +0000 (16:50 -0800)
committerDavid Antler <david.a.antler@intel.com>
Fri, 27 May 2016 17:51:47 +0000 (17:51 +0000)
Windows is not able to select() for read on unnamed pipes.  Move to an
event-based scheme where signaling can come from a shutdownEvent or
readable data on a socket.

Included some clean-up of preprocessor macros.

Change-Id: I7db3235a6c870c97c9c317f833f2d8682fd9ce70
Signed-off-by: David Antler <david.a.antler@intel.com>
Reviewed-on: https://gerrit.iotivity.org/gerrit/5521
Reviewed-by: Dave Thaler <dthaler@microsoft.com>
Tested-by: jenkins-iotivity <jenkins-iotivity@opendaylight.org>
resource/csdk/connectivity/api/cacommon.h
resource/csdk/connectivity/src/ip_adapter/caipserver.c

index fcf8328..3fc01de 100644 (file)
@@ -42,6 +42,7 @@
 
 #if defined(_WIN32)
 #include <mswsock.h>
+#include <winsock2.h>
 #endif
 
 #ifdef __cplusplus
@@ -497,7 +498,11 @@ typedef struct
         CASocket_t m4;              /**< multicast IPv4 */
         CASocket_t m4s;             /**< multicast IPv4 secure */
         int netlinkFd;              /**< netlink */
-        int shutdownFds[2];         /**< shutdown pipe */
+#if defined(_WIN32)
+        WSAEVENT shutdownEvent;     /**< Event used to signal threads to stop */
+#else
+        int shutdownFds[2];         /**< fds used to signal threads to stop */
+#endif
         int selectTimeout;          /**< in seconds */
         int maxfd;                  /**< highest fd (for select) */
         bool started;               /**< the IP adapter has started */
index cacf7b0..c4b713c 100644 (file)
 #endif
 
 #include <sys/types.h>
-#if !defined(__msys_nt__)
+#if !defined(_WIN32)
 #include <sys/socket.h>
 #endif
 
-#if defined(__msys_nt__)
+#if defined(_WIN32)
+#include <assert.h>
 #include <winsock2.h>
 #include <ws2def.h>
 #include <mswsock.h>
@@ -41,7 +42,7 @@
 #include <unistd.h>
 #include <sys/types.h>
 #include <fcntl.h>
-#if !defined(__msys_nt__)
+#if !defined(_WIN32)
 #include <sys/select.h>
 #include <arpa/inet.h>
 #include <netinet/in.h>
@@ -135,22 +136,25 @@ static CAIPPacketReceivedCallback g_packetReceivedCallback;
 
 static void CAHandleNetlink();
 static void CAFindReadyMessage();
+#if !defined(WSA_WAIT_EVENT_0)
 static void CASelectReturned(fd_set *readFds, int ret);
+#else
+static void CAEventReturned(HANDLE);
+#endif
 static void CAProcessNewInterface(CAInterface_t *ifchanged);
 static CAResult_t CAReceiveMessage(int fd, CATransportFlags_t flags);
 
-#define SET(TYPE, FDS) \
-    if (caglobals.ip.TYPE.fd != -1) \
-    { \
-        FD_SET(caglobals.ip.TYPE.fd, FDS); \
-    }
+static void CAReceiveHandler(void *data)
+{
+    (void)data;
 
-#define ISSET(TYPE, FDS, FLAGS) \
-    if (caglobals.ip.TYPE.fd != -1 && FD_ISSET(caglobals.ip.TYPE.fd, FDS)) \
-    { \
-        fd = caglobals.ip.TYPE.fd; \
-        flags = FLAGS; \
+    while (!caglobals.ip.terminate)
+    {
+        CAFindReadyMessage();
     }
+}
+
+#if !defined(WSA_WAIT_EVENT_0)
 
 #define CLOSE_SOCKET(TYPE) \
     if (caglobals.ip.TYPE.fd != -1) \
@@ -159,33 +163,19 @@ static CAResult_t CAReceiveMessage(int fd, CATransportFlags_t flags);
         caglobals.ip.TYPE.fd = -1; \
     }
 
-void CADeInitializeIPGlobals()
-{
-    CLOSE_SOCKET(u6);
-    CLOSE_SOCKET(u6s);
-    CLOSE_SOCKET(u4);
-    CLOSE_SOCKET(u4s);
-    CLOSE_SOCKET(m6);
-    CLOSE_SOCKET(m6s);
-    CLOSE_SOCKET(m4);
-    CLOSE_SOCKET(m4s);
-
-    if (caglobals.ip.netlinkFd != -1)
-    {
-        close(caglobals.ip.netlinkFd);
-        caglobals.ip.netlinkFd = -1;
+#define SET(TYPE, FDS) \
+    if (caglobals.ip.TYPE.fd != -1) \
+    { \
+        FD_SET(caglobals.ip.TYPE.fd, FDS); \
     }
-}
 
-static void CAReceiveHandler(void *data)
-{
-    (void)data;
-
-    while (!caglobals.ip.terminate)
-    {
-        CAFindReadyMessage();
+#define ISSET(TYPE, FDS, FLAGS) \
+    if (caglobals.ip.TYPE.fd != -1 && FD_ISSET(caglobals.ip.TYPE.fd, FDS)) \
+    { \
+        fd = caglobals.ip.TYPE.fd; \
+        flags = FLAGS; \
     }
-}
+
 
 static void CAFindReadyMessage()
 {
@@ -206,7 +196,6 @@ static void CAFindReadyMessage()
     SET(m4,  &readFds)
     SET(m4s, &readFds)
 
-#if !defined(_WIN32)
     if (caglobals.ip.shutdownFds[0] != -1)
     {
         FD_SET(caglobals.ip.shutdownFds[0], &readFds);
@@ -215,7 +204,7 @@ static void CAFindReadyMessage()
     {
         FD_SET(caglobals.ip.netlinkFd, &readFds);
     }
-#endif
+
     int ret = select(caglobals.ip.maxfd + 1, &readFds, NULL, NULL, tv);
 
     if (caglobals.ip.terminate)
@@ -252,8 +241,7 @@ static void CASelectReturned(fd_set *readFds, int ret)
         else ISSET(m6s, readFds, CA_MULTICAST | CA_IPV6 | CA_SECURE)
         else ISSET(m4,  readFds, CA_MULTICAST | CA_IPV4)
         else ISSET(m4s, readFds, CA_MULTICAST | CA_IPV4 | CA_SECURE)
-#if !defined(_WIN32)
-        else if (FD_ISSET(caglobals.ip.netlinkFd, readFds))
+        else if ((caglobals.ip.netlinkFd != -1) && FD_ISSET(caglobals.ip.netlinkFd, readFds))
         {
             CAInterface_t *ifchanged = CAFindInterfaceChange();
             if (ifchanged)
@@ -277,14 +265,219 @@ static void CASelectReturned(fd_set *readFds, int ret)
         {
             break;
         }
-#else
+        (void)CAReceiveMessage(fd, flags);
+        FD_CLR(fd, readFds);
+    }
+}
+
+#else // if defined(WSA_WAIT_EVENT_0)
+
+#define CLOSE_SOCKET(TYPE) \
+    if (caglobals.ip.TYPE.fd != -1) \
+    { \
+        closesocket(caglobals.ip.TYPE.fd); \
+        caglobals.ip.TYPE.fd = -1; \
+    }
+
+#define PUSH_HANDLE(HANDLE, ARRAY, INDEX) \
+{ \
+    ARRAY[INDEX] = HANDLE; \
+    INDEX++; \
+}
+
+// Turn handle into WSAEvent and push to ARRAY
+#define PUSH_SOCKET(SOCKET, ARRAY, INDEX) \
+    if (SOCKET != -1) \
+    { \
+        WSAEVENT NewEvent; \
+        NewEvent = WSACreateEvent(); \
+        if (WSA_INVALID_EVENT != NewEvent) \
+        { \
+            if (0 != WSAEventSelect(SOCKET, NewEvent, FD_READ)) \
+            { \
+                OIC_LOG_V(ERROR, TAG, "WSAEventSelect failed 0x%08x ", WSAGetLastError()); \
+                if (!WSACloseEvent(NewEvent)) \
+                { \
+                    OIC_LOG_V(ERROR, TAG, "WSACloseEvent(NewEvent) failed 0x%08x", WSAGetLastError()); \
+                } \
+            } \
+            else \
+            { \
+                PUSH_HANDLE(NewEvent, ARRAY, INDEX); \
+            } \
+        } \
+        else \
+        { \
+            OIC_LOG_V(ERROR, TAG, "WSACreateEvent(NewEvent) failed 0x%08x", WSAGetLastError()); \
+        }\
+    }
+
+#define INSERT_FD(FD, ARRAY, INDEX) \
+    { \
+        if (-1 != FD) \
+        { \
+            ARRAY[INDEX] = FD; \
+        } \
+    }
+
+
+// Inserts the FD into the FD_ARRAY and pushes the socket event into ARRAY
+#define PUSH_IP_SOCKET(TYPE, ARRAY, FD_ARRAY, INDEX) \
+    { \
+        if (-1 != caglobals.ip.TYPE.fd) \
+        { \
+            INSERT_FD(caglobals.ip.TYPE.fd, FD_ARRAY, INDEX); \
+            PUSH_SOCKET(caglobals.ip.TYPE.fd, ARRAY, INDEX); \
+        } \
+    }
+
+#define IS_MATCHING_IP_HANDLE(TYPE, HANDLE, FLAGS) \
+    if ((caglobals.ip.TYPE.fd != -1) && (caglobals.ip.TYPE.fd == HANDLE)) \
+    { \
+        fd = caglobals.ip.TYPE.fd; \
+        flags = FLAGS; \
+    }
+
+#define EVENT_ARRAY_SIZE  10
+
+static void CAFindReadyMessage()
+{
+    int fdArray[EVENT_ARRAY_SIZE];
+    HANDLE eventArray[EVENT_ARRAY_SIZE];
+    int arraySize = 0;
+    int eventIndex;
+
+    // fdArray and eventArray should have same number of elements
+    /** @todo: replace with OC_STATIC_ASSERT */
+    _Static_assert(_countof(fdArray) == _countof(eventArray), "Arrays should have same number of elements");
+
+    PUSH_IP_SOCKET(u6,  eventArray, fdArray, arraySize);
+    PUSH_IP_SOCKET(u6s, eventArray, fdArray, arraySize);
+    PUSH_IP_SOCKET(u4,  eventArray, fdArray, arraySize);
+    PUSH_IP_SOCKET(u4s, eventArray, fdArray, arraySize);
+    PUSH_IP_SOCKET(m6,  eventArray, fdArray, arraySize);
+    PUSH_IP_SOCKET(m6s, eventArray, fdArray, arraySize);
+    PUSH_IP_SOCKET(m4,  eventArray, fdArray, arraySize);
+    PUSH_IP_SOCKET(m4s, eventArray, fdArray, arraySize);
+
+    if (-1 != caglobals.ip.shutdownEvent)
+    {
+        INSERT_FD(caglobals.ip.shutdownEvent, fdArray, arraySize);
+        PUSH_HANDLE(caglobals.ip.shutdownEvent, eventArray, arraySize);
+    }
+
+    /** @todo Support netlink events */
+
+    // Should not have overflowed buffer
+    assert(arraySize <= (_countof(fdArray)));
+
+    // Timeout is unnecessary on Windows
+    assert(-1 == caglobals.ip.selectTimeout);
+
+    while (!caglobals.ip.terminate)
+    {
+        int ret = WSAWaitForMultipleEvents(arraySize, eventArray, FALSE, WSA_INFINITE, FALSE);
+
+        switch (ret)
+        {
+            case WSA_WAIT_FAILED:
+                OIC_LOG_V(ERROR, TAG, "WSAWaitForMultipleEvents returned WSA_WAIT_FAILED 0x%08x", WSAGetLastError());
+                break;
+            case WSA_WAIT_IO_COMPLETION:
+                OIC_LOG_V(ERROR, TAG, "WSAWaitForMultipleEvents returned WSA_WAIT_IO_COMPLETION 0x%08x", WSAGetLastError());
+                break;
+            case WSA_WAIT_TIMEOUT:
+                OIC_LOG_V(ERROR, TAG, "WSAWaitForMultipleEvents returned WSA_WAIT_TIMEOUT 0x%08x", WSAGetLastError());
+                break;
+            default:
+                eventIndex = ret - WSA_WAIT_EVENT_0;
+                if ((eventIndex >= 0) && (eventIndex < arraySize))
+                {
+                    if (false == WSAResetEvent(eventArray[eventIndex]))
+                    {
+                        OIC_LOG_V(ERROR, TAG, "WSAResetEvent failed 0x%08x", WSAGetLastError());
+                    }
+                    CAEventReturned(fdArray[eventIndex]);
+                }
+                else
+                {
+                    OIC_LOG_V(ERROR, TAG, "WSAWaitForMultipleEvents failed 0x%08x", WSAGetLastError());
+                }
+                break;
+        }
+
+    }
+
+    while (arraySize > 0)
+    {
+        arraySize--;
+        if (!WSACloseEvent(eventArray[arraySize]))
+        {
+            OIC_LOG_V(ERROR, TAG, "WSACloseEvent (Index %i) failed 0x%08x", arraySize, WSAGetLastError());
+        }
+    }
+}
+
+static void CAEventReturned(HANDLE handle)
+{
+    int fd = -1;
+    CATransportFlags_t flags = CA_DEFAULT_FLAGS;
+
+    while (!caglobals.ip.terminate)
+    {
+        IS_MATCHING_IP_HANDLE(u6,  handle, CA_IPV6)
+        else IS_MATCHING_IP_HANDLE(u6s, handle, CA_IPV6 | CA_SECURE)
+        else IS_MATCHING_IP_HANDLE(u4,  handle, CA_IPV4)
+        else IS_MATCHING_IP_HANDLE(u4s, handle, CA_IPV4 | CA_SECURE)
+        else IS_MATCHING_IP_HANDLE(m6,  handle, CA_MULTICAST | CA_IPV6)
+        else IS_MATCHING_IP_HANDLE(m6s, handle, CA_MULTICAST | CA_IPV6 | CA_SECURE)
+        else IS_MATCHING_IP_HANDLE(m4,  handle, CA_MULTICAST | CA_IPV4)
+        else IS_MATCHING_IP_HANDLE(m4s, handle, CA_MULTICAST | CA_IPV4 | CA_SECURE)
+        else if ((caglobals.ip.shutdownEvent != -1) && (caglobals.ip.shutdownEvent == handle))
+        {
+            break;
+        }
         else
         {
             break;
         }
+        (void)CAReceiveMessage(handle, flags);
+        // We will never get more than one match per handle, so always break.
+        break;
+    }
+
+    if (caglobals.ip.terminate)
+    {
+        if (-1 != caglobals.ip.shutdownEvent)
+        {
+            // We presume the shutdownEvent will be closed in CAFindReadyMessage
+            caglobals.ip.shutdownEvent = -1;
+            WSACleanup();
+        }
+    }
+}
+
 #endif
-        (void)CAReceiveMessage(fd, flags);
-        FD_CLR(fd, readFds);
+
+void CADeInitializeIPGlobals()
+{
+    CLOSE_SOCKET(u6);
+    CLOSE_SOCKET(u6s);
+    CLOSE_SOCKET(u4);
+    CLOSE_SOCKET(u4s);
+    CLOSE_SOCKET(m6);
+    CLOSE_SOCKET(m6s);
+    CLOSE_SOCKET(m4);
+    CLOSE_SOCKET(m4s);
+
+    if (caglobals.ip.netlinkFd != -1)
+    {
+#ifdef _WIN32
+        closesocket(caglobals.ip.netlinkFd);
+#else
+        close(caglobals.ip.netlinkFd);
+#endif
+        caglobals.ip.netlinkFd = -1;
     }
 }
 
@@ -521,7 +714,11 @@ static int CACreateSocket(int family, uint16_t *port)
         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof (on)))
         {
             OIC_LOG_V(ERROR, TAG, "SO_REUSEADDR failed: %s", CAIPS_GET_ERROR);
+#ifdef _WIN32
+            closesocket(fd);
+#else
             close(fd);
+#endif
             return -1;
         }
     }
@@ -529,7 +726,11 @@ static int CACreateSocket(int family, uint16_t *port)
     if (-1 == bind(fd, (struct sockaddr *)&sa, socklen))
     {
         OIC_LOG_V(ERROR, TAG, "bind socket failed: %s", CAIPS_GET_ERROR);
+#ifdef _WIN32
+        closesocket(fd);
+#else
         close(fd);
+#endif
         return -1;
     }
 
@@ -538,7 +739,11 @@ static int CACreateSocket(int family, uint16_t *port)
         if (-1 == getsockname(fd, (struct sockaddr *)&sa, &socklen))
         {
             OIC_LOG_V(ERROR, TAG, "getsockname failed: %s", CAIPS_GET_ERROR);
+#ifdef _WIN32
+            closesocket(fd);
+#else
             close(fd);
+#endif
             return -1;
         }
         *port = ntohs(family == AF_INET6 ?
@@ -558,6 +763,7 @@ static int CACreateSocket(int family, uint16_t *port)
 
 static void CAInitializeNetlink()
 {
+    caglobals.ip.netlinkFd = -1;
 #ifdef __linux__
     // create NETLINK fd for interface change notifications
     struct sockaddr_nl sa = { AF_NETLINK, 0, 0, RTMGRP_LINK };
@@ -584,14 +790,28 @@ static void CAInitializeNetlink()
 #endif
 }
 
-static void CAInitializePipe()
+static void CAInitializeFastShutdownMechanism()
 {
-#if !defined(_WIN32)
-    caglobals.ip.selectTimeout = -1;
-#ifdef HAVE_PIPE2
-    int ret = pipe2(caglobals.ip.shutdownFds, O_CLOEXEC);
+    caglobals.ip.selectTimeout = -1; // don't poll for shutdown
+    int ret = -1;
+#if defined(WSA_WAIT_EVENT_0)
+    caglobals.ip.shutdownEvent = -1;
+    caglobals.ip.shutdownEvent = WSACreateEvent();
+
+    if (caglobals.ip.shutdownEvent == WSA_INVALID_EVENT)
+    {
+        caglobals.ip.shutdownEvent = -1;
+    }
+    else
+    {
+        ret = 0;
+    }
+#elif defined(HAVE_PIPE2)
+    ret = pipe2(caglobals.ip.shutdownFds, O_CLOEXEC);
+    CHECKFD(caglobals.ip.shutdownFds[0]);
+    CHECKFD(caglobals.ip.shutdownFds[1]);
 #else
-    int ret = pipe(caglobals.ip.shutdownFds);
+    ret = pipe(caglobals.ip.shutdownFds);
     if (-1 != ret)
     {
         ret = fcntl(caglobals.ip.shutdownFds[0], F_GETFD);
@@ -615,15 +835,14 @@ static void CAInitializePipe()
             caglobals.ip.shutdownFds[1] = -1;
         }
     }
+    CHECKFD(caglobals.ip.shutdownFds[0]);
+    CHECKFD(caglobals.ip.shutdownFds[1]);
 #endif
     if (-1 == ret)
     {
-        OIC_LOG_V(ERROR, TAG, "pipe failed: %s", strerror(errno));
+        OIC_LOG_V(ERROR, TAG, "fast shutdown mechanism init failed: %s", CAIPS_GET_ERROR);
         caglobals.ip.selectTimeout = SELECT_TIMEOUT; //poll needed for shutdown
     }
-#else
-    /** @todo Refactor to support Windows-specific inter-thread communication code. */
-#endif
 }
 
 CAResult_t CAIPStartServer(const ca_thread_pool_t threadPool)
@@ -690,7 +909,7 @@ CAResult_t CAIPStartServer(const ca_thread_pool_t threadPool)
               caglobals.ip.u6.port, caglobals.ip.u6s.port, caglobals.ip.u4.port,
               caglobals.ip.u4s.port, caglobals.ip.m6.port, caglobals.ip.m6s.port,
               caglobals.ip.m4.port, caglobals.ip.m4s.port);
-#if defined (_WIN32)
+#if defined (SIO_GET_EXTENSION_FUNCTION_POINTER)
     caglobals.ip.wsaRecvMsg = NULL;
     GUID GuidWSARecvMsg = WSAID_WSARECVMSG;
     DWORD copied = 0;
@@ -701,10 +920,8 @@ CAResult_t CAIPStartServer(const ca_thread_pool_t threadPool)
         return CA_STATUS_FAILED;
     }
 #endif
-    // create pipe for fast shutdown
-    CAInitializePipe();
-    CHECKFD(caglobals.ip.shutdownFds[0]);
-    CHECKFD(caglobals.ip.shutdownFds[1]);
+    // set up appropriate FD mechanism for fast shutdown
+    CAInitializeFastShutdownMechanism();
 
     // create source of network interface change notifications
     CAInitializeNetlink();
@@ -735,8 +952,8 @@ void CAIPStopServer()
 {
     caglobals.ip.started = false;
     caglobals.ip.terminate = true;
-#if !defined(_WIN32)
 
+#if !defined(WSA_WAIT_EVENT_0)
     if (caglobals.ip.shutdownFds[1] != -1)
     {
         close(caglobals.ip.shutdownFds[1]);
@@ -747,13 +964,16 @@ void CAIPStopServer()
         // receive thread will stop in SELECT_TIMEOUT seconds.
     }
 #else
-    /** @todo Refactor to support Windows-specific inter-thread communication code. */
+    if (!WSASetEvent(caglobals.ip.shutdownEvent))
+    {
+        OIC_LOG_V(DEBUG, TAG, "set shutdown event failed: %#08X", GetLastError());
+    }
 #endif
 }
 
 void CAWakeUpForChange()
 {
-#if !defined(_WIN32)
+#if !defined(WSA_WAIT_EVENT_0)
     if (caglobals.ip.shutdownFds[1] != -1)
     {
         ssize_t len = 0;
@@ -767,7 +987,10 @@ void CAWakeUpForChange()
         }
     }
 #else
-    /** @todo Refactor to support Windows-specific inter-thread communication code. */
+    if (!WSASetEvent(caglobals.ip.shutdownEvent))
+    {
+        OIC_LOG_V(DEBUG, TAG, "set shutdown event failed: %#08X", GetLastError());
+    }
 #endif
 }
 
@@ -920,18 +1143,14 @@ CAResult_t CAIPStopListenServer()
         }
         if (ifitem->family == AF_INET)
         {
-            close(caglobals.ip.m4.fd);
-            close(caglobals.ip.m4s.fd);
-            caglobals.ip.m4.fd = -1;
-            caglobals.ip.m4s.fd = -1;
+            CLOSE_SOCKET(m4);
+            CLOSE_SOCKET(m4s);
             OIC_LOG_V(DEBUG, TAG, "IPv4 network interface: %s cloed", ifitem->name);
         }
         if (ifitem->family == AF_INET6)
         {
-            close(caglobals.ip.m6.fd);
-            close(caglobals.ip.m6s.fd);
-            caglobals.ip.m6.fd = -1;
-            caglobals.ip.m6s.fd = -1;
+            CLOSE_SOCKET(m6);
+            CLOSE_SOCKET(m6s);
             OIC_LOG_V(DEBUG, TAG, "IPv6 network interface: %s", ifitem->name);
         }
     }