{
FD_SET(caglobals.tcp.shutdownFds[0], &readFds);
}
+ if (-1 != caglobals.tcp.connectionFds[0])
+ {
+ FD_SET(caglobals.tcp.connectionFds[0], &readFds);
+ }
uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
for (size_t i = 0; i < length; i++)
CAAcceptConnection();
return;
}
+ else if (-1 != caglobals.tcp.connectionFds[0] &&
+ FD_ISSET(caglobals.tcp.connectionFds[0], readFds))
+ {
+ // new connection was created from remote device.
+ // exit the function to update read file descriptor.
+ char buf[MAX_ADDR_STR_SIZE_CA] = {0};
+ ssize_t len = read(caglobals.tcp.connectionFds[0], buf, sizeof (buf));
+ if (-1 == len)
+ {
+ return;
+ }
+ OIC_LOG_V(DEBUG, TAG, "Received new connection event with [%s]", buf);
+ FD_CLR(caglobals.tcp.connectionFds[0], readFds);
+ return;
+ }
else
{
uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
return;
}
+static void CAWakeUpForReadFdsUpdate(const char *host)
+{
+ if (caglobals.tcp.connectionFds[1] != -1)
+ {
+ ssize_t len = 0;
+ do
+ {
+ len = write(caglobals.tcp.connectionFds[1], host, strlen(host));
+ } while ((len == -1) && (errno == EINTR));
+
+ if ((len == -1) && (errno != EINTR) && (errno != EPIPE))
+ {
+ OIC_LOG_V(DEBUG, TAG, "write failed: %s", strerror(errno));
+ }
+ }
+}
+
static int CATCPCreateSocket(int family, CATCPSessionInfo_t *svritem)
{
// create tcp socket
if (0 == ret)
{
OIC_LOG(DEBUG, TAG, "connect socket success");
- }
- else if (EINPROGRESS == errno)
- {
- OIC_LOG(DEBUG, TAG, "EINPROGRESS");
- int error = 0;
- socklen_t len = sizeof(error);
- if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0)
- {
- OIC_LOG(ERROR, TAG, "getsockopt() error");
- goto exit;
- }
-
- if (error)
- {
- if (ECONNREFUSED == error)
- {
- OIC_LOG(ERROR, TAG, "connection refused");
- goto exit;
- }
- OIC_LOG(ERROR, TAG, "failed to connect socket");
- goto exit;
- }
- OIC_LOG(DEBUG, TAG, "connect socket success");
+ CAWakeUpForReadFdsUpdate(svritem->sep.endpoint.addr);
}
else
{
return CA_STATUS_FAILED;
}
-static void CAInitializePipe()
+static void CAInitializePipe(int *fds)
{
- int ret = pipe(caglobals.tcp.shutdownFds);
+ int ret = pipe(fds);
if (-1 != ret)
{
- ret = fcntl(caglobals.tcp.shutdownFds[0], F_GETFD);
+ ret = fcntl(fds[0], F_GETFD);
if (-1 != ret)
{
- ret = fcntl(caglobals.tcp.shutdownFds[0], F_SETFD, ret|FD_CLOEXEC);
+ ret = fcntl(fds[0], F_SETFD, ret|FD_CLOEXEC);
}
if (-1 != ret)
{
- ret = fcntl(caglobals.tcp.shutdownFds[1], F_GETFD);
+ ret = fcntl(fds[1], F_GETFD);
}
if (-1 != ret)
{
- ret = fcntl(caglobals.tcp.shutdownFds[1], F_SETFD, ret|FD_CLOEXEC);
+ ret = fcntl(fds[1], F_SETFD, ret|FD_CLOEXEC);
}
if (-1 == ret)
{
- close(caglobals.tcp.shutdownFds[1]);
- close(caglobals.tcp.shutdownFds[0]);
+ close(fds[1]);
+ close(fds[0]);
- caglobals.tcp.shutdownFds[0] = -1;
- caglobals.tcp.shutdownFds[1] = -1;
+ fds[0] = -1;
+ fds[1] = -1;
OIC_LOG_V(ERROR, TAG, "pipe failed: %s", strerror(errno));
}
}
// create pipe for fast shutdown
- CAInitializePipe();
+ CAInitializePipe(caglobals.tcp.shutdownFds);
CHECKFD(caglobals.tcp.shutdownFds[0]);
CHECKFD(caglobals.tcp.shutdownFds[1]);
+ // create pipe for connection event
+ CAInitializePipe(caglobals.tcp.connectionFds);
+ CHECKFD(caglobals.tcp.connectionFds[0]);
+ CHECKFD(caglobals.tcp.connectionFds[1]);
+
caglobals.tcp.terminate = false;
res = ca_thread_pool_add_task(threadPool, CAReceiveHandler, NULL);
if (CA_STATUS_OK != res)
// receive thread will stop immediately
}
+ if (caglobals.tcp.connectionFds[1] != -1)
+ {
+ close(caglobals.tcp.connectionFds[1]);
+ }
+
if (caglobals.tcp.started)
{
ca_cond_wait(g_condObjectList, g_mutexObjectList);