* @bug No known bugs except for NYI items
*/
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <sys/poll.h>
+
#include "nnstreamer-edge-common.h"
#include "nnstreamer-edge-internal.h"
int port;
int8_t running;
pthread_t msg_thread;
- GSocket *socket;
+ int sockfd;
} nns_edge_conn_s;
/**
nns_edge_conn_s *conn;
} nns_edge_thread_data_s;
+/**
+ * @brief Set socket option.
+ * @todo handle connection type (TCP/UDP).
+ */
+static void
+_set_socket_option (int fd)
+{
+ int nodelay = 1;
+
+ /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */
+ if (setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof (int)) < 0)
+ nns_edge_logw ("Failed to set TCP delay option.");
+}
+
/**
* @brief Send data to connected socket.
*/
{
size_t sent = 0;
ssize_t rret;
- GError *err = NULL;
while (sent < size) {
- rret = g_socket_send (conn->socket, (char *) data + sent, size - sent,
- NULL, &err);
-
- if (rret == 0) {
- nns_edge_loge ("Connection closed.");
- return false;
- }
+ rret = send (conn->sockfd, (char *) data + sent, size - sent, 0);
- if (rret < 0) {
- nns_edge_loge ("Error while sending data (%s).", err->message);
- g_clear_error (&err);
+ if (rret <= 0) {
+ nns_edge_loge ("Failed to send raw data.");
return false;
}
{
size_t received = 0;
ssize_t rret;
- GError *err = NULL;
while (received < size) {
- rret = g_socket_receive (conn->socket, (char *) data + received,
- size - received, NULL, &err);
-
- if (rret == 0) {
- nns_edge_loge ("Connection closed.");
- return false;
- }
+ rret = recv (conn->sockfd, (char *) data + received, size - received, 0);
- if (rret < 0) {
- nns_edge_loge ("Failed to read from socket (%s).", err->message);
- g_clear_error (&err);
+ if (rret <= 0) {
+ nns_edge_loge ("Failed to receive raw data.");
return false;
}
static bool
_nns_edge_check_connection (nns_edge_conn_s * conn)
{
- GIOCondition condition;
+ struct pollfd poll_fd;
+ int n;
- if (!conn || !conn->socket || g_socket_is_closed (conn->socket))
+ if (!conn || conn->sockfd < 0)
return false;
- condition = g_socket_condition_check (conn->socket,
- G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP);
+ poll_fd.fd = conn->sockfd;
+ poll_fd.events = POLLIN | POLLOUT | POLLPRI | POLLERR | POLLHUP;
+ poll_fd.revents = 0;
- if (!condition || (condition & (G_IO_ERR | G_IO_HUP))) {
+ n = poll (&poll_fd, 1, 0);
+ if (n <= 0 || poll_fd.revents & (POLLERR | POLLHUP)) {
nns_edge_logw ("Socket is not available, possibly closed.");
return false;
}
conn->msg_thread = 0;
}
- if (conn->socket) {
+ if (conn->sockfd >= 0) {
nns_edge_cmd_s cmd;
/* Send error before closing the socket. */
_nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, 0);
_nns_edge_cmd_send (conn, &cmd);
- /**
- * Close and release the socket.
- * Using GSocket, if its last reference is dropped, it will close socket automatically.
- */
- g_clear_object (&conn->socket);
+ if (close (conn->sockfd) < 0)
+ nns_edge_logw ("Failed to close socket.");
+ conn->sockfd = -1;
}
SAFE_FREE (conn->host);
}
}
-/**
- * @brief Get socket address
- */
-static bool
-_nns_edge_get_saddr (const char *host, const int port, GSocketAddress ** saddr)
-{
- GError *err = NULL;
- GInetAddress *addr;
-
- /* look up name if we need to */
- addr = g_inet_address_new_from_string (host);
- if (!addr) {
- GList *results;
- GResolver *resolver;
- resolver = g_resolver_get_default ();
- results = g_resolver_lookup_by_name (resolver, host, NULL, &err);
- if (!results) {
- if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
- nns_edge_loge ("Failed to resolve host, name resolver is cancelled.");
- } else {
- nns_edge_loge ("Failed to resolve host '%s': %s", host, err->message);
- }
- g_clear_error (&err);
- g_object_unref (resolver);
- return false;
- }
- /** @todo Try with the second address if the first fails */
- addr = G_INET_ADDRESS (g_object_ref (results->data));
- g_resolver_free_addresses (results);
- g_object_unref (resolver);
- }
-
- *saddr = g_inet_socket_address_new (addr, port);
- g_object_unref (addr);
-
- return true;
-}
-
/**
* @brief Connect to requested socket.
*/
static bool
_nns_edge_connect_socket (nns_edge_conn_s * conn)
{
- GError *err = NULL;
- GSocketAddress *saddr = NULL;
- bool ret = false;
+ struct sockaddr_in saddr = { 0 };
+ socklen_t saddr_len = sizeof (struct sockaddr_in);
- if (!_nns_edge_get_saddr (conn->host, conn->port, &saddr)) {
- nns_edge_loge ("Failed to get socket address");
- return ret;
- }
+ /**
+ * @todo handle protocol
+ * 1. support edge connection type (TCP/UDP)
+ * 2. ipv4 and ipv6
+ */
+ saddr.sin_family = AF_INET;
+ saddr.sin_port = htons (conn->port);
- /* create sending client socket */
- conn->socket =
- g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
- G_SOCKET_PROTOCOL_TCP, &err);
+ if ((saddr.sin_addr.s_addr = inet_addr (conn->host)) == -1) {
+ struct hostent *ent = gethostbyname (conn->host);
+ if (!ent) {
+ nns_edge_loge ("Failed to connect socket, invalid host %s.", conn->host);
+ return false;
+ }
- if (!conn->socket) {
- nns_edge_loge ("Failed to create new socket.");
- goto done;
+ memmove (&saddr.sin_addr, ent->h_addr, ent->h_length);
}
- /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */
- if (!g_socket_set_option (conn->socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) {
- nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message);
- goto done;
+ conn->sockfd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (conn->sockfd < 0) {
+ nns_edge_loge ("Failed to create new socket.");
+ return false;
}
- if (!g_socket_connect (conn->socket, saddr, NULL, &err)) {
- if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
- nns_edge_logd ("Connection cancelled (%s:%d).", conn->host, conn->port);
- } else {
- nns_edge_loge ("Failed to connect host %s:%d.", conn->host, conn->port);
- }
- goto done;
- }
+ _set_socket_option (conn->sockfd);
- /* now connected to the requested socket */
- ret = true;
+ if (connect (conn->sockfd, (struct sockaddr *) &saddr, saddr_len) < 0) {
+ nns_edge_loge ("Failed to connect host %s:%d.", conn->host, conn->port);
+ return false;
+ }
-done:
- g_object_unref (saddr);
- g_clear_error (&err);
- return ret;
+ return true;
}
/**
conn->host = nns_edge_strdup (host);
conn->port = port;
+ conn->sockfd = -1;
if (!_nns_edge_connect_socket (conn)) {
goto error;
}
/**
- * @brief Callback for socket listener, accept socket and create message thread.
+ * @brief Accept socket and create message thread in socket listener thread.
*/
static void
-_nns_edge_accept_socket_async_cb (GObject * source, GAsyncResult * result,
- gpointer user_data)
+_nns_edge_accept_socket (nns_edge_handle_s * eh)
{
- GSocketListener *socket_listener = G_SOCKET_LISTENER (source);
- GSocket *socket = NULL;
- GError *err = NULL;
- nns_edge_handle_s *eh = (nns_edge_handle_s *) user_data;
bool done = false;
nns_edge_conn_s *conn;
nns_edge_conn_data_s *conn_data;
char *dest_host = NULL;
int dest_port, ret;
- socket =
- g_socket_listener_accept_socket_finish (socket_listener, result, NULL,
- &err);
-
- if (!socket) {
- nns_edge_loge ("Failed to get socket: %s", err->message);
- g_clear_error (&err);
- return;
- }
- g_socket_set_timeout (socket, DEFAULT_TIMEOUT_SEC);
-
conn = (nns_edge_conn_s *) calloc (1, sizeof (nns_edge_conn_s));
if (!conn) {
nns_edge_loge ("Failed to allocate edge connection.");
goto error;
}
- conn->socket = socket;
-
- /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */
- if (!g_socket_set_option (socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) {
- nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message);
- g_clear_error (&err);
+ conn->sockfd = accept (eh->listener_fd, NULL, NULL);
+ if (conn->sockfd < 0) {
+ nns_edge_loge ("Failed to accept socket.");
goto error;
}
+ _set_socket_option (conn->sockfd);
+
if (eh->flags & NNS_EDGE_FLAG_SERVER)
client_id = g_get_monotonic_time ();
else
if (!done)
_nns_edge_close_connection (conn);
- if (eh->listener)
- g_socket_listener_accept_socket_async (eh->listener, NULL,
- (GAsyncReadyCallback) _nns_edge_accept_socket_async_cb, eh);
-
SAFE_FREE (dest_host);
}
+/**
+ * @brief Socket listener thread.
+ */
+static void *
+_nns_edge_socket_listener_thread (void *thread_data)
+{
+ nns_edge_handle_s *eh = (nns_edge_handle_s *) thread_data;
+
+ eh->listening = true;
+ while (eh->listening) {
+ struct pollfd poll_fd;
+
+ poll_fd.fd = eh->listener_fd;
+ poll_fd.events = POLLIN | POLLHUP | POLLERR;
+ poll_fd.revents = 0;
+
+ /* 10 milliseconds */
+ if (poll (&poll_fd, 1, 10) > 0) {
+ if (!eh->listening)
+ break;
+
+ if (poll_fd.revents & (POLLERR | POLLHUP)) {
+ nns_edge_loge ("Invalid state, possibly socket is closed in listener.");
+ break;
+ }
+
+ if (poll_fd.revents & POLLIN)
+ _nns_edge_accept_socket (eh);
+ }
+ }
+
+ return NULL;
+}
+
+/**
+ * @brief Create socket listener.
+ * @note This function should be called with handle lock.
+ */
+static bool
+_nns_edge_create_socket_listener (nns_edge_handle_s * eh)
+{
+ bool done = false;
+ struct sockaddr_in saddr = { 0 };
+ socklen_t saddr_len = sizeof (struct sockaddr_in);
+ pthread_attr_t attr;
+ int tid;
+
+ eh->listener_fd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (eh->listener_fd < 0) {
+ nns_edge_loge ("Failed to create listener socket.");
+ return false;
+ }
+
+ saddr.sin_family = AF_INET;
+ saddr.sin_addr.s_addr = htonl (INADDR_ANY);
+ saddr.sin_port = htons (eh->port);
+
+ if (bind (eh->listener_fd, (struct sockaddr *) &saddr, saddr_len) < 0 ||
+ listen (eh->listener_fd, N_BACKLOG) < 0) {
+ nns_edge_loge ("Failed to create listener, cannot bind socket.");
+ goto error;
+ }
+
+ pthread_attr_init (&attr);
+ pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_JOINABLE);
+ tid = pthread_create (&eh->listener_thread, &attr,
+ _nns_edge_socket_listener_thread, eh);
+ pthread_attr_destroy (&attr);
+
+ if (tid < 0) {
+ nns_edge_loge ("Failed to create listener thread.");
+ eh->listening = false;
+ eh->listener_thread = 0;
+ goto error;
+ }
+
+ done = true;
+
+error:
+ if (!done) {
+ close (eh->listener_fd);
+ eh->listener_fd = -1;
+ }
+
+ return done;
+}
+
/**
* @brief Create edge handle.
*/
eh->flags = flags;
eh->broker_h = NULL;
nns_edge_metadata_init (&eh->meta);
+ eh->listening = false;
+ eh->listener_fd = -1;
/* Connection data for each client ID. */
eh->conn_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
int
nns_edge_start (nns_edge_h edge_h)
{
- GSocketAddress *saddr = NULL;
- GError *err = NULL;
nns_edge_handle_s *eh;
int ret = NNS_EDGE_ERROR_NONE;
}
}
- /** Initialize server src data. */
- eh->listener = g_socket_listener_new ();
- g_socket_listener_set_backlog (eh->listener, N_BACKLOG);
-
- if (!_nns_edge_get_saddr (eh->host, eh->port, &saddr)) {
- nns_edge_loge ("Failed to get socket address (%s:%d).", eh->host, eh->port);
- ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
- goto error;
- }
- if (!g_socket_listener_add_address (eh->listener, saddr,
- G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_TCP, NULL, NULL, &err)) {
- nns_edge_loge ("Failed to add address (%s:%d): %s", eh->host, eh->port,
- err->message);
- g_clear_error (&err);
- ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+ /* Start listener thread to accept socket. */
+ if (!_nns_edge_create_socket_listener (eh)) {
+ nns_edge_loge ("Failed to create socket listener.");
+ ret = NNS_EDGE_ERROR_IO;
goto error;
}
- g_socket_listener_accept_socket_async (eh->listener, NULL,
- (GAsyncReadyCallback) _nns_edge_accept_socket_async_cb, eh);
-
error:
- if (saddr)
- g_object_unref (saddr);
-
nns_edge_unlock (eh);
return ret;
}
eh->event_cb = NULL;
eh->user_data = NULL;
- if (eh->listener)
- g_clear_object (&eh->listener);
+ if (eh->listener_thread) {
+ eh->listening = false;
+ pthread_cancel (eh->listener_thread);
+ pthread_join (eh->listener_thread, NULL);
+ eh->listener_thread = 0;
+ }
+
+ if (eh->listener_fd >= 0) {
+ close (eh->listener_fd);
+ eh->listener_fd = -1;
+ }
g_hash_table_destroy (eh->conn_table);
eh->conn_table = NULL;