[Socket] remove gio lib accepted/tizen/unified/20220819.122504 submit/tizen/20220818.081536 submit/tizen/20220819.071123
authorJaeyun <jy1210.jung@samsung.com>
Thu, 18 Aug 2022 06:55:18 +0000 (15:55 +0900)
committerMyungJoo Ham <myungjoo.ham@samsung.com>
Fri, 19 Aug 2022 05:25:34 +0000 (14:25 +0900)
Update code - socket connection, remove gio library.

Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
CMakeLists.txt
src/libnnstreamer-edge/nnstreamer-edge-internal.c
src/libnnstreamer-edge/nnstreamer-edge-internal.h

index a60019d5a16caba9d7aaa3a5b17e095de44d4b20..5d259aed7681b14c1f70de2efe63c4fb09c9abd9 100644 (file)
@@ -63,7 +63,7 @@ ENDIF()
 
 # Check requires packages
 # TODO FIXME remove glib dependency
-SET(REQUIRES_LIST "glib-2.0 gio-2.0")
+SET(REQUIRES_LIST "glib-2.0")
 
 PKG_CHECK_MODULES(EDGE_REQUIRE_PKGS REQUIRED ${REQUIRES_LIST})
 
index e5e721b5f7bd05919352bfe7b909a24f3176bb56..98c53aea2ea9543e762f86f2e63596993a06b301 100644 (file)
  * @bug    No known bugs except for NYI items
  */
 
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <sys/poll.h>
+
 #include "nnstreamer-edge-common.h"
 #include "nnstreamer-edge-internal.h"
 
@@ -62,7 +66,7 @@ typedef struct
   int port;
   int8_t running;
   pthread_t msg_thread;
-  GSocket *socket;
+  int sockfd;
 } nns_edge_conn_s;
 
 /**
@@ -85,6 +89,20 @@ typedef struct
   nns_edge_conn_s *conn;
 } nns_edge_thread_data_s;
 
+/**
+ * @brief Set socket option.
+ * @todo handle connection type (TCP/UDP).
+ */
+static void
+_set_socket_option (int fd)
+{
+  int nodelay = 1;
+
+  /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */
+  if (setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, &nodelay, sizeof (int)) < 0)
+    nns_edge_logw ("Failed to set TCP delay option.");
+}
+
 /**
  * @brief Send data to connected socket.
  */
@@ -93,20 +111,12 @@ _send_raw_data (nns_edge_conn_s * conn, void *data, size_t size)
 {
   size_t sent = 0;
   ssize_t rret;
-  GError *err = NULL;
 
   while (sent < size) {
-    rret = g_socket_send (conn->socket, (char *) data + sent, size - sent,
-        NULL, &err);
-
-    if (rret == 0) {
-      nns_edge_loge ("Connection closed.");
-      return false;
-    }
+    rret = send (conn->sockfd, (char *) data + sent, size - sent, 0);
 
-    if (rret < 0) {
-      nns_edge_loge ("Error while sending data (%s).", err->message);
-      g_clear_error (&err);
+    if (rret <= 0) {
+      nns_edge_loge ("Failed to send raw data.");
       return false;
     }
 
@@ -124,20 +134,12 @@ _receive_raw_data (nns_edge_conn_s * conn, void *data, size_t size)
 {
   size_t received = 0;
   ssize_t rret;
-  GError *err = NULL;
 
   while (received < size) {
-    rret = g_socket_receive (conn->socket, (char *) data + received,
-        size - received, NULL, &err);
-
-    if (rret == 0) {
-      nns_edge_loge ("Connection closed.");
-      return false;
-    }
+    rret = recv (conn->sockfd, (char *) data + received, size - received, 0);
 
-    if (rret < 0) {
-      nns_edge_loge ("Failed to read from socket (%s).", err->message);
-      g_clear_error (&err);
+    if (rret <= 0) {
+      nns_edge_loge ("Failed to receive raw data.");
       return false;
     }
 
@@ -153,15 +155,18 @@ _receive_raw_data (nns_edge_conn_s * conn, void *data, size_t size)
 static bool
 _nns_edge_check_connection (nns_edge_conn_s * conn)
 {
-  GIOCondition condition;
+  struct pollfd poll_fd;
+  int n;
 
-  if (!conn || !conn->socket || g_socket_is_closed (conn->socket))
+  if (!conn || conn->sockfd < 0)
     return false;
 
-  condition = g_socket_condition_check (conn->socket,
-      G_IO_IN | G_IO_OUT | G_IO_PRI | G_IO_ERR | G_IO_HUP);
+  poll_fd.fd = conn->sockfd;
+  poll_fd.events = POLLIN | POLLOUT | POLLPRI | POLLERR | POLLHUP;
+  poll_fd.revents = 0;
 
-  if (!condition || (condition & (G_IO_ERR | G_IO_HUP))) {
+  n = poll (&poll_fd, 1, 0);
+  if (n <= 0 || poll_fd.revents & (POLLERR | POLLHUP)) {
     nns_edge_logw ("Socket is not available, possibly closed.");
     return false;
   }
@@ -436,7 +441,7 @@ _nns_edge_close_connection (nns_edge_conn_s * conn)
     conn->msg_thread = 0;
   }
 
-  if (conn->socket) {
+  if (conn->sockfd >= 0) {
     nns_edge_cmd_s cmd;
 
     /* Send error before closing the socket. */
@@ -444,11 +449,9 @@ _nns_edge_close_connection (nns_edge_conn_s * conn)
     _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, 0);
     _nns_edge_cmd_send (conn, &cmd);
 
-    /**
-     * Close and release the socket.
-     * Using GSocket, if its last reference is dropped, it will close socket automatically.
-     */
-    g_clear_object (&conn->socket);
+    if (close (conn->sockfd) < 0)
+      nns_edge_logw ("Failed to close socket.");
+    conn->sockfd = -1;
   }
 
   SAFE_FREE (conn->host);
@@ -519,91 +522,47 @@ _nns_edge_remove_connection (gpointer data)
   }
 }
 
-/**
- * @brief Get socket address
- */
-static bool
-_nns_edge_get_saddr (const char *host, const int port, GSocketAddress ** saddr)
-{
-  GError *err = NULL;
-  GInetAddress *addr;
-
-  /* look up name if we need to */
-  addr = g_inet_address_new_from_string (host);
-  if (!addr) {
-    GList *results;
-    GResolver *resolver;
-    resolver = g_resolver_get_default ();
-    results = g_resolver_lookup_by_name (resolver, host, NULL, &err);
-    if (!results) {
-      if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
-        nns_edge_loge ("Failed to resolve host, name resolver is cancelled.");
-      } else {
-        nns_edge_loge ("Failed to resolve host '%s': %s", host, err->message);
-      }
-      g_clear_error (&err);
-      g_object_unref (resolver);
-      return false;
-    }
-    /** @todo Try with the second address if the first fails */
-    addr = G_INET_ADDRESS (g_object_ref (results->data));
-    g_resolver_free_addresses (results);
-    g_object_unref (resolver);
-  }
-
-  *saddr = g_inet_socket_address_new (addr, port);
-  g_object_unref (addr);
-
-  return true;
-}
-
 /**
  * @brief Connect to requested socket.
  */
 static bool
 _nns_edge_connect_socket (nns_edge_conn_s * conn)
 {
-  GError *err = NULL;
-  GSocketAddress *saddr = NULL;
-  bool ret = false;
+  struct sockaddr_in saddr = { 0 };
+  socklen_t saddr_len = sizeof (struct sockaddr_in);
 
-  if (!_nns_edge_get_saddr (conn->host, conn->port, &saddr)) {
-    nns_edge_loge ("Failed to get socket address");
-    return ret;
-  }
+  /**
+   * @todo handle protocol
+   * 1. support edge connection type (TCP/UDP)
+   * 2. ipv4 and ipv6
+   */
+  saddr.sin_family = AF_INET;
+  saddr.sin_port = htons (conn->port);
 
-  /* create sending client socket */
-  conn->socket =
-      g_socket_new (g_socket_address_get_family (saddr), G_SOCKET_TYPE_STREAM,
-      G_SOCKET_PROTOCOL_TCP, &err);
+  if ((saddr.sin_addr.s_addr = inet_addr (conn->host)) == -1) {
+    struct hostent *ent = gethostbyname (conn->host);
+    if (!ent) {
+      nns_edge_loge ("Failed to connect socket, invalid host %s.", conn->host);
+      return false;
+    }
 
-  if (!conn->socket) {
-    nns_edge_loge ("Failed to create new socket.");
-    goto done;
+    memmove (&saddr.sin_addr, ent->h_addr, ent->h_length);
   }
 
-  /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */
-  if (!g_socket_set_option (conn->socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) {
-    nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message);
-    goto done;
+  conn->sockfd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+  if (conn->sockfd < 0) {
+    nns_edge_loge ("Failed to create new socket.");
+    return false;
   }
 
-  if (!g_socket_connect (conn->socket, saddr, NULL, &err)) {
-    if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
-      nns_edge_logd ("Connection cancelled (%s:%d).", conn->host, conn->port);
-    } else {
-      nns_edge_loge ("Failed to connect host %s:%d.", conn->host, conn->port);
-    }
-    goto done;
-  }
+  _set_socket_option (conn->sockfd);
 
-  /* now connected to the requested socket */
-  ret = true;
+  if (connect (conn->sockfd, (struct sockaddr *) &saddr, saddr_len) < 0) {
+    nns_edge_loge ("Failed to connect host %s:%d.", conn->host, conn->port);
+    return false;
+  }
 
-done:
-  g_object_unref (saddr);
-  g_clear_error (&err);
-  return ret;
+  return true;
 }
 
 /**
@@ -628,6 +587,7 @@ _nns_edge_connect_to (nns_edge_handle_s * eh, int64_t client_id,
 
   conn->host = nns_edge_strdup (host);
   conn->port = port;
+  conn->sockfd = -1;
 
   if (!_nns_edge_connect_socket (conn)) {
     goto error;
@@ -835,16 +795,11 @@ _nns_edge_create_message_thread (nns_edge_handle_s * eh, nns_edge_conn_s * conn,
 }
 
 /**
- * @brief Callback for socket listener, accept socket and create message thread.
+ * @brief Accept socket and create message thread in socket listener thread.
  */
 static void
-_nns_edge_accept_socket_async_cb (GObject * source, GAsyncResult * result,
-    gpointer user_data)
+_nns_edge_accept_socket (nns_edge_handle_s * eh)
 {
-  GSocketListener *socket_listener = G_SOCKET_LISTENER (source);
-  GSocket *socket = NULL;
-  GError *err = NULL;
-  nns_edge_handle_s *eh = (nns_edge_handle_s *) user_data;
   bool done = false;
   nns_edge_conn_s *conn;
   nns_edge_conn_data_s *conn_data;
@@ -853,32 +808,20 @@ _nns_edge_accept_socket_async_cb (GObject * source, GAsyncResult * result,
   char *dest_host = NULL;
   int dest_port, ret;
 
-  socket =
-      g_socket_listener_accept_socket_finish (socket_listener, result, NULL,
-      &err);
-
-  if (!socket) {
-    nns_edge_loge ("Failed to get socket: %s", err->message);
-    g_clear_error (&err);
-    return;
-  }
-  g_socket_set_timeout (socket, DEFAULT_TIMEOUT_SEC);
-
   conn = (nns_edge_conn_s *) calloc (1, sizeof (nns_edge_conn_s));
   if (!conn) {
     nns_edge_loge ("Failed to allocate edge connection.");
     goto error;
   }
 
-  conn->socket = socket;
-
-  /* setting TCP_NODELAY to true in order to avoid packet batching as known as Nagle's algorithm */
-  if (!g_socket_set_option (socket, IPPROTO_TCP, TCP_NODELAY, true, &err)) {
-    nns_edge_loge ("Failed to set socket TCP_NODELAY option: %s", err->message);
-    g_clear_error (&err);
+  conn->sockfd = accept (eh->listener_fd, NULL, NULL);
+  if (conn->sockfd < 0) {
+    nns_edge_loge ("Failed to accept socket.");
     goto error;
   }
 
+  _set_socket_option (conn->sockfd);
+
   if (eh->flags & NNS_EDGE_FLAG_SERVER)
     client_id = g_get_monotonic_time ();
   else
@@ -944,13 +887,96 @@ error:
   if (!done)
     _nns_edge_close_connection (conn);
 
-  if (eh->listener)
-    g_socket_listener_accept_socket_async (eh->listener, NULL,
-        (GAsyncReadyCallback) _nns_edge_accept_socket_async_cb, eh);
-
   SAFE_FREE (dest_host);
 }
 
+/**
+ * @brief Socket listener thread.
+ */
+static void *
+_nns_edge_socket_listener_thread (void *thread_data)
+{
+  nns_edge_handle_s *eh = (nns_edge_handle_s *) thread_data;
+
+  eh->listening = true;
+  while (eh->listening) {
+    struct pollfd poll_fd;
+
+    poll_fd.fd = eh->listener_fd;
+    poll_fd.events = POLLIN | POLLHUP | POLLERR;
+    poll_fd.revents = 0;
+
+    /* 10 milliseconds */
+    if (poll (&poll_fd, 1, 10) > 0) {
+      if (!eh->listening)
+        break;
+
+      if (poll_fd.revents & (POLLERR | POLLHUP)) {
+        nns_edge_loge ("Invalid state, possibly socket is closed in listener.");
+        break;
+      }
+
+      if (poll_fd.revents & POLLIN)
+        _nns_edge_accept_socket (eh);
+    }
+  }
+
+  return NULL;
+}
+
+/**
+ * @brief Create socket listener.
+ * @note This function should be called with handle lock.
+ */
+static bool
+_nns_edge_create_socket_listener (nns_edge_handle_s * eh)
+{
+  bool done = false;
+  struct sockaddr_in saddr = { 0 };
+  socklen_t saddr_len = sizeof (struct sockaddr_in);
+  pthread_attr_t attr;
+  int tid;
+
+  eh->listener_fd = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+  if (eh->listener_fd < 0) {
+    nns_edge_loge ("Failed to create listener socket.");
+    return false;
+  }
+
+  saddr.sin_family = AF_INET;
+  saddr.sin_addr.s_addr = htonl (INADDR_ANY);
+  saddr.sin_port = htons (eh->port);
+
+  if (bind (eh->listener_fd, (struct sockaddr *) &saddr, saddr_len) < 0 ||
+      listen (eh->listener_fd, N_BACKLOG) < 0) {
+    nns_edge_loge ("Failed to create listener, cannot bind socket.");
+    goto error;
+  }
+
+  pthread_attr_init (&attr);
+  pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_JOINABLE);
+  tid = pthread_create (&eh->listener_thread, &attr,
+      _nns_edge_socket_listener_thread, eh);
+  pthread_attr_destroy (&attr);
+
+  if (tid < 0) {
+    nns_edge_loge ("Failed to create listener thread.");
+    eh->listening = false;
+    eh->listener_thread = 0;
+    goto error;
+  }
+
+  done = true;
+
+error:
+  if (!done) {
+    close (eh->listener_fd);
+    eh->listener_fd = -1;
+  }
+
+  return done;
+}
+
 /**
  * @brief Create edge handle.
  */
@@ -1001,6 +1027,8 @@ nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type,
   eh->flags = flags;
   eh->broker_h = NULL;
   nns_edge_metadata_init (&eh->meta);
+  eh->listening = false;
+  eh->listener_fd = -1;
 
   /* Connection data for each client ID. */
   eh->conn_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
@@ -1016,8 +1044,6 @@ nns_edge_create_handle (const char *id, nns_edge_connect_type_e connect_type,
 int
 nns_edge_start (nns_edge_h edge_h)
 {
-  GSocketAddress *saddr = NULL;
-  GError *err = NULL;
   nns_edge_handle_s *eh;
   int ret = NNS_EDGE_ERROR_NONE;
 
@@ -1076,31 +1102,14 @@ nns_edge_start (nns_edge_h edge_h)
     }
   }
 
-  /** Initialize server src data. */
-  eh->listener = g_socket_listener_new ();
-  g_socket_listener_set_backlog (eh->listener, N_BACKLOG);
-
-  if (!_nns_edge_get_saddr (eh->host, eh->port, &saddr)) {
-    nns_edge_loge ("Failed to get socket address (%s:%d).", eh->host, eh->port);
-    ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
-    goto error;
-  }
-  if (!g_socket_listener_add_address (eh->listener, saddr,
-          G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_TCP, NULL, NULL, &err)) {
-    nns_edge_loge ("Failed to add address (%s:%d): %s", eh->host, eh->port,
-        err->message);
-    g_clear_error (&err);
-    ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
+  /* Start listener thread to accept socket. */
+  if (!_nns_edge_create_socket_listener (eh)) {
+    nns_edge_loge ("Failed to create socket listener.");
+    ret = NNS_EDGE_ERROR_IO;
     goto error;
   }
 
-  g_socket_listener_accept_socket_async (eh->listener, NULL,
-      (GAsyncReadyCallback) _nns_edge_accept_socket_async_cb, eh);
-
 error:
-  if (saddr)
-    g_object_unref (saddr);
-
   nns_edge_unlock (eh);
   return ret;
 }
@@ -1137,8 +1146,17 @@ nns_edge_release_handle (nns_edge_h edge_h)
   eh->event_cb = NULL;
   eh->user_data = NULL;
 
-  if (eh->listener)
-    g_clear_object (&eh->listener);
+  if (eh->listener_thread) {
+    eh->listening = false;
+    pthread_cancel (eh->listener_thread);
+    pthread_join (eh->listener_thread, NULL);
+    eh->listener_thread = 0;
+  }
+
+  if (eh->listener_fd >= 0) {
+    close (eh->listener_fd);
+    eh->listener_fd = -1;
+  }
 
   g_hash_table_destroy (eh->conn_table);
   eh->conn_table = NULL;
index 23461776423a155b29401caabbade671cc914252..698b58c73f6b1ed502ebb8e9b34a1fff0fba76a3 100644 (file)
@@ -20,7 +20,6 @@ extern "C" {
 
 #include "nnstreamer-edge.h"
 #include "nnstreamer-edge-common.h"
-#include <gio/gio.h> /** @todo remove glib */
 
 /**
  * @brief Data structure for edge handle.
@@ -46,7 +45,11 @@ typedef struct {
   char *caps_str;
 
   GHashTable *conn_table;
-  GSocketListener *listener;
+
+  /* socket listener */
+  bool listening;
+  int listener_fd;
+  pthread_t listener_thread;
 
   /* MQTT */
   nns_edge_broker_h broker_h;