#include <sys/types.h>
#include <sys/socket.h>
-#include <sys/select.h>
#include <sys/ioctl.h>
#ifdef __TIZENRT__
#include <tinyara/config.h>
#include "caadapterutils.h"
#include "octhread.h"
#include "oic_malloc.h"
+#include "oic_string.h"
#ifdef __WITH_TLS__
#include "ca_adapter_net_ssl.h"
static ca_thread_pool_t g_threadPool = NULL;
/**
+ * Thread task id.
+ */
+static uint32_t g_taskId = 0;
+
+/**
* Mutex to synchronize device object list.
*/
static oc_mutex g_mutexObjectList = NULL;
static CASocketFd_t CACreateAcceptSocket(int family, CASocket_t *sock);
static void CAAcceptConnection(CATransportFlags_t flag, CASocket_t *sock);
static void CAFindReadyMessage();
-static void CASelectReturned(fd_set *readFds);
+static void CAPollReturned(struct pollfd *readFds, size_t size);
static void CAReceiveMessage(int fd);
static void CAReceiveHandler(void *data);
static CAResult_t CATCPCreateSocket(int family, CATCPSessionInfo_t *svritem);
static void CATCPInitializeSocket();
static CATCPSessionInfo_t *CAGetSessionInfoFromFDAsOwner(int fd, size_t *index);
-#define CHECKFD(FD) \
- if (FD > caglobals.tcp.maxfd) \
- caglobals.tcp.maxfd = FD;
+#if defined(__TIZEN__)
+static char g_cloudproxyUri[CA_MAX_URI_LENGTH];
+
+CAResult_t CASetCloudAddressForProxy(const char *uri)
+{
+ if (uri == NULL)
+ memset(g_cloudproxyUri, '\0', sizeof (g_cloudproxyUri));
+ else
+ OICStrcpy(g_cloudproxyUri, sizeof (g_cloudproxyUri), uri);
+ return CA_STATUS_OK;
+}
+
+const char *CAGetCloudAddressForProxy()
+{
+ if (g_cloudproxyUri[0] == '\0')
+ return NULL;
+ return g_cloudproxyUri;
+}
+#endif
+
+#define MAX_TCP_SOCK_COUNT 4
#define CLOSE_SOCKET(TYPE) \
if (caglobals.tcp.TYPE.fd != OC_INVALID_SOCKET) \
caglobals.tcp.TYPE.fd = OC_INVALID_SOCKET; \
}
-#define CA_FD_SET(TYPE, FDS) \
- if (caglobals.tcp.TYPE.fd != OC_INVALID_SOCKET) \
- { \
- FD_SET(caglobals.tcp.TYPE.fd, FDS); \
- }
+#define CA_FD_SET(TYPE, FDS, COUNT) \
+ FDS[COUNT].fd = caglobals.tcp.TYPE.fd; \
+ FDS[COUNT].events = POLLIN;
void CATCPDestroyMutex()
{
static void CAFindReadyMessage()
{
- fd_set readFds;
- struct timeval timeout = { .tv_sec = caglobals.tcp.selectTimeout };
-
- FD_ZERO(&readFds);
- CA_FD_SET(ipv4, &readFds);
- CA_FD_SET(ipv4s, &readFds);
- CA_FD_SET(ipv6, &readFds);
- CA_FD_SET(ipv6s, &readFds);
-#ifndef __TIZENRT__
- if (OC_INVALID_SOCKET != caglobals.tcp.shutdownFds[0])
- {
- FD_SET(caglobals.tcp.shutdownFds[0], &readFds);
- }
-#endif
- if (OC_INVALID_SOCKET != caglobals.tcp.connectionFds[0])
+ int timeout = (caglobals.tcp.selectTimeout * 1000);
+ size_t counter = 0;
+
+ oc_mutex_lock(g_mutexObjectList);
+ uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
+
+ // Consider 4 tcp sockets(ipv4, ipv4s, ipv6, ipv6s) + 1 connection fd + all sockets in svrlist
+ struct pollfd *readFds = (struct pollfd *)OICCalloc(MAX_TCP_SOCK_COUNT + 1 + length, sizeof(struct pollfd));
+ if (NULL == readFds)
{
- FD_SET(caglobals.tcp.connectionFds[0], &readFds);
+ OIC_LOG_V(ERROR, TAG, "Failed to allocate memory!");
+ oc_mutex_unlock(g_mutexObjectList);
+ return;
}
- uint32_t length = u_arraylist_length(caglobals.tcp.svrlist);
+ // 4 tcp sockets
+ CA_FD_SET(ipv4, readFds, counter);
+ counter++;
+ CA_FD_SET(ipv4s, readFds, counter);
+ counter++;
+ CA_FD_SET(ipv6, readFds, counter);
+ counter++;
+ CA_FD_SET(ipv6s, readFds, counter);
+ counter++;
+
+ // 1 connection fd
+ readFds[counter].fd = caglobals.tcp.connectionFds[0];
+ readFds[counter].events = POLLIN;
+ counter++;
+
+ // All sockets in svrlist
for (size_t i = 0; i < length; i++)
{
CATCPSessionInfo_t *svritem =
(CATCPSessionInfo_t *) u_arraylist_get(caglobals.tcp.svrlist, i);
if (svritem && 0 <= svritem->fd && CONNECTED == svritem->state)
{
- FD_SET(svritem->fd, &readFds);
+ readFds[counter].fd = svritem->fd;
+ readFds[counter].events = POLLIN;
+ counter++;
}
}
+ oc_mutex_unlock(g_mutexObjectList);
- int ret = select(caglobals.tcp.maxfd + 1, &readFds, NULL, NULL, &timeout);
+ int ret = poll(readFds, counter, timeout);
oc_mutex_lock(g_mutexObjectList);
if (caglobals.tcp.terminate)
{
- oc_mutex_unlock(g_mutexObjectList);
OIC_LOG_V(INFO, TAG, "Packet receiver Stop request received.");
+ oc_mutex_unlock(g_mutexObjectList);
+ OICFree(readFds);
return;
}
oc_mutex_unlock(g_mutexObjectList);
- if (0 >= ret)
+
+ if (ret > 0)
{
- if (0 > ret)
- {
- OIC_LOG_V(FATAL, TAG, "select error %s", strerror(errno));
- }
- return;
+ CAPollReturned(readFds, counter);
+ }
+ else if (ret < 0)
+ {
+ OIC_LOG_V(FATAL, TAG, "poll error %s", strerror(errno));
}
- CASelectReturned(&readFds);
+ OICFree(readFds);
}
-static void CASelectReturned(fd_set *readFds)
+static void CAPollReturned(struct pollfd *readFds, size_t size)
{
VERIFY_NON_NULL_VOID(readFds, TAG, "readFds is NULL");
- if (caglobals.tcp.ipv4.fd != -1 && FD_ISSET(caglobals.tcp.ipv4.fd, readFds))
+ if (caglobals.tcp.ipv4.fd != -1 && readFds[0].revents == POLLIN)
{
CAAcceptConnection(CA_IPV4, &caglobals.tcp.ipv4);
return;
}
- else if (caglobals.tcp.ipv4s.fd != -1 && FD_ISSET(caglobals.tcp.ipv4s.fd, readFds))
+ else if (caglobals.tcp.ipv4s.fd != -1 && readFds[1].revents == POLLIN)
{
CAAcceptConnection(CA_IPV4 | CA_SECURE, &caglobals.tcp.ipv4s);
return;
}
- else if (caglobals.tcp.ipv6.fd != -1 && FD_ISSET(caglobals.tcp.ipv6.fd, readFds))
+ else if (caglobals.tcp.ipv6.fd != -1 && readFds[2].revents == POLLIN)
{
CAAcceptConnection(CA_IPV6, &caglobals.tcp.ipv6);
return;
}
- else if (caglobals.tcp.ipv6s.fd != -1 && FD_ISSET(caglobals.tcp.ipv6s.fd, readFds))
+ else if (caglobals.tcp.ipv6s.fd != -1 && readFds[3].revents == POLLIN)
{
CAAcceptConnection(CA_IPV6 | CA_SECURE, &caglobals.tcp.ipv6s);
return;
}
- else if (-1 != caglobals.tcp.connectionFds[0] &&
- FD_ISSET(caglobals.tcp.connectionFds[0], readFds))
+ else if (-1 != caglobals.tcp.connectionFds[0] && readFds[4].revents != 0)
{
- // new connection was created from remote device.
- // exit the function to update read file descriptor.
- char buf[MAX_ADDR_STR_SIZE_CA] = {0};
- ssize_t len = read(caglobals.tcp.connectionFds[0], buf, sizeof (buf));
- if (-1 == len)
- {
+ // new connection was created from remote device.
+ // exit the function to update read file descriptor.
+ char buf[MAX_ADDR_STR_SIZE_CA] = {0};
+ ssize_t len = read(caglobals.tcp.connectionFds[0], buf, sizeof (buf));
+ if (-1 == len)
+ {
+ return;
+ }
+ OIC_LOG_V(DEBUG, TAG, "Received new connection event with [%s]", buf);
return;
- }
- OIC_LOG_V(DEBUG, TAG, "Received new connection event with [%s]", buf);
- return;
}
else
{
(CATCPSessionInfo_t *) u_arraylist_get(caglobals.tcp.svrlist, i);
if (svritem && svritem->fd >= 0)
{
- if (FD_ISSET(svritem->fd, readFds))
+ size_t j = 0;
+ while (j < size)
+ {
+ if (svritem->fd == readFds[j].fd)
+ {
+ break;
+ }
+ j++;
+ }
+
+ if (j < size && readFds[j].revents != 0)
{
readFDList[readFDListSize++] = svritem->fd;
}
}
oc_mutex_unlock(g_mutexObjectList);
- CHECKFD(sockfd);
-
// pass the connection information to CA Common Layer.
if (g_connectionCallback)
{
unsigned char *inBuffer = *data;
size_t inLen = *dataLength;
- OIC_LOG_V(DEBUG, TAG, "before-datalength : %u", *dataLength);
+ OIC_LOG_V(DEBUG, TAG, "before-datalength : %zd", *dataLength);
if (NULL == svritem->data && inLen > 0)
{
*data = inBuffer;
*dataLength = inLen;
- OIC_LOG_V(DEBUG, TAG, "after-datalength : %u", *dataLength);
+ OIC_LOG_V(DEBUG, TAG, "after-datalength : %zd", *dataLength);
OIC_LOG_V(DEBUG, TAG, "Out %s", __func__);
return CA_STATUS_OK;
}
//[3][4] bytes in tls header are tls payload length
tlsLength = TLS_HEADER_SIZE +
(size_t)((svritem->tlsdata[3] << 8) | svritem->tlsdata[4]);
- OIC_LOG_V(DEBUG, TAG, "total tls length = %u", tlsLength);
+ OIC_LOG_V(DEBUG, TAG, "total tls length = %zd", tlsLength);
if (tlsLength > TLS_DATA_MAX_SIZE)
{
OIC_LOG_V(ERROR, TAG, "total tls length is too big (buffer size : %u)",
else
{
svritem->tlsLen += len;
- OIC_LOG_V(DEBUG, TAG, "nb_read : %u bytes , recv() : %d bytes, svritem->tlsLen : %u bytes",
+ OIC_LOG_V(DEBUG, TAG, "nb_read : %zd bytes , recv() : %d bytes, svritem->tlsLen : %zd bytes",
nbRead, len, svritem->tlsLen);
if (tlsLength > 0 && tlsLength == svritem->tlsLen)
{
return CA_STATUS_OK;
}
+#if defined(__TIZEN__)
+static int CAGetHTTPStatusCode(char * response) {
+ char *resp, *code_plus, *ptrSave;
+ int ret = -1;
+
+ resp = strdup(response);
+ strtok_r(resp, " ", &ptrSave); /* skip HTTP version */
+ code_plus = strtok_r(NULL, " ", &ptrSave);
+
+ ret = code_plus ? atoi(code_plus) : -1;
+ free(resp);
+ return ret;
+}
+#endif
+
static CAResult_t CATCPCreateSocket(int family, CATCPSessionInfo_t *svritem)
{
VERIFY_NON_NULL(svritem, TAG, "svritem is NULL");
OIC_LOG(INFO, TAG, "connect socket success");
svritem->state = CONNECTED;
- CHECKFD(svritem->fd);
ssize_t len = CAWakeUpForReadFdsUpdate(svritem->sep.endpoint.addr);
if (-1 == len)
{
OIC_LOG(ERROR, TAG, "wakeup receive thread failed");
return CA_SOCKET_OPERATION_FAILED;
}
+
+#if defined(__TIZEN__)
+ // #5. Send HTTP CONNECT to proxy if proxy
+
+ const char *cloud_address = CAGetCloudAddressForProxy();
+ OIC_LOG_V(INFO, TAG, "Proxy : '%s'", cloud_address ? cloud_address : "(nil)");
+
+ if(cloud_address && *cloud_address)
+ {
+ char message[4096];
+ int len = sprintf(message,
+ "CONNECT %s HTTP/1.1\r\n"
+ "Host: %s\r\n\r\n", cloud_address, cloud_address
+ );
+
+ ssize_t l = send(fd, message, len, 0);
+ if(l != len)
+ {
+ OIC_LOG_V(ERROR, TAG, "failed to send HTTP CONNECT data (expected %d bytes, ret %zd)", len, l);
+ close(fd);
+ svritem->fd = -1;
+ return CA_SOCKET_OPERATION_FAILED;
+ }
+
+ // maybe this should be called in other thread, it causes bottleneck.
+ OIC_LOG_V(INFO, TAG, "Message sent is : '%s'\n", message);
+
+ *message = '\0';
+ OIC_LOG_V(INFO, TAG, "Receiving response to CONNECT from proxy...");
+
+ l = recv(fd, message, 4096, 0);
+
+ OIC_LOG_V(INFO, TAG, "Received data : '%s'", message);
+ OIC_LOG_V(INFO, TAG, "Received len = %zd", l);
+
+ int status_code = CAGetHTTPStatusCode(message);
+
+ OIC_LOG_V(INFO, TAG, "HTTP status_code : %d", status_code);
+ if(status_code < 200 || status_code > 299)
+ {
+ OIC_LOG_V(ERROR, TAG, "Error, Wrong status code: %d", status_code);
+ close(fd);
+ svritem->fd = -1;
+ return CA_SOCKET_OPERATION_FAILED;
+ }
+ }
+#endif
+
return CA_STATUS_OK;
}
caglobals.tcp.NAME.port = 0; \
caglobals.tcp.NAME.fd = CACreateAcceptSocket(FAMILY, &caglobals.tcp.NAME); \
} \
- CHECKFD(caglobals.tcp.NAME.fd);
void CATCPInitializeSocket()
{
#ifndef __TIZENRT__
// create pipe for fast shutdown
CAInitializePipe(caglobals.tcp.shutdownFds);
- CHECKFD(caglobals.tcp.shutdownFds[0]);
- CHECKFD(caglobals.tcp.shutdownFds[1]);
#endif
// create pipe for connection event
CAInitializePipe(caglobals.tcp.connectionFds);
- CHECKFD(caglobals.tcp.connectionFds[0]);
- CHECKFD(caglobals.tcp.connectionFds[1]);
CAResult_t res = CA_STATUS_OK;
#ifndef __TIZENRT__
- res = ca_thread_pool_add_task(g_threadPool, CAReceiveHandler, NULL, NULL);
+ res = ca_thread_pool_add_task(g_threadPool, CAReceiveHandler, NULL, &g_taskId);
#else
res = ca_thread_pool_add_task(g_threadPool, CAReceiveHandler, NULL, NULL,
"IoT_TCPReceive", CONFIG_IOTIVITY_TCPRECEIVE_PTHREAD_STACKSIZE);
#endif
oc_mutex_unlock(g_mutexObjectList);
+#ifndef __TIZENRT__
+ ca_thread_pool_remove_task(g_threadPool, g_taskId);
+#endif
+
CATCPDisconnectAll();
sleep(1);
-
OIC_LOG(INFO, TAG, "Adapter terminated successfully");
}