Added interruption of select() to linux IP reading to allow for proper
authorJon A. Cruz <jonc@osg.samsung.com>
Thu, 2 Apr 2015 14:48:30 +0000 (07:48 -0700)
committerErich Keane <erich.keane@intel.com>
Thu, 2 Apr 2015 16:36:37 +0000 (16:36 +0000)
stopping.

Added standard mechanism to cleanly interrupt select() calls in linux
network code so that stack shutdown is now immediate.

Among other things this allows for a longer timeout to be used in
select() calls which reduces log spam and increases performance.

Another effect is to clean up unit test performance with a main dev box
gaining more than two orders of magnitude in performance. Stack unit
tests were taking 39 seconds to complete whereas with this change they
complete in less than 1/3rd of a second (1/10th for release builds).

Change-Id: I330b1d7507ab81c5b54677d1ef21434862f63508
Signed-off-by: Jon A. Cruz <jonc@osg.samsung.com>
Reviewed-on: https://gerrit.iotivity.org/gerrit/635
Tested-by: jenkins-iotivity <jenkins-iotivity@opendaylight.org>
Reviewed-by: Thiago Macieira <thiago.macieira@intel.com>
Reviewed-by: Joseph Morrow <joseph.l.morrow@intel.com>
Reviewed-by: Erich Keane <erich.keane@intel.com>
resource/csdk/connectivity/src/ethernet_adapter/linux/caethernetserver.c
resource/csdk/connectivity/src/wifi_adapter/linux/cawifiserver.c

index 0b248f2..699334e 100644 (file)
 #include "umutex.h"
 #include "oic_malloc.h"
 
+// TODO g_stopSecureUnicast is set but never used. The three groups of
+// globals should be combined into three instances of a common struct.
+
+#define PIPE_READ_FD 0
+
+#define PIPE_WRITE_FD 1
+
 /**
  * @def ETHERNET_SERVER_TAG
  * @brief Logging tag for module name
@@ -99,6 +106,11 @@ static u_mutex g_mutexUnicastServer = NULL;
 static bool g_stopUnicast = false;
 
 /**
+ * Handle to interrupt unicast server for stopping, etc.
+ */
+static int g_unicastTriggerFD = -1;
+
+/**
  * @var g_multicastServerSocketFD
  * @brief socket descriptor for multicast server
  */
@@ -116,6 +128,11 @@ static u_mutex g_mutexMulticastServer = NULL;
  */
 static bool g_stopMulticast = false;
 
+/**
+ * Handle to interrupt multicast server for stopping, etc.
+ */
+static int g_multicastTriggerFD = -1;
+
 #ifdef __WITH_DTLS__
 /**
  * @var g_secureUnicastServerSocketFD
@@ -172,10 +189,33 @@ static CAEthernetExceptionCallback g_exceptionCallback = NULL;
 typedef struct
 {
     bool *stopFlag;
+    int stopFd;
     int32_t socket_fd;
     CAAdapterServerType_t type;
 } CAAdapterReceiveThreadContext_t;
 
+/**
+ * Creates a non-blocking pipe.
+ *
+ * Creates a pipe with two file descriptors then sets both to be
+ * non-blocking. If an error occurs setting flags, any created handles
+ * will be closed and set to -1.
+ *
+ * @param pipefd array of two ints to store the pipe handles in.
+ *
+ * @return 0 on success, -1 otherwise.
+ */
+static int createNonblockingPipe(int pipefd[2]);
+
+/**
+ * Sets the given file descriptor to be non-blocking.
+ *
+ * @param fd the file descriptor to make non-blocking.
+ *
+ * @return 0 on success, -1 otherwise.
+ */
+static int setNonblocking(int fd);
+
 static void CAReceiveHandler(void *data)
 {
     OIC_LOG(DEBUG, ETHERNET_SERVER_TAG, "IN");
@@ -190,13 +230,19 @@ static void CAReceiveHandler(void *data)
 
     while (true != *(ctx->stopFlag))
     {
-        timeout.tv_sec = 1;
+        // safe to set longer value as select can be interrupted.
+        timeout.tv_sec = 10;
         timeout.tv_usec = 0;
 
         FD_ZERO(&reads);
         FD_SET(ctx->socket_fd, &reads);
+        int highest = (ctx->stopFd > ctx->socket_fd) ? ctx->stopFd : ctx->socket_fd;
+        if (ctx->stopFd != -1)
+        {
+            FD_SET(ctx->stopFd, &reads);
+        }
 
-        int32_t ret = select(ctx->socket_fd + 1, &reads, NULL, NULL, &timeout);
+        int32_t ret = select(highest + 1, &reads, NULL, NULL, &timeout);
         if (*(ctx->stopFlag) == true)
         {
             OIC_LOG_V(DEBUG, ETHERNET_SERVER_TAG,
@@ -210,6 +256,13 @@ static void CAReceiveHandler(void *data)
             continue;
         }
 
+        if ((ctx->stopFd != -1) && FD_ISSET(ctx->stopFd, &reads))
+        {
+            // Doesn't matter at this point what happens (errors can be
+            // ignored). Just drain some data if this code ever gets hit.
+            recv(ctx->stopFd, recvBuffer, sizeof(recvBuffer), MSG_DONTWAIT);
+        }
+
         if (!FD_ISSET(ctx->socket_fd, &reads))
         {
             continue;
@@ -240,6 +293,10 @@ static void CAReceiveHandler(void *data)
             {
                 g_exceptionCallback(ctx->type);
             }
+            if (ctx->stopFd != -1)
+            {
+                close(ctx->stopFd);
+            }
             OICFree(ctx);
             ctx = NULL;
             return;
@@ -299,11 +356,20 @@ static void CAReceiveHandler(void *data)
             default:
                 // Should never occur
                 OIC_LOG_V(DEBUG, ETHERNET_SERVER_TAG, "Invalid server type");
+                if (ctx->stopFd != -1)
+                {
+                    close(ctx->stopFd);
+                }
                 OICFree(ctx);
                 ctx = NULL;
                 return;
         }
     }
+
+    if (ctx->stopFd != -1)
+    {
+        close(ctx->stopFd);
+    }
     OICFree(ctx);
     ctx = NULL;
 
@@ -325,11 +391,8 @@ static CAResult_t CACreateSocket(int32_t *socketFD, const char *localIp, uint16_
     }
 
     // Make the socket non-blocking
-    if (-1 == fcntl(sock, F_SETFL, O_NONBLOCK))
+    if (-1 == setNonblocking(sock))
     {
-        OIC_LOG_V(ERROR, ETHERNET_SERVER_TAG, "Failed to set non-block mode, Error code: %s",
-                  strerror(errno));
-
         close(sock);
         return CA_STATUS_FAILED;
     }
@@ -462,12 +525,31 @@ static CAResult_t CAStartUnicastServer(const char *localAddress, uint16_t *port,
         return CA_MEMORY_ALLOC_FAILED;
     }
 
+    int pipefd[2] = {-1, -1};
+    if (createNonblockingPipe(pipefd) != 0)
+    {
+        OIC_LOG(ERROR, ETHERNET_SERVER_TAG, "Failed to create pipe");
+        OICFree(ctx);
+        close(*serverFD);
+        *serverFD = -1;
+        return CA_STATUS_FAILED;
+    }
+
+    g_unicastTriggerFD = pipefd[PIPE_WRITE_FD]; // The write end of the pipe
+
     ctx->stopFlag = &g_stopUnicast;
+    ctx->stopFd = pipefd[PIPE_READ_FD]; // The read end of the pipe
     ctx->socket_fd = *serverFD;
     ctx->type = isSecured ? CA_SECURED_UNICAST_SERVER : CA_UNICAST_SERVER;
     if (CA_STATUS_OK != u_thread_pool_add_task(g_threadPool, CAReceiveHandler, (void *)ctx))
     {
         OIC_LOG(ERROR, ETHERNET_SERVER_TAG, "Failed to create read thread!");
+
+        close(g_unicastTriggerFD);
+        g_unicastTriggerFD = -1;
+        close(ctx->stopFd);
+        ctx->stopFd = -1;
+
         OICFree(ctx);
         close(*serverFD);
         *serverFD = -1;
@@ -722,7 +804,20 @@ CAResult_t CAEthernetStartMulticastServer(const char *localAddress,
         return CA_MEMORY_ALLOC_FAILED;
     }
 
+    int pipefd[2] = {-1, -1};
+    if (createNonblockingPipe(pipefd) != 0)
+    {
+        OIC_LOG(ERROR, ETHERNET_SERVER_TAG, "Failed to create pipe");
+        OICFree(ctx);
+        close(g_multicastServerSocketFD);
+        g_multicastServerSocketFD = -1;
+        return CA_STATUS_FAILED;
+    }
+
+    g_multicastTriggerFD = pipefd[PIPE_WRITE_FD]; // The write end of the pipe
+
     ctx->stopFlag = &g_stopMulticast;
+    ctx->stopFd = pipefd[PIPE_READ_FD]; // The read end of the pipe
     ctx->socket_fd = g_multicastServerSocketFD;
     ctx->type = CA_MULTICAST_SERVER;
 
@@ -731,6 +826,11 @@ CAResult_t CAEthernetStartMulticastServer(const char *localAddress,
     {
         OIC_LOG(ERROR, ETHERNET_SERVER_TAG, "thread_pool_add_task failed!");
 
+        close(g_multicastTriggerFD);
+        g_multicastTriggerFD = -1;
+        close(ctx->stopFd);
+        ctx->stopFd = -1;
+
         close(g_multicastServerSocketFD);
         g_multicastServerSocketFD = -1;
         g_stopMulticast = true;
@@ -752,7 +852,19 @@ CAResult_t CAEthernetStopUnicastServer()
 
     u_mutex_lock(g_mutexUnicastServer);
     g_stopUnicast = true;
+    if (g_unicastTriggerFD != -1)
+    {
+        if (write(g_unicastTriggerFD, "X", 1) == -1)
+        {
+            OIC_LOG_V(ERROR, ETHERNET_SERVER_TAG,
+                      "Failed to write to trigger, Error code: %s",
+                      strerror(errno));
+        }
+        close(g_unicastTriggerFD);
+        g_unicastTriggerFD = -1;
+    }
     CAResult_t ret = CACloseSocket(&g_unicastServerSocketFD);
+    g_unicastServerSocketFD = -1;
     u_mutex_unlock(g_mutexUnicastServer);
 
     OIC_LOG_V(INFO, ETHERNET_SERVER_TAG, "Unicast server stopped [%d]", ret);
@@ -766,6 +878,17 @@ CAResult_t CAEthernetStopSecureUnicastServer()
 
     u_mutex_lock(g_mutexSecureUnicastServer);
     g_stopSecureUnicast = true;
+    if (g_unicastTriggerFD != -1)
+    {
+        if (write(g_unicastTriggerFD, "X", 1) == -1)
+        {
+            OIC_LOG_V(ERROR, ETHERNET_SERVER_TAG,
+                      "Failed to write to trigger, Error code: %s",
+                      strerror(errno));
+        }
+        close(g_unicastTriggerFD);
+        g_unicastTriggerFD = -1;
+    }
     CAResult_t ret = CACloseSocket(&g_secureUnicastServerSocketFD);
     u_mutex_unlock(g_mutexSecureUnicastServer);
 
@@ -788,6 +911,17 @@ CAResult_t CAEthernetStopMulticastServer(void)
     }
 
     g_stopMulticast = true;
+    if (g_multicastTriggerFD != -1)
+    {
+        if (write(g_multicastTriggerFD, "X", 1) == -1)
+        {
+            OIC_LOG_V(ERROR, ETHERNET_SERVER_TAG,
+                      "Failed to write to trigger, Error code: %s",
+                      strerror(errno));
+        }
+        close(g_multicastTriggerFD);
+        g_multicastTriggerFD = -1;
+    }
 
     // leave the group after you are done
     if (-1 == setsockopt(g_multicastServerSocketFD, IPPROTO_IP, IP_DROP_MEMBERSHIP,
@@ -851,3 +985,61 @@ void CAEthernetSetExceptionCallback(CAEthernetExceptionCallback callback)
     OIC_LOG(DEBUG, ETHERNET_SERVER_TAG, "OUT");
 }
 
+int setNonblocking(int fd)
+{
+    int rc = fcntl(fd, F_GETFL);
+    if (rc == -1)
+    {
+        OIC_LOG_V(ERROR, ETHERNET_SERVER_TAG,
+                  "Failed to get existing flags, Error code: %s",
+                  strerror(errno));
+    }
+    else
+    {
+        rc = fcntl(fd, F_SETFL, rc | O_NONBLOCK);
+        if (rc == -1)
+        {
+            OIC_LOG_V(ERROR, ETHERNET_SERVER_TAG,
+                      "Failed to set non-blocking mode, Error code: %s",
+                      strerror(errno));
+        }
+    }
+
+    return rc;
+}
+
+int createNonblockingPipe(int pipefd[2])
+{
+    int rc = -1;
+    if (pipefd)
+    {
+        pipefd[PIPE_READ_FD] = -1;
+        pipefd[PIPE_WRITE_FD] = -1;
+        rc = pipe(pipefd);
+
+        if (rc != -1)
+        {
+            rc = setNonblocking(pipefd[PIPE_READ_FD]);
+        }
+
+        if (rc != -1)
+        {
+            rc = setNonblocking(pipefd[PIPE_WRITE_FD]);
+        }
+
+        if (rc == -1)
+        {
+            if (pipefd[PIPE_READ_FD] != -1)
+            {
+                close(pipefd[PIPE_READ_FD]);
+                pipefd[PIPE_READ_FD] = -1;
+            }
+            if (pipefd[PIPE_WRITE_FD] != -1)
+            {
+                close(pipefd[PIPE_WRITE_FD]);
+                pipefd[PIPE_WRITE_FD]= -1;
+            }
+        }
+    }
+    return rc;
+}
index af86401..24189ae 100644 (file)
 #include "umutex.h"
 #include "oic_malloc.h"
 
+// TODO g_stopSecureUnicast is set but never used. The three groups of
+// globals should be combined into three instances of a common struct.
+
+#define PIPE_READ_FD 0
+
+#define PIPE_WRITE_FD 1
+
 /**
  * @def WIFI_SERVER_TAG
  * @brief Logging tag for module name
@@ -98,6 +105,11 @@ static u_mutex g_mutexUnicastServer = NULL;
 static bool g_stopUnicast = false;
 
 /**
+ * Handle to interrupt unicast server for stopping, etc.
+ */
+static int g_unicastTriggerFD = -1;
+
+/**
  * @var g_multicastServerSocketFD
  * @brief socket descriptor for multicast server
  */
@@ -115,6 +127,11 @@ static u_mutex g_mutexMulticastServer = NULL;
  */
 static bool g_stopMulticast = false;
 
+/**
+ * Handle to interrupt multicast server for stopping, etc.
+ */
+static int g_multicastTriggerFD = -1;
+
 #ifdef __WITH_DTLS__
 /**
  * @var g_secureUnicastServerSocketFD
@@ -171,10 +188,33 @@ static CAWiFiExceptionCallback g_exceptionCallback = NULL;
 typedef struct
 {
     bool *stopFlag;
+    int stopFd;
     int32_t socket_fd;
     CAAdapterServerType_t type;
 } CAAdapterReceiveThreadContext_t;
 
+/**
+ * Creates a non-blocking pipe.
+ *
+ * Creates a pipe with two file descriptors then sets both to be
+ * non-blocking. If an error occurs setting flags, any created handles
+ * will be closed and set to -1.
+ *
+ * @param pipefd array of two ints to store the pipe handles in.
+ *
+ * @return 0 on success, -1 otherwise.
+ */
+static int createNonblockingPipe(int pipefd[2]);
+
+/**
+ * Sets the given file descriptor to be non-blocking.
+ *
+ * @param fd the file descriptor to make non-blocking.
+ *
+ * @return 0 on success, -1 otherwise.
+ */
+static int setNonblocking(int fd);
+
 static void CAReceiveHandler(void *data)
 {
     OIC_LOG(DEBUG, WIFI_SERVER_TAG, "IN");
@@ -189,13 +229,19 @@ static void CAReceiveHandler(void *data)
 
     while (true != *(ctx->stopFlag))
     {
-        timeout.tv_sec = 1;
+        // safe to set longer value as select can be interrupted.
+        timeout.tv_sec = 10;
         timeout.tv_usec = 0;
 
         FD_ZERO(&reads);
         FD_SET(ctx->socket_fd, &reads);
+        int highest = (ctx->stopFd > ctx->socket_fd) ? ctx->stopFd : ctx->socket_fd;
+        if (ctx->stopFd != -1)
+        {
+            FD_SET(ctx->stopFd, &reads);
+        }
 
-        int32_t ret = select(ctx->socket_fd + 1, &reads, NULL, NULL, &timeout);
+        int32_t ret = select(highest + 1, &reads, NULL, NULL, &timeout);
         if (*(ctx->stopFlag) == true)
         {
             OIC_LOG_V(DEBUG, WIFI_SERVER_TAG, "Stop request received for [%d] server", ctx->type);
@@ -206,6 +252,14 @@ static void CAReceiveHandler(void *data)
             OIC_LOG_V(FATAL, WIFI_SERVER_TAG, "select returned error %s", strerror(errno));
             continue;
         }
+
+        if ((ctx->stopFd != -1) && FD_ISSET(ctx->stopFd, &reads))
+        {
+            // Doesn't matter at this point what happens (errors can be
+            // ignored). Just drain some data if this code ever gets hit.
+            recv(ctx->stopFd, recvBuffer, sizeof(recvBuffer), MSG_DONTWAIT);
+        }
+
         if (!FD_ISSET(ctx->socket_fd, &reads))
         {
             continue;
@@ -233,6 +287,10 @@ static void CAReceiveHandler(void *data)
             {
                 g_exceptionCallback(ctx->type);
             }
+            if (ctx->stopFd != -1)
+            {
+                close(ctx->stopFd);
+            }
             OICFree(ctx);
             return;
         }
@@ -287,13 +345,22 @@ static void CAReceiveHandler(void *data)
                 break;
 #endif //__WITH_DTLS__
             default:
-                // Should never occur\r                OIC_LOG_V(DEBUG, WIFI_SERVER_TAG, "Invalid server type");
+                // Should never occur
+                OIC_LOG_V(DEBUG, WIFI_SERVER_TAG, "Invalid server type");
+                if (ctx->stopFd != -1)
+                {
+                    close(ctx->stopFd);
+                }
                 OICFree(ctx);
                 return;
         }
     }
 
     // free context
+    if (ctx->stopFd != -1)
+    {
+        close(ctx->stopFd);
+    }
     OICFree(ctx);
 
     OIC_LOG(DEBUG, WIFI_SERVER_TAG, "OUT");
@@ -312,11 +379,8 @@ static CAResult_t CAWiFiCreateSocket(int32_t *socketFD, const char *localIp, uin
     }
 
     // Make the socket non-blocking
-    if (-1 == fcntl(sock, F_SETFL, O_NONBLOCK))
+    if (-1 == setNonblocking(sock))
     {
-        OIC_LOG_V(ERROR, WIFI_SERVER_TAG, "Failed to set non-block mode, Error code: %s",
-                strerror(errno));
-
         close(sock);
         return CA_STATUS_FAILED;
     }
@@ -456,12 +520,31 @@ static CAResult_t CAStartUnicastServer(const char *localAddress, uint16_t *port,
         return CA_MEMORY_ALLOC_FAILED;
     }
 
+    int pipefd[2] = {-1, -1};
+    if (createNonblockingPipe(pipefd) != 0)
+    {
+        OIC_LOG(ERROR, WIFI_SERVER_TAG, "Failed to create pipe");
+        OICFree(ctx);
+        close(*serverFD);
+        *serverFD = -1;
+        return CA_STATUS_FAILED;
+    }
+
+    g_unicastTriggerFD = pipefd[PIPE_WRITE_FD]; // The write end of the pipe
+
     ctx->stopFlag = &g_stopUnicast;
+    ctx->stopFd = pipefd[PIPE_READ_FD]; // The read end of the pipe
     ctx->socket_fd = *serverFD;
     ctx->type = isSecured ? CA_SECURED_UNICAST_SERVER : CA_UNICAST_SERVER;
     if (CA_STATUS_OK != u_thread_pool_add_task(g_threadPool, CAReceiveHandler, (void *) ctx))
     {
         OIC_LOG(ERROR, WIFI_SERVER_TAG, "Failed to create read thread!");
+
+        close(g_unicastTriggerFD);
+        g_unicastTriggerFD = -1;
+        close(ctx->stopFd);
+        ctx->stopFd = -1;
+
         OICFree((void *) ctx);
         close(*serverFD);
         return CA_STATUS_FAILED;
@@ -709,7 +792,20 @@ CAResult_t CAWiFiStartMulticastServer(const char *localAddress, const char *mult
         return CA_MEMORY_ALLOC_FAILED;
     }
 
+    int pipefd[2] = {-1, -1};
+    if (createNonblockingPipe(pipefd) != 0)
+    {
+        OIC_LOG(ERROR, WIFI_SERVER_TAG, "Failed to create pipe");
+        OICFree(ctx);
+        close(g_multicastServerSocketFD);
+        g_multicastServerSocketFD = -1;
+        return CA_STATUS_FAILED;
+    }
+
+    g_multicastTriggerFD = pipefd[PIPE_WRITE_FD]; // The write end of the pipe
+
     ctx->stopFlag = &g_stopMulticast;
+    ctx->stopFd = pipefd[PIPE_READ_FD]; // The read end of the pipe
     ctx->socket_fd = g_multicastServerSocketFD;
     ctx->type = CA_MULTICAST_SERVER;
 
@@ -718,6 +814,11 @@ CAResult_t CAWiFiStartMulticastServer(const char *localAddress, const char *mult
     {
         OIC_LOG(ERROR, WIFI_SERVER_TAG, "thread_pool_add_task failed!");
 
+        close(g_multicastTriggerFD);
+        g_multicastTriggerFD = -1;
+        close(ctx->stopFd);
+        ctx->stopFd = -1;
+
         close(g_multicastServerSocketFD);
         g_multicastServerSocketFD = -1;
         g_stopMulticast = true;
@@ -739,6 +840,17 @@ CAResult_t CAWiFiStopUnicastServer()
 
     u_mutex_lock(g_mutexUnicastServer);
     g_stopUnicast = true;
+    if (g_unicastTriggerFD != -1)
+    {
+        if (write(g_unicastTriggerFD, "X", 1) == -1)
+        {
+            OIC_LOG_V(ERROR, WIFI_SERVER_TAG,
+                      "Failed to write to trigger, Error code: %s",
+                      strerror(errno));
+        }
+        close(g_unicastTriggerFD);
+        g_unicastTriggerFD = -1;
+    }
     CAResult_t ret = CAWiFiCloseSocket(&g_unicastServerSocketFD);
     u_mutex_unlock(g_mutexUnicastServer);
 
@@ -753,6 +865,17 @@ CAResult_t CAWiFiStopSecureUnicastServer()
 
     u_mutex_lock(g_mutexSecureUnicastServer);
     g_stopSecureUnicast = true;
+    if (g_unicastTriggerFD != -1)
+    {
+        if (write(g_unicastTriggerFD, "X", 1) == -1)
+        {
+            OIC_LOG_V(ERROR, WIFI_SERVER_TAG,
+                      "Failed to write to trigger, Error code: %s",
+                      strerror(errno));
+        }
+        close(g_unicastTriggerFD);
+        g_unicastTriggerFD = -1;
+    }
     CAResult_t ret = CAWiFiCloseSocket(&g_secureUnicastServerSocketFD);
     u_mutex_unlock(g_mutexSecureUnicastServer);
 
@@ -775,6 +898,17 @@ CAResult_t CAWiFiStopMulticastServer(void)
     }
 
     g_stopMulticast = true;
+    if (g_multicastTriggerFD != -1)
+    {
+        if (write(g_multicastTriggerFD, "X", 1) == -1)
+        {
+            OIC_LOG_V(ERROR, WIFI_SERVER_TAG,
+                      "Failed to write to trigger, Error code: %s",
+                      strerror(errno));
+        }
+        close(g_multicastTriggerFD);
+        g_multicastTriggerFD = -1;
+    }
 
     // leave the group after you are done
     if (-1
@@ -838,3 +972,61 @@ void CAWiFiSetExceptionCallback(CAWiFiExceptionCallback callback)
     g_exceptionCallback = callback;
 }
 
+int setNonblocking(int fd)
+{
+    int rc = fcntl(fd, F_GETFL);
+    if (rc == -1)
+    {
+        OIC_LOG_V(ERROR, WIFI_SERVER_TAG,
+                  "Failed to get existing flags, Error code: %s",
+                  strerror(errno));
+    }
+    else
+    {
+        rc = fcntl(fd, F_SETFL, rc | O_NONBLOCK);
+        if (rc == -1)
+        {
+            OIC_LOG_V(ERROR, WIFI_SERVER_TAG,
+                      "Failed to set non-blocking mode, Error code: %s",
+                      strerror(errno));
+        }
+    }
+
+    return rc;
+}
+
+int createNonblockingPipe(int pipefd[2])
+{
+    int rc = -1;
+    if (pipefd)
+    {
+        pipefd[PIPE_READ_FD] = -1;
+        pipefd[PIPE_WRITE_FD] = -1;
+        rc = pipe(pipefd);
+
+        if (rc != -1)
+        {
+            rc = setNonblocking(pipefd[PIPE_READ_FD]);
+        }
+
+        if (rc != -1)
+        {
+            rc = setNonblocking(pipefd[PIPE_WRITE_FD]);
+        }
+
+        if (rc == -1)
+        {
+            if (pipefd[PIPE_READ_FD] != -1)
+            {
+                close(pipefd[PIPE_READ_FD]);
+                pipefd[PIPE_READ_FD] = -1;
+            }
+            if (pipefd[PIPE_WRITE_FD] != -1)
+            {
+                close(pipefd[PIPE_WRITE_FD]);
+                pipefd[PIPE_WRITE_FD]= -1;
+            }
+        }
+    }
+    return rc;
+}