From 3d1921f3d3580619837db39ef401f9160ca4cd73 Mon Sep 17 00:00:00 2001 From: Jaeyun Date: Thu, 18 Aug 2022 15:55:18 +0900 Subject: [PATCH] [Socket] remove gio lib Update code - socket connection, remove gio library. Signed-off-by: Jaeyun --- CMakeLists.txt | 2 +- .../nnstreamer-edge-internal.c | 332 +++++++++--------- .../nnstreamer-edge-internal.h | 7 +- 3 files changed, 181 insertions(+), 160 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index a60019d..5d259ae 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -63,7 +63,7 @@ ENDIF() # Check requires packages # TODO FIXME remove glib dependency -SET(REQUIRES_LIST "glib-2.0 gio-2.0") +SET(REQUIRES_LIST "glib-2.0") PKG_CHECK_MODULES(EDGE_REQUIRE_PKGS REQUIRED ${REQUIRES_LIST}) diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.c b/src/libnnstreamer-edge/nnstreamer-edge-internal.c index e5e721b..98c53ae 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.c @@ -10,6 +10,10 @@ * @bug No known bugs except for NYI items */ +#include +#include +#include + #include "nnstreamer-edge-common.h" #include "nnstreamer-edge-internal.h" @@ -62,7 +66,7 @@ typedef struct int port; int8_t running; pthread_t msg_thread; - GSocket *socket; + int sockfd; } nns_edge_conn_s; /** @@ -85,6 +89,20 @@ typedef struct nns_edge_conn_s *conn; } nns_edge_thread_data_s; +/** + * @brief Set socket option. + * @todo handle connection type (TCP/UDP). + */ +static void +_set_socket_option (int fd) +{ + int nodelay = 1; + + /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */ + if (setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof (int)) < 0) + nns_edge_logw ("Failed to set TCP delay option."); +} + /** * @brief Send data to connected socket. */ @@ -93,20 +111,12 @@ _send_raw_data (nns_edge_conn_s * conn, void *data, size_t size) { size_t sent = 0; ssize_t rret; - GError *err = NULL; while (sent < size) { - rret = g_socket_send (conn->socket, (char *) data + sent, size - sent, - NULL, &err); - - if (rret == 0) { - nns_edge_loge ("Connection closed."); - return false; - } + rret = send (conn->sockfd, (char *) data + sent, size - sent, 0); - if (rret < 0) { - nns_edge_loge ("Error while sending data (%s).", err->message); - g_clear_error (&err); + if (rret <= 0) { + nns_edge_loge ("Failed to send raw data."); return false; } @@ -124,20 +134,12 @@ _receive_raw_data (nns_edge_conn_s * conn, void *data, size_t size) { size_t received = 0; ssize_t rret; - GError *err = NULL; while (received < size) { - rret = g_socket_receive (conn->socket, (char *) data + received, - size - received, NULL, &err); - - if (rret == 0) { - nns_edge_loge ("Connection closed."); - return false; - } + rret = recv (conn->sockfd, (char *) data + received, size - received, 0); - if (rret < 0) { - nns_edge_loge ("Failed to read from socket (%s).", err->message); - g_clear_error (&err); + if (rret <= 0) { + nns_edge_loge ("Failed to receive raw data."); return false; } @@ -153,15 +155,18 @@ _receive_raw_data (nns_edge_conn_s * conn, void *data, size_t size) static bool _nns_edge_check_connection (nns_edge_conn_s * conn) { - GIOCondition condition; + struct pollfd poll_fd; + int n; - if (!conn || !conn->socket || g_socket_is_closed (conn->socket)) + if (!conn || conn->sockfd < 0) return false; - condition = g_socket_condition_check (conn->socket, - G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP); + poll_fd.fd = conn->sockfd; + poll_fd.events = POLLIN | POLLOUT | POLLPRI | POLLERR | POLLHUP; + poll_fd.revents = 0; - if (!condition || (condition & (G_IO_ERR | G_IO_HUP))) { + n = poll (&poll_fd, 1, 0); + if (n <= 0 || poll_fd.revents & (POLLERR | POLLHUP)) { nns_edge_logw ("Socket is not available, possibly closed."); return false; } @@ -436,7 +441,7 @@ _nns_edge_close_connection (nns_edge_conn_s * conn) conn->msg_thread = 0; } - if (conn->socket) { + if (conn->sockfd >= 0) { nns_edge_cmd_s cmd; /* Send error before closing the socket. */ @@ -444,11 +449,9 @@ _nns_edge_close_connection (nns_edge_conn_s * conn) _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, 0); _nns_edge_cmd_send (conn, &cmd); - /** - * Close and release the socket. - * Using GSocket, if its last reference is dropped, it will close socket automatically. - */ - g_clear_object (&conn->socket); + if (close (conn->sockfd) < 0) + nns_edge_logw ("Failed to close socket."); + conn->sockfd = -1; } SAFE_FREE (conn->host); @@ -519,91 +522,47 @@ _nns_edge_remove_connection (gpointer data) } } -/** - * @brief Get socket address - */ -static bool -_nns_edge_get_saddr (const char *host, const int port, GSocketAddress ** saddr) -{ - GError *err = NULL; - GInetAddress *addr; - - /* look up name if we need to */ - addr = g_inet_address_new_from_string (host); - if (!addr) { - GList *results; - GResolver *resolver; - resolver = g_resolver_get_default (); - results = g_resolver_lookup_by_name (resolver, host, NULL, &err); - if (!results) { - if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { - nns_edge_loge ("Failed to resolve host, name resolver is cancelled."); - } else { - nns_edge_loge ("Failed to resolve host '%s': %s", host, err->message); - } - g_clear_error (&err); - g_object_unref (resolver); - return false; - } - /** @todo Try with the second address if the first fails */ - addr = G_INET_ADDRESS (g_object_ref (results->data)); - g_resolver_free_addresses (results); - g_object_unref (resolver); - } - - *saddr = g_inet_socket_address_new (addr, port); - g_object_unref (addr); - - return true; -} - /** * @brief Connect to requested socket. */ static bool _nns_edge_connect_socket (nns_edge_conn_s * conn) { - GError *err = NULL; - GSocketAddress *saddr = NULL; - bool ret = false; + struct sockaddr_in saddr = { 0 }; + socklen_t saddr_len = sizeof (struct sockaddr_in); - if (!_nns_edge_get_saddr (conn->host, conn->port, &saddr)) { - nns_edge_loge ("Failed to get socket address"); - return ret; - } + /** + * @todo handle protocol + * 1. support edge connection type (TCP/UDP) + * 2. ipv4 and ipv6 + */ + saddr.sin_family = AF_INET; + saddr.sin_port = htons (conn->port); - /* create sending client socket */ - conn->socket = - g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM, - G_SOCKET_PROTOCOL_TCP, &err); + if ((saddr.sin_addr.s_addr = inet_addr (conn->host)) == -1) { + struct hostent *ent = gethostbyname (conn->host); + if (!ent) { + nns_edge_loge ("Failed to connect socket, invalid host %s.", conn->host); + return false; + } - if (!conn->socket) { - nns_edge_loge ("Failed to create new socket."); - goto done; + memmove (&saddr.sin_addr, ent->h_addr, ent->h_length); } - /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */ - if (!g_socket_set_option (conn->socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) { - nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message); - goto done; + conn->sockfd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (conn->sockfd < 0) { + nns_edge_loge ("Failed to create new socket."); + return false; } - if (!g_socket_connect (conn->socket, saddr, NULL, &err)) { - if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { - nns_edge_logd ("Connection cancelled (%s:%d).", conn->host, conn->port); - } else { - nns_edge_loge ("Failed to connect host %s:%d.", conn->host, conn->port); - } - goto done; - } + _set_socket_option (conn->sockfd); - /* now connected to the requested socket */ - ret = true; + if (connect (conn->sockfd, (struct sockaddr *) &saddr, saddr_len) < 0) { + nns_edge_loge ("Failed to connect host %s:%d.", conn->host, conn->port); + return false; + } -done: - g_object_unref (saddr); - g_clear_error (&err); - return ret; + return true; } /** @@ -628,6 +587,7 @@ _nns_edge_connect_to (nns_edge_handle_s * eh, int64_t client_id, conn->host = nns_edge_strdup (host); conn->port = port; + conn->sockfd = -1; if (!_nns_edge_connect_socket (conn)) { goto error; @@ -835,16 +795,11 @@ _nns_edge_create_message_thread (nns_edge_handle_s * eh, nns_edge_conn_s * conn, } /** - * @brief Callback for socket listener, accept socket and create message thread. + * @brief Accept socket and create message thread in socket listener thread. */ static void -_nns_edge_accept_socket_async_cb (GObject * source, GAsyncResult * result, - gpointer user_data) +_nns_edge_accept_socket (nns_edge_handle_s * eh) { - GSocketListener *socket_listener = G_SOCKET_LISTENER (source); - GSocket *socket = NULL; - GError *err = NULL; - nns_edge_handle_s *eh = (nns_edge_handle_s *) user_data; bool done = false; nns_edge_conn_s *conn; nns_edge_conn_data_s *conn_data; @@ -853,32 +808,20 @@ _nns_edge_accept_socket_async_cb (GObject * source, GAsyncResult * result, char *dest_host = NULL; int dest_port, ret; - socket = - g_socket_listener_accept_socket_finish (socket_listener, result, NULL, - &err); - - if (!socket) { - nns_edge_loge ("Failed to get socket: %s", err->message); - g_clear_error (&err); - return; - } - g_socket_set_timeout (socket, DEFAULT_TIMEOUT_SEC); - conn = (nns_edge_conn_s *) calloc (1, sizeof (nns_edge_conn_s)); if (!conn) { nns_edge_loge ("Failed to allocate edge connection."); goto error; } - conn->socket = socket; - - /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */ - if (!g_socket_set_option (socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) { - nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message); - g_clear_error (&err); + conn->sockfd = accept (eh->listener_fd, NULL, NULL); + if (conn->sockfd < 0) { + nns_edge_loge ("Failed to accept socket."); goto error; } + _set_socket_option (conn->sockfd); + if (eh->flags & NNS_EDGE_FLAG_SERVER) client_id = g_get_monotonic_time (); else @@ -944,13 +887,96 @@ error: if (!done) _nns_edge_close_connection (conn); - if (eh->listener) - g_socket_listener_accept_socket_async (eh->listener, NULL, - (GAsyncReadyCallback) _nns_edge_accept_socket_async_cb, eh); - SAFE_FREE (dest_host); } +/** + * @brief Socket listener thread. + */ +static void * +_nns_edge_socket_listener_thread (void *thread_data) +{ + nns_edge_handle_s *eh = (nns_edge_handle_s *) thread_data; + + eh->listening = true; + while (eh->listening) { + struct pollfd poll_fd; + + poll_fd.fd = eh->listener_fd; + poll_fd.events = POLLIN | POLLHUP | POLLERR; + poll_fd.revents = 0; + + /* 10 milliseconds */ + if (poll (&poll_fd, 1, 10) > 0) { + if (!eh->listening) + break; + + if (poll_fd.revents & (POLLERR | POLLHUP)) { + nns_edge_loge ("Invalid state, possibly socket is closed in listener."); + break; + } + + if (poll_fd.revents & POLLIN) + _nns_edge_accept_socket (eh); + } + } + + return NULL; +} + +/** + * @brief Create socket listener. + * @note This function should be called with handle lock. + */ +static bool +_nns_edge_create_socket_listener (nns_edge_handle_s * eh) +{ + bool done = false; + struct sockaddr_in saddr = { 0 }; + socklen_t saddr_len = sizeof (struct sockaddr_in); + pthread_attr_t attr; + int tid; + + eh->listener_fd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (eh->listener_fd < 0) { + nns_edge_loge ("Failed to create listener socket."); + return false; + } + + saddr.sin_family = AF_INET; + saddr.sin_addr.s_addr = htonl (INADDR_ANY); + saddr.sin_port = htons (eh->port); + + if (bind (eh->listener_fd, (struct sockaddr *) &saddr, saddr_len) < 0 || + listen (eh->listener_fd, N_BACKLOG) < 0) { + nns_edge_loge ("Failed to create listener, cannot bind socket."); + goto error; + } + + pthread_attr_init (&attr); + pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_JOINABLE); + tid = pthread_create (&eh->listener_thread, &attr, + _nns_edge_socket_listener_thread, eh); + pthread_attr_destroy (&attr); + + if (tid < 0) { + nns_edge_loge ("Failed to create listener thread."); + eh->listening = false; + eh->listener_thread = 0; + goto error; + } + + done = true; + +error: + if (!done) { + close (eh->listener_fd); + eh->listener_fd = -1; + } + + return done; +} + /** * @brief Create edge handle. */ @@ -1001,6 +1027,8 @@ nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type, eh->flags = flags; eh->broker_h = NULL; nns_edge_metadata_init (&eh->meta); + eh->listening = false; + eh->listener_fd = -1; /* Connection data for each client ID. */ eh->conn_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, @@ -1016,8 +1044,6 @@ nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type, int nns_edge_start (nns_edge_h edge_h) { - GSocketAddress *saddr = NULL; - GError *err = NULL; nns_edge_handle_s *eh; int ret = NNS_EDGE_ERROR_NONE; @@ -1076,31 +1102,14 @@ nns_edge_start (nns_edge_h edge_h) } } - /** Initialize server src data. */ - eh->listener = g_socket_listener_new (); - g_socket_listener_set_backlog (eh->listener, N_BACKLOG); - - if (!_nns_edge_get_saddr (eh->host, eh->port, &saddr)) { - nns_edge_loge ("Failed to get socket address (%s:%d).", eh->host, eh->port); - ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; - goto error; - } - if (!g_socket_listener_add_address (eh->listener, saddr, - G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_TCP, NULL, NULL, &err)) { - nns_edge_loge ("Failed to add address (%s:%d): %s", eh->host, eh->port, - err->message); - g_clear_error (&err); - ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; + /* Start listener thread to accept socket. */ + if (!_nns_edge_create_socket_listener (eh)) { + nns_edge_loge ("Failed to create socket listener."); + ret = NNS_EDGE_ERROR_IO; goto error; } - g_socket_listener_accept_socket_async (eh->listener, NULL, - (GAsyncReadyCallback) _nns_edge_accept_socket_async_cb, eh); - error: - if (saddr) - g_object_unref (saddr); - nns_edge_unlock (eh); return ret; } @@ -1137,8 +1146,17 @@ nns_edge_release_handle (nns_edge_h edge_h) eh->event_cb = NULL; eh->user_data = NULL; - if (eh->listener) - g_clear_object (&eh->listener); + if (eh->listener_thread) { + eh->listening = false; + pthread_cancel (eh->listener_thread); + pthread_join (eh->listener_thread, NULL); + eh->listener_thread = 0; + } + + if (eh->listener_fd >= 0) { + close (eh->listener_fd); + eh->listener_fd = -1; + } g_hash_table_destroy (eh->conn_table); eh->conn_table = NULL; diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.h b/src/libnnstreamer-edge/nnstreamer-edge-internal.h index 2346177..698b58c 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.h +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.h @@ -20,7 +20,6 @@ extern "C" { #include "nnstreamer-edge.h" #include "nnstreamer-edge-common.h" -#include /** @todo remove glib */ /** * @brief Data structure for edge handle. @@ -46,7 +45,11 @@ typedef struct { char *caps_str; GHashTable *conn_table; - GSocketListener *listener; + + /* socket listener */ + bool listening; + int listener_fd; + pthread_t listener_thread; /* MQTT */ nns_edge_broker_h broker_h; -- 2.34.1