bool isMulticast;
} CATCPData;
-#define CA_TCP_TIMEOUT 1000
-
#define CA_TCP_LISTEN_BACKLOG 3
+#define CA_TCP_SELECT_TIMEOUT 10
+
/**
* Queue handle for Send Data.
*/
CAResult_t CATCPInitializeQueueHandles()
{
- OIC_LOG(DEBUG, TAG, "IN");
-
// Check if the message queue is already initialized
if (g_sendQueueHandle)
{
return CA_STATUS_FAILED;
}
- OIC_LOG(DEBUG, TAG, "OUT");
return CA_STATUS_OK;
}
void CATCPDeinitializeQueueHandles()
{
- OIC_LOG(DEBUG, TAG, "IN");
-
CAQueueingThreadDestroy(g_sendQueueHandle);
OICFree(g_sendQueueHandle);
g_sendQueueHandle = NULL;
-
- OIC_LOG(DEBUG, TAG, "OUT");
}
void CATCPConnectionStateCB(const char *ipAddress, CANetworkStatus_t status)
{
(void)ipAddress;
(void)status;
- OIC_LOG(DEBUG, TAG, "IN");
}
void CATCPPacketReceivedCB(const CAEndpoint_t *endpoint, const void *data,
uint32_t dataLength)
{
- OIC_LOG(DEBUG, TAG, "IN");
-
- VERIFY_NON_NULL_VOID(endpoint, TAG, "ipAddress is NULL");
+ VERIFY_NON_NULL_VOID(endpoint, TAG, "endpoint is NULL");
VERIFY_NON_NULL_VOID(data, TAG, "data is NULL");
OIC_LOG_V(DEBUG, TAG, "Address: %s, port:%d", endpoint->addr, endpoint->port);
{
g_networkPacketCallback(endpoint, data, dataLength);
}
- OIC_LOG(DEBUG, TAG, "OUT");
}
void CATCPErrorHandler(const CAEndpoint_t *endpoint, const void *data,
uint32_t dataLength, CAResult_t result)
{
- OIC_LOG(DEBUG, TAG, "IN");
-
VERIFY_NON_NULL_VOID(endpoint, TAG, "endpoint is NULL");
-
VERIFY_NON_NULL_VOID(data, TAG, "data is NULL");
if (g_errorCallback)
{
g_errorCallback(endpoint, data, dataLength, result);
}
-
- OIC_LOG(DEBUG, TAG, "OUT");
}
static void CAInitializeTCPGlobals()
{
- caglobals.tcp.selectTimeout = CA_TCP_TIMEOUT;
+ caglobals.tcp.selectTimeout = CA_TCP_SELECT_TIMEOUT;
caglobals.tcp.listenBacklog = CA_TCP_LISTEN_BACKLOG;
caglobals.tcp.svrlist = NULL;
CAResult_t CAStartTCP()
{
- OIC_LOG(DEBUG, TAG, "IN");
-
if (CA_STATUS_OK != CATCPInitializeQueueHandles())
{
OIC_LOG(ERROR, TAG, "Failed to Initialize Queue Handle");
return ret;
}
- OIC_LOG(DEBUG, TAG, "OUT");
return CA_STATUS_OK;
}
CAResult_t CAStartTCPListeningServer()
{
- OIC_LOG(DEBUG, TAG, "IN");
-
- OIC_LOG(DEBUG, TAG, "OUT");
return CA_STATUS_OK;
}
CAResult_t CAStopTCPListeningServer()
{
- OIC_LOG(DEBUG, TAG, "IN");
-
- OIC_LOG(DEBUG, TAG, "OUT");
return CA_STATUS_OK;
}
CAResult_t CAStartTCPDiscoveryServer()
{
- OIC_LOG(DEBUG, TAG, "IN");
-
- OIC_LOG(DEBUG, TAG, "OUT");
return CA_STATUS_OK;
}
static size_t CAQueueTCPData(bool isMulticast, const CAEndpoint_t *endpoint,
const void *data, size_t dataLength)
{
- OIC_LOG(DEBUG, TAG, "IN");
-
- VERIFY_NON_NULL_RET(endpoint, TAG, "remoteEndpoint", -1);
+ VERIFY_NON_NULL_RET(endpoint, TAG, "endpoint", -1);
VERIFY_NON_NULL_RET(data, TAG, "data", -1);
if (0 == dataLength)
VERIFY_NON_NULL_RET(g_sendQueueHandle, TAG, "sendQueueHandle", -1);
// Create TCPData to add to queue
- CATCPData *TCPData = CACreateTCPData(endpoint, data, dataLength, isMulticast);
- if (!TCPData)
+ CATCPData *tcpData = CACreateTCPData(endpoint, data, dataLength, isMulticast);
+ if (!tcpData)
{
OIC_LOG(ERROR, TAG, "Failed to create ipData!");
return -1;
}
// Add message to send queue
- CAQueueingThreadAddData(g_sendQueueHandle, TCPData, sizeof(CATCPData));
+ CAQueueingThreadAddData(g_sendQueueHandle, tcpData, sizeof(CATCPData));
- OIC_LOG(DEBUG, TAG, "OUT");
return dataLength;
}
int32_t CASendTCPUnicastData(const CAEndpoint_t *endpoint,
const void *data, uint32_t dataLength)
{
- OIC_LOG(DEBUG, TAG, "IN");
return CAQueueTCPData(false, endpoint, data, dataLength);
}
int32_t CASendTCPMulticastData(const CAEndpoint_t *endpoint,
const void *data, uint32_t dataLength)
{
- OIC_LOG(DEBUG, TAG, "IN");
return CAQueueTCPData(true, endpoint, data, dataLength);
}
CAResult_t CAReadTCPData()
{
- OIC_LOG(DEBUG, TAG, "IN");
- OIC_LOG(DEBUG, TAG, "OUT");
return CA_STATUS_OK;
}
CAResult_t CAStopTCP()
{
- OIC_LOG(DEBUG, TAG, "IN");
-
if (g_sendQueueHandle && g_sendQueueHandle->threadMutex)
{
CAQueueingThreadStop(g_sendQueueHandle);
}
CATCPDeinitializeQueueHandles();
-
CATCPStopServer();
+ //Re-initializing the Globals to start them again
+ CAInitializeTCPGlobals();
- OIC_LOG(DEBUG, TAG, "OUT");
return CA_STATUS_OK;
}
void CATerminateTCP()
{
- OIC_LOG(DEBUG, TAG, "IN");
-
CATCPSetPacketReceiveCallback(NULL);
CATCPDeinitializeQueueHandles();
-
- OIC_LOG(DEBUG, TAG, "OUT");
}
void CATCPSendDataThread(void *threadData)
{
- OIC_LOG(DEBUG, TAG, "IN");
-
- CATCPData *TCPData = (CATCPData *) threadData;
- if (!TCPData)
+ CATCPData *tcpData = (CATCPData *) threadData;
+ if (!tcpData)
{
OIC_LOG(DEBUG, TAG, "Invalid TCP data!");
return;
}
- if (TCPData->isMulticast)
+ if (tcpData->isMulticast)
{
//Processing for sending multicast
OIC_LOG(DEBUG, TAG, "Send Multicast Data is called, not supported");
else
{
//Processing for sending unicast
- CATCPSendData(TCPData->remoteEndpoint, TCPData->data, TCPData->dataLen, false);
+ CATCPSendData(tcpData->remoteEndpoint, tcpData->data, tcpData->dataLen, false);
}
-
- OIC_LOG(DEBUG, TAG, "OUT");
}
CATCPData *CACreateTCPData(const CAEndpoint_t *remoteEndpoint, const void *data,
size_t dataLength, bool isMulticast)
{
- VERIFY_NON_NULL_RET(data, TAG, "TCPData is NULL", NULL);
+ VERIFY_NON_NULL_RET(remoteEndpoint, TAG, "remoteEndpoint is NULL", NULL);
+ VERIFY_NON_NULL_RET(data, TAG, "data is NULL", NULL);
- CATCPData *TCPData = (CATCPData *) OICMalloc(sizeof(CATCPData));
- if (!TCPData)
+ CATCPData *tcpData = (CATCPData *) OICCalloc(1, sizeof(*tcpData));
+ if (!tcpData)
{
OIC_LOG(ERROR, TAG, "Memory allocation failed!");
return NULL;
}
- TCPData->remoteEndpoint = CACloneEndpoint(remoteEndpoint);
- TCPData->data = (void *) OICMalloc(dataLength);
- if (!TCPData->data)
+ tcpData->remoteEndpoint = CACloneEndpoint(remoteEndpoint);
+ tcpData->data = (void *) OICMalloc(dataLength);
+ if (!tcpData->data)
{
OIC_LOG(ERROR, TAG, "Memory allocation failed!");
- CAFreeTCPData(TCPData);
+ CAFreeTCPData(tcpData);
return NULL;
}
- memcpy(TCPData->data, data, dataLength);
- TCPData->dataLen = dataLength;
+ memcpy(tcpData->data, data, dataLength);
+ tcpData->dataLen = dataLength;
- TCPData->isMulticast = isMulticast;
+ tcpData->isMulticast = isMulticast;
- return TCPData;
+ return tcpData;
}
-void CAFreeTCPData(CATCPData *TCPData)
+void CAFreeTCPData(CATCPData *tcpData)
{
- VERIFY_NON_NULL_VOID(TCPData, TAG, "TCPData is NULL");
+ VERIFY_NON_NULL_VOID(tcpData, TAG, "tcpData is NULL");
- CAFreeEndpoint(TCPData->remoteEndpoint);
- OICFree(TCPData->data);
- OICFree(TCPData);
+ CAFreeEndpoint(tcpData->remoteEndpoint);
+ OICFree(tcpData->data);
+ OICFree(tcpData);
}
void CADataDestroyer(void *data, uint32_t size)
#define TCP_MAX_HEADER_LEN 6
/**
- * Default Thread Counts in TCP adapter
- */
-#define CA_TCP_DEFAULT_THREAD_COUNTS 2
-
-/**
* Accept server file descriptor.
*/
static int g_acceptServerFD = -1;
static ca_cond g_condObjectList = NULL;
/**
- * Maintains the current running thread counts.
- */
-static uint32_t g_threadCounts = CA_TCP_DEFAULT_THREAD_COUNTS;
-
-/**
* Maintains the callback to be notified when data received from remote device.
*/
static CATCPPacketReceivedCallback g_packetReceivedCallback;
static void CATCPDestroyMutex();
static CAResult_t CATCPCreateCond();
static void CATCPDestroyCond();
-static void CAAcceptHandler(void *data);
+static CAResult_t CACreateAcceptSocket();
+static void CAAcceptConnection();
+static void CAFindReadyMessage();
+static void CASelectReturned(fd_set *readFds, int ret);
+static void CAReceiveMessage(int fd);
static void CAReceiveHandler(void *data);
-static CAResult_t CAReceiveMessage();
-static int CASetNonblocking(int fd);
-static int CATCPCreateSocket(int family, CATCPServerInfo_t *TCPServerInfo);
-static size_t CAGetTotalLengthFromHeader(const unsigned char *recvBuffer);
-static void CATCPDisconnectAll();
+static int CATCPCreateSocket(int family, CATCPSessionInfo_t *tcpServerInfo);
+
+#define CHECKFD(FD) \
+ if (FD > caglobals.tcp.maxfd) \
+ caglobals.tcp.maxfd = FD;
static void CATCPDestroyMutex()
{
return CA_STATUS_OK;
}
-static void CATCPDisconnectAll()
-{
- OIC_LOG(DEBUG, TAG, "IN");
-
- ca_mutex_lock(g_mutexObjectList);
- uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
-
- CATCPServerInfo_t *svritem = NULL;
- for (size_t i = 0; i < length; i++)
- {
- svritem = (CATCPServerInfo_t *) u_arraylist_get(caglobals.tcp.svrlist, i);
- if (svritem && svritem->u4tcp.fd >= 0)
- {
- shutdown(svritem->u4tcp.fd, SHUT_RDWR);
- close(svritem->u4tcp.fd);
- }
- }
- u_arraylist_destroy(caglobals.tcp.svrlist);
- caglobals.tcp.svrlist = NULL;
- ca_mutex_unlock(g_mutexObjectList);
-
- OIC_LOG(DEBUG, TAG, "OUT");
-}
-
static void CAReceiveHandler(void *data)
{
(void)data;
while (!caglobals.tcp.terminate)
{
- CAReceiveMessage();
+ CAFindReadyMessage();
}
ca_mutex_lock(g_mutexObjectList);
- // notify the thread
- g_threadCounts--;
- if (!g_threadCounts)
- {
- ca_cond_signal(g_condObjectList);
- }
+ ca_cond_signal(g_condObjectList);
ca_mutex_unlock(g_mutexObjectList);
OIC_LOG(DEBUG, TAG, "OUT - CAReceiveHandler");
}
-static size_t CAGetTotalLengthFromHeader(const unsigned char *recvBuffer)
+static void CAFindReadyMessage()
{
- OIC_LOG(DEBUG, TAG, "IN - CAGetTotalLengthFromHeader");
-
- coap_transport_type transport = coap_get_tcp_header_type_from_initbyte(
- ((unsigned char *)recvBuffer)[0] >> 4);
- size_t optPaylaodLen = coap_get_length_from_header((unsigned char *)recvBuffer,
- transport);
- size_t headerLen = coap_get_tcp_header_length((unsigned char *)recvBuffer);
+ fd_set readFds;
+ struct timeval timeout = { .tv_sec = caglobals.tcp.selectTimeout };
- OIC_LOG_V(DEBUG, TAG, "option/paylaod length [%zu]", optPaylaodLen);
- OIC_LOG_V(DEBUG, TAG, "header length [%zu]", headerLen);
- OIC_LOG_V(DEBUG, TAG, "total data length [%zu]", headerLen + optPaylaodLen);
+ FD_ZERO(&readFds);
- OIC_LOG(DEBUG, TAG, "OUT - CAGetTotalLengthFromHeader");
- return headerLen + optPaylaodLen;
-}
+ if (-1 != g_acceptServerFD)
+ {
+ FD_SET(g_acceptServerFD, &readFds);
+ }
+ if (-1 != caglobals.tcp.shutdownFds[0])
+ {
+ FD_SET(caglobals.tcp.shutdownFds[0], &readFds);
+ }
-static CAResult_t CAReceiveMessage()
-{
uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
-
- size_t i = 0;
- unsigned char *recvBuffer = NULL;
- CATCPServerInfo_t *svritem = NULL;
- for (i = 0; i < length; i++)
+ for (size_t i = 0; i < length; i++)
{
- svritem = (CATCPServerInfo_t *) u_arraylist_get(caglobals.tcp.svrlist, i);
- if (svritem->u4tcp.fd < 0)
+ CATCPSessionInfo_t *svritem =
+ (CATCPSessionInfo_t *) u_arraylist_get(caglobals.tcp.svrlist, i);
+ if (svritem && 0 <= svritem->fd)
{
- continue;
+ FD_SET(svritem->fd, &readFds);
}
+ }
- size_t bufSize = TCP_MAX_HEADER_LEN;
- recvBuffer = (unsigned char *) OICCalloc(1, bufSize);
- if (!recvBuffer)
+ int ret = select(caglobals.tcp.maxfd + 1, &readFds, NULL, NULL, &timeout);
+
+ if (caglobals.tcp.terminate)
+ {
+ OIC_LOG_V(DEBUG, TAG, "Packet receiver Stop request received.");
+ return;
+ }
+ if (0 >= ret)
+ {
+ if (0 > ret)
{
- OIC_LOG(ERROR, TAG, "out of memory");
- goto exit;
+ OIC_LOG_V(FATAL, TAG, "select error %s", strerror(errno));
}
+ return;
+ }
+
+ CASelectReturned(&readFds, ret);
+}
- bool isHeaderChecked = false;
- size_t totalLen = 0;
- size_t totalReceivedLen = 0;
- do
+static void CASelectReturned(fd_set *readFds, int ret)
+{
+ (void)ret;
+
+ if (g_acceptServerFD != -1 && FD_ISSET(g_acceptServerFD, readFds))
+ {
+ CAAcceptConnection();
+ return;
+ }
+ else
+ {
+ uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
+ for (size_t i = 0; i < length; i++)
{
- ssize_t recvLen = recv(svritem->u4tcp.fd, recvBuffer + totalReceivedLen,
- bufSize - totalReceivedLen, 0);
- if (recvLen <= 0)
+ CATCPSessionInfo_t *svritem =
+ (CATCPSessionInfo_t *) u_arraylist_get(caglobals.tcp.svrlist, i);
+ if (svritem && svritem->fd >= 0)
{
- if(EWOULDBLOCK != errno)
- {
- OIC_LOG_V(ERROR, TAG, "Recvfrom failed %s", strerror(errno));
- goto exit;
- }
- // if received data length is zero, we are breaking loop.
- // because we use non-blocking socket to receive data from remote device.
- if (!totalReceivedLen)
+ if (FD_ISSET(svritem->fd, readFds))
{
- break;
+ CAReceiveMessage(svritem->fd);
+ FD_CLR(svritem->fd, readFds);
}
- continue;
}
+ }
+ }
+}
- totalReceivedLen += recvLen;
- if (!isHeaderChecked && totalReceivedLen)
- {
- coap_transport_type transport = coap_get_tcp_header_type_from_initbyte(
- ((unsigned char *)recvBuffer)[0] >> 4);
- size_t headerLen = coap_get_tcp_header_length_for_transport(transport);
- if (totalReceivedLen >= headerLen)
- {
- // get actual data length from coap over tcp header
- totalLen = CAGetTotalLengthFromHeader((unsigned char *) recvBuffer);
- bufSize = totalLen;
- unsigned char *newBuf = OICRealloc(recvBuffer, bufSize);
- if (NULL == newBuf)
- {
- OIC_LOG(ERROR, TAG, "out of memory");
- goto exit;
- }
- recvBuffer = newBuf;
- isHeaderChecked = true;
- }
- }
- if (totalLen == totalReceivedLen)
- {
- CAEndpoint_t ep = { .adapter = CA_ADAPTER_TCP,
- .port = svritem->u4tcp.port };
- strncpy(ep.addr, svritem->addr, sizeof(ep.addr));
+static void CAAcceptConnection()
+{
+ struct sockaddr_storage clientaddr;
+ socklen_t clientlen = sizeof (struct sockaddr_in);
- if (g_packetReceivedCallback)
- {
- g_packetReceivedCallback(&ep, recvBuffer, totalLen);
- }
- OIC_LOG_V(DEBUG, TAG, "received data len:%zu", totalLen);
- break;
- }
- } while (!totalLen || totalLen > totalReceivedLen);
+ int sockfd = accept(g_acceptServerFD, (struct sockaddr *)&clientaddr,
+ &clientlen);
+ if (-1 != sockfd)
+ {
+ CATCPSessionInfo_t *svritem =
+ (CATCPSessionInfo_t *) OICCalloc(1, sizeof (*svritem));
+ if (!svritem)
+ {
+ OIC_LOG(ERROR, TAG, "Out of memory");
+ close(sockfd);
+ return;
+ }
- OICFree(recvBuffer);
- }
+ svritem->fd = sockfd;
+ CAConvertAddrToName((struct sockaddr_storage *)&clientaddr, clientlen,
+ (char *) &svritem->endpoint.addr, &svritem->endpoint.port);
- return CA_STATUS_OK;
+ ca_mutex_lock(g_mutexObjectList);
+ bool result = u_arraylist_add(caglobals.tcp.svrlist, svritem);
+ if (!result)
+ {
+ OIC_LOG(ERROR, TAG, "u_arraylist_add failed.");
+ close(sockfd);
+ OICFree(svritem);
+ ca_mutex_unlock(g_mutexObjectList);
+ return;
+ }
+ ca_mutex_unlock(g_mutexObjectList);
-exit:
- ca_mutex_lock(g_mutexObjectList);
- close(svritem->u4tcp.fd);
- u_arraylist_remove(caglobals.tcp.svrlist, i);
- ca_mutex_unlock(g_mutexObjectList);
- OICFree(recvBuffer);
- return CA_STATUS_FAILED;
+ CHECKFD(sockfd);
+ }
}
-// TODO: resolving duplication.
-static int CASetNonblocking(int fd)
+static void CAReceiveMessage(int fd)
{
- int fl = fcntl(fd, F_GETFL);
- if (fl == -1)
+ // #1. get remote device information from file descriptor.
+ size_t index = 0;
+ CATCPSessionInfo_t *svritem = CAGetSessionInfoFromFD(fd, &index);
+ if (!svritem)
+ {
+ OIC_LOG(ERROR, TAG, "there is no connection information in list");
+ return;
+ }
+
+ // #2. get already allocated memory size.
+ size_t bufSize = (svritem->totalDataLen == 0) ? TCP_MAX_HEADER_LEN : svritem->totalDataLen;
+ if (!svritem->recvData)
+ {
+ svritem->recvData = (unsigned char *) OICCalloc(1, bufSize);
+ if (!svritem->recvData)
+ {
+ OIC_LOG(ERROR, TAG, "out of memory");
+ CADisconnectTCPSession(svritem, index);
+ return;
+ }
+ }
+
+ // #3. receive data from remote device.
+ ssize_t recvLen = recv(fd, svritem->recvData + svritem->recvDataLen,
+ bufSize - svritem->recvDataLen, 0);
+ if (recvLen <= 0)
{
- OIC_LOG_V(ERROR, TAG, "Failed to get existing flags, Error code: %s",
- strerror(errno));
+ if(EWOULDBLOCK != errno)
+ {
+ OIC_LOG_V(ERROR, TAG, "Recvfrom failed %s", strerror(errno));
+ CADisconnectTCPSession(svritem, index);
+ }
+ return;
}
- else if ((fl & O_NONBLOCK) != O_NONBLOCK)
+ svritem->recvDataLen += recvLen;
+
+ // #4. get actual data length from coap over tcp header.
+ if (!svritem->totalDataLen)
{
- fl = fcntl(fd, F_SETFL, fl | O_NONBLOCK);
- if (fl == -1)
+ coap_transport_type transport = coap_get_tcp_header_type_from_initbyte(
+ ((unsigned char *) svritem->recvData)[0] >> 4);
+
+ size_t headerLen = coap_get_tcp_header_length_for_transport(transport);
+ if (svritem->recvDataLen >= headerLen)
{
- OIC_LOG_V(ERROR, TAG, "Failed to set non-blocking mode, Error code: %s",
- strerror(errno));
+ svritem->totalDataLen = CAGetTotalLengthFromHeader(
+ (unsigned char *) svritem->recvData);
+ bufSize = svritem->totalDataLen;
+ unsigned char *newBuf = OICRealloc(svritem->recvData, bufSize);
+ if (!newBuf)
+ {
+ OIC_LOG(ERROR, TAG, "out of memory");
+ CADisconnectTCPSession(svritem, index);
+ return;
+ }
+ svritem->recvData = newBuf;
}
}
- return fl;
+ // #5. pass the received data information to upper layer.
+ if ((svritem->totalDataLen == svritem->recvDataLen) && g_packetReceivedCallback)
+ {
+ svritem->endpoint.adapter = CA_ADAPTER_TCP;
+ g_packetReceivedCallback(&svritem->endpoint, svritem->recvData, svritem->recvDataLen);
+ OIC_LOG_V(DEBUG, TAG, "total received data len:%d", svritem->recvDataLen);
+
+ // initialize data info to receive next message.
+ OICFree(svritem->recvData);
+ svritem->recvData = NULL;
+ svritem->recvDataLen = 0;
+ svritem->totalDataLen = 0;
+ }
+
+ return;
}
-static int CATCPCreateSocket(int family, CATCPServerInfo_t *TCPServerInfo)
+static int CATCPCreateSocket(int family, CATCPSessionInfo_t *tcpServerInfo)
{
// create tcp socket
int fd = socket(family, SOCK_STREAM, IPPROTO_TCP);
goto exit;
}
- // set non-blocking socket
- if (-1 == CASetNonblocking(fd))
- {
- goto exit;
- }
-
struct sockaddr_storage sa = { .ss_family = family };
- CAConvertNameToAddr(TCPServerInfo->addr, TCPServerInfo->u4tcp.port, &sa);
+ CAConvertNameToAddr(tcpServerInfo->endpoint.addr, tcpServerInfo->endpoint.port, &sa);
socklen_t socklen = sizeof (struct sockaddr_in);
// connect to TCP server
}
else if (EINPROGRESS == errno)
{
+ OIC_LOG(DEBUG, TAG, "EINPROGRESS");
int error = 0;
socklen_t len = sizeof(error);
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0)
return -1;
}
-static void CAAcceptHandler(void *data)
+static CAResult_t CACreateAcceptSocket()
{
- (void)data;
- OIC_LOG(DEBUG, TAG, "IN - CAAcceptHandler");
-
int reuse = 1;
struct sockaddr_in server = { .sin_addr.s_addr = INADDR_ANY,
.sin_family = AF_INET,
goto exit;
}
- struct pollfd acceptServerFD = { .fd = g_acceptServerFD,
- .events = POLLIN };
+ CHECKFD(g_acceptServerFD);
- while (!caglobals.tcp.terminate)
+ return CA_STATUS_OK;
+
+exit:
+ if (g_acceptServerFD >= 0)
+ {
+ close(g_acceptServerFD);
+ g_acceptServerFD = -1;
+ }
+ return CA_STATUS_FAILED;
+}
+
+static void CAInitializePipe()
+{
+ int ret = pipe(caglobals.tcp.shutdownFds);
+ if (-1 != ret)
{
- int pollState = poll(&acceptServerFD, 1, caglobals.tcp.selectTimeout);
- if (pollState < 0)
+ ret = fcntl(caglobals.tcp.shutdownFds[0], F_GETFD);
+ if (-1 != ret)
{
- OIC_LOG_V(FATAL, TAG, "polling error %s", strerror(errno));
- goto exit;
+ ret = fcntl(caglobals.tcp.shutdownFds[0], F_SETFD, ret|FD_CLOEXEC);
}
- else if (!pollState)
+ if (-1 != ret)
{
- continue;
+ ret = fcntl(caglobals.tcp.shutdownFds[1], F_GETFD);
}
-
- if (acceptServerFD.revents & POLLIN)
+ if (-1 != ret)
{
- struct sockaddr_storage clientaddr;
- socklen_t clientlen = sizeof (struct sockaddr_in);
-
- int sockfd = accept(g_acceptServerFD, (struct sockaddr *)&clientaddr, &clientlen);
- if (sockfd != -1)
- {
- CATCPServerInfo_t *svritem = (CATCPServerInfo_t *) OICMalloc(sizeof (*svritem));
- if (!svritem)
- {
- OIC_LOG(ERROR, TAG, "Out of memory");
- close(sockfd);
- return;
- }
-
- // set non-blocking socket
- if (-1 == CASetNonblocking(sockfd))
- {
- close(sockfd);
- OICFree(svritem);
- continue;
- }
- svritem->u4tcp.fd = sockfd;
-
- CAConvertAddrToName((struct sockaddr_storage *)&clientaddr, clientlen,
- (char *) &svritem->addr, &svritem->u4tcp.port);
-
- ca_mutex_lock(g_mutexObjectList);
- bool res = u_arraylist_add(caglobals.tcp.svrlist, svritem);
- if (!res)
- {
- OIC_LOG(ERROR, TAG, "u_arraylist_add failed.");
- close(sockfd);
- OICFree(svritem);
- ca_mutex_unlock(g_mutexObjectList);
- continue;
- }
- ca_mutex_unlock(g_mutexObjectList);
- }
+ ret = fcntl(caglobals.tcp.shutdownFds[1], F_SETFD, ret|FD_CLOEXEC);
}
- }
-
- ca_mutex_lock(g_mutexObjectList);
- // notify the thread
- g_threadCounts--;
- if (!g_threadCounts)
- {
- ca_cond_signal(g_condObjectList);
- }
- ca_mutex_unlock(g_mutexObjectList);
+ if (-1 == ret)
+ {
+ close(caglobals.tcp.shutdownFds[1]);
+ close(caglobals.tcp.shutdownFds[0]);
- OIC_LOG(DEBUG, TAG, "OUT - CAAcceptHandler");
+ caglobals.tcp.shutdownFds[0] = -1;
+ caglobals.tcp.shutdownFds[1] = -1;
-exit:
- if (g_acceptServerFD >= 0)
- {
- close(g_acceptServerFD);
- }
- ca_mutex_lock(g_mutexObjectList);
- g_threadCounts--;
- if (!g_threadCounts)
- {
- ca_cond_signal(g_condObjectList);
+ OIC_LOG_V(ERROR, TAG, "pipe failed: %s", strerror(errno));
+ }
}
- ca_mutex_unlock(g_mutexObjectList);
- return;
}
CAResult_t CATCPStartServer(const ca_thread_pool_t threadPool)
}
CAResult_t res = CATCPCreateMutex();
- if (CA_STATUS_OK != res)
+ if (CA_STATUS_OK == res)
{
- OIC_LOG(ERROR, TAG, "failed to create mutex");
- return res;
+ res = CATCPCreateCond();
}
-
- res = CATCPCreateCond();
if (CA_STATUS_OK != res)
{
- OIC_LOG(ERROR, TAG, "failed to create cond");
+ OIC_LOG(ERROR, TAG, "failed to create mutex/cond");
return res;
}
}
ca_mutex_unlock(g_mutexObjectList);
- caglobals.tcp.terminate = false;
-
- res = ca_thread_pool_add_task(threadPool, CAAcceptHandler, NULL);
+ res = CACreateAcceptSocket();
if (CA_STATUS_OK != res)
{
- OIC_LOG(ERROR, TAG, "thread_pool_add_task failed");
+ OIC_LOG(ERROR, TAG, "failed to create accept socket");
return res;
}
- OIC_LOG(DEBUG, TAG, "CAAcceptHandler thread started successfully.");
+ // create pipe for fast shutdown
+ CAInitializePipe();
+ CHECKFD(caglobals.tcp.shutdownFds[0]);
+ CHECKFD(caglobals.tcp.shutdownFds[1]);
+
+ caglobals.tcp.terminate = false;
res = ca_thread_pool_add_task(threadPool, CAReceiveHandler, NULL);
if (CA_STATUS_OK != res)
{
OIC_LOG(DEBUG, TAG, "CAReceiveHandler thread started successfully.");
caglobals.tcp.started = true;
-
- g_threadCounts = CA_TCP_DEFAULT_THREAD_COUNTS;
-
return CA_STATUS_OK;
}
void CATCPStopServer()
{
- OIC_LOG(DEBUG, TAG, "IN");
-
// mutex lock
ca_mutex_lock(g_mutexObjectList);
caglobals.tcp.terminate = true;
caglobals.tcp.started = false;
+ if (caglobals.tcp.shutdownFds[1] != -1)
+ {
+ close(caglobals.tcp.shutdownFds[1]);
+ // receive thread will stop immediately
+ }
+
ca_cond_wait(g_condObjectList, g_mutexObjectList);
// mutex unlock
CATCPDisconnectAll();
CATCPDestroyMutex();
CATCPDestroyCond();
-
- OIC_LOG(DEBUG, TAG, "OUT");
}
void CATCPSetPacketReceiveCallback(CATCPPacketReceivedCallback callback)
{
- OIC_LOG(DEBUG, TAG, "IN");
-
g_packetReceivedCallback = callback;
-
- OIC_LOG(DEBUG, TAG, "OUT");
}
static size_t CACheckPayloadLength(const void *data, size_t dlen)
{
+ VERIFY_NON_NULL_RET(data, TAG, "data", -1);
+
coap_transport_type transport = coap_get_tcp_header_type_from_initbyte(
((unsigned char *)data)[0] >> 4);
const void *data, size_t dlen)
{
// #1. get TCP Server object from list
- uint32_t index = 0;
- CATCPServerInfo_t *svritem = CAGetTCPServerInfoFromList(endpoint->addr, endpoint->port,
- &index);
+ size_t index = 0;
+ CATCPSessionInfo_t *svritem = CAGetTCPSessionInfoFromEndpoint(endpoint, &index);
if (!svritem)
{
// if there is no connection info, connect to TCP Server
- svritem = CAConnectToTCPServer(endpoint);
+ svritem = CAConnectTCPSession(endpoint);
if (!svritem)
{
OIC_LOG(ERROR, TAG, "Failed to create TCP server object");
if (!payloadLen)
{
OIC_LOG(DEBUG, TAG, "payload length is zero, disconnect from remote device");
- CADisconnectFromTCPServer(endpoint);
+ CADisconnectTCPSession(svritem, index);
return;
}
// #3. check connection state
- if (svritem->u4tcp.fd < 0)
+ if (svritem->fd < 0)
{
// if file descriptor value is wrong, remove TCP Server info from list
OIC_LOG(ERROR, TAG, "Failed to connect to TCP server");
- CADisconnectFromTCPServer(endpoint);
+ CADisconnectTCPSession(svritem, index);
g_TCPErrorHandler(endpoint, data, dlen, CA_SEND_FAILED);
return;
}
ssize_t remainLen = dlen;
do
{
- ssize_t len = send(svritem->u4tcp.fd, data, remainLen, 0);
+ ssize_t len = send(svritem->fd, data, remainLen, 0);
if (-1 == len)
{
if (EWOULDBLOCK != errno)
CAResult_t CAGetTCPInterfaceInformation(CAEndpoint_t **info, uint32_t *size)
{
- OIC_LOG(DEBUG, TAG, "IN");
-
VERIFY_NON_NULL(info, TAG, "info is NULL");
VERIFY_NON_NULL(size, TAG, "size is NULL");
return CA_NOT_SUPPORTED;
}
-CATCPServerInfo_t *CAConnectToTCPServer(const CAEndpoint_t *TCPServerInfo)
+CATCPSessionInfo_t *CAConnectTCPSession(const CAEndpoint_t *endpoint)
{
- VERIFY_NON_NULL_RET(TCPServerInfo, TAG, "TCPServerInfo is NULL", NULL);
+ VERIFY_NON_NULL_RET(endpoint, TAG, "endpoint is NULL", NULL);
// #1. create TCP server object
- CATCPServerInfo_t *svritem = (CATCPServerInfo_t *) OICMalloc(sizeof (*svritem));
+ CATCPSessionInfo_t *svritem = (CATCPSessionInfo_t *) OICCalloc(1, sizeof (*svritem));
if (!svritem)
{
OIC_LOG(ERROR, TAG, "Out of memory");
return NULL;
}
- memcpy(svritem->addr, TCPServerInfo->addr, sizeof(svritem->addr));
- svritem->u4tcp.port = TCPServerInfo->port;
+ memcpy(svritem->endpoint.addr, endpoint->addr, sizeof(svritem->endpoint.addr));
+ svritem->endpoint.port = endpoint->port;
// #2. create the socket and connect to TCP server
if (caglobals.tcp.ipv4tcpenabled)
{
- svritem->u4tcp.fd = CATCPCreateSocket(AF_INET, svritem);
- if (-1 == svritem->u4tcp.fd)
+ int fd = CATCPCreateSocket(AF_INET, svritem);
+ if (-1 == fd)
{
OICFree(svritem);
return NULL;
}
- }
- // #3. add TCP connection info to list
- ca_mutex_lock(g_mutexObjectList);
- if (caglobals.tcp.svrlist)
- {
- bool res = u_arraylist_add(caglobals.tcp.svrlist, svritem);
- if (!res)
+ // #3. add TCP connection info to list
+ svritem->fd = fd;
+ ca_mutex_lock(g_mutexObjectList);
+ if (caglobals.tcp.svrlist)
{
- OIC_LOG(ERROR, TAG, "u_arraylist_add failed.");
- close(svritem->u4tcp.fd);
- OICFree(svritem);
- ca_mutex_unlock(g_mutexObjectList);
- return NULL;
+ bool res = u_arraylist_add(caglobals.tcp.svrlist, svritem);
+ if (!res)
+ {
+ OIC_LOG(ERROR, TAG, "u_arraylist_add failed.");
+ close(svritem->fd);
+ OICFree(svritem);
+ ca_mutex_unlock(g_mutexObjectList);
+ return NULL;
+ }
}
+ ca_mutex_unlock(g_mutexObjectList);
+
+ CHECKFD(fd);
}
- ca_mutex_unlock(g_mutexObjectList);
return svritem;
}
-CAResult_t CADisconnectFromTCPServer(const CAEndpoint_t *TCPServerInfo)
+CAResult_t CADisconnectTCPSession(CATCPSessionInfo_t *svritem, size_t index)
{
- VERIFY_NON_NULL(TCPServerInfo, TAG, "TCP server info is NULL");
+ VERIFY_NON_NULL(svritem, TAG, "svritem is NULL");
- // #1. get server info
- uint32_t index = 0;
ca_mutex_lock(g_mutexObjectList);
- CATCPServerInfo_t *svritem = CAGetTCPServerInfoFromList(TCPServerInfo->addr,
- TCPServerInfo->port,
- &index);
- if (!svritem)
- {
- OIC_LOG(ERROR, TAG, "there is no connection info");
- ca_mutex_unlock(g_mutexObjectList);
- return CA_STATUS_FAILED;
- }
- // #2. close the socket and remove TCP connection info in list
- if (svritem->u4tcp.fd >= 0)
+ // close the socket and remove TCP connection info in list
+ if (svritem->fd >= 0)
{
- close(svritem->u4tcp.fd);
+ close(svritem->fd);
}
u_arraylist_remove(caglobals.tcp.svrlist, index);
+ OICFree(svritem->recvData);
+ OICFree(svritem);
ca_mutex_unlock(g_mutexObjectList);
return CA_STATUS_OK;
}
-CATCPServerInfo_t *CAGetTCPServerInfoFromList(const char *addr, const uint16_t port,
- uint32_t *index)
+void CATCPDisconnectAll()
+{
+ ca_mutex_lock(g_mutexObjectList);
+ uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
+
+ CATCPSessionInfo_t *svritem = NULL;
+ for (size_t i = 0; i < length; i++)
+ {
+ svritem = (CATCPSessionInfo_t *) u_arraylist_get(caglobals.tcp.svrlist, i);
+ if (svritem && svritem->fd >= 0)
+ {
+ shutdown(svritem->fd, SHUT_RDWR);
+ close(svritem->fd);
+ OICFree(svritem->recvData);
+ }
+ }
+ u_arraylist_destroy(caglobals.tcp.svrlist);
+ ca_mutex_unlock(g_mutexObjectList);
+}
+
+CATCPSessionInfo_t *CAGetTCPSessionInfoFromEndpoint(const CAEndpoint_t *endpoint,
+ size_t *index)
{
- VERIFY_NON_NULL_RET(addr, TAG, "addr is NULL", NULL);
+ VERIFY_NON_NULL_RET(endpoint, TAG, "endpoint is NULL", NULL);
VERIFY_NON_NULL_RET(index, TAG, "index is NULL", NULL);
// get connection info from list
uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
-
for (size_t i = 0; i < length; i++)
{
- CATCPServerInfo_t *svritem = (CATCPServerInfo_t *) u_arraylist_get(
+ CATCPSessionInfo_t *svritem = (CATCPSessionInfo_t *) u_arraylist_get(
caglobals.tcp.svrlist, i);
if (!svritem)
{
continue;
}
- if (!strncmp(svritem->addr, addr, sizeof(svritem->addr))
- && (svritem->u4tcp.port == port))
+ if (!strncmp(svritem->endpoint.addr, endpoint->addr, sizeof(svritem->endpoint.addr))
+ && (svritem->endpoint.port == endpoint->port))
+ {
+ *index = i;
+ return svritem;
+ }
+ }
+
+ return NULL;
+}
+
+CATCPSessionInfo_t *CAGetSessionInfoFromFD(int fd, size_t *index)
+{
+ ca_mutex_lock(g_mutexObjectList);
+
+ // check from the last item.
+ CATCPSessionInfo_t *svritem = NULL;
+ uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
+ for (size_t i = 0; i < length; i++)
+ {
+ svritem = (CATCPSessionInfo_t *) u_arraylist_get(caglobals.tcp.svrlist, i);
+
+ if (svritem && svritem->fd == fd)
{
*index = i;
+ ca_mutex_unlock(g_mutexObjectList);
return svritem;
}
}
+ ca_mutex_unlock(g_mutexObjectList);
+
return NULL;
}
+size_t CAGetTotalLengthFromHeader(const unsigned char *recvBuffer)
+{
+ OIC_LOG(DEBUG, TAG, "IN - CAGetTotalLengthFromHeader");
+
+ coap_transport_type transport = coap_get_tcp_header_type_from_initbyte(
+ ((unsigned char *)recvBuffer)[0] >> 4);
+ size_t optPaylaodLen = coap_get_length_from_header((unsigned char *)recvBuffer,
+ transport);
+ size_t headerLen = coap_get_tcp_header_length((unsigned char *)recvBuffer);
+
+ OIC_LOG_V(DEBUG, TAG, "option/paylaod length [%d]", optPaylaodLen);
+ OIC_LOG_V(DEBUG, TAG, "header length [%d]", headerLen);
+ OIC_LOG_V(DEBUG, TAG, "total data length [%d]", headerLen + optPaylaodLen);
+
+ OIC_LOG(DEBUG, TAG, "OUT - CAGetTotalLengthFromHeader");
+ return headerLen + optPaylaodLen;
+}
+
void CATCPSetErrorHandler(CATCPErrorHandleCallback errorHandleCallback)
{
g_TCPErrorHandler = errorHandleCallback;