From 26cf02c394fdffa09d84e979706e816fe1772f7a Mon Sep 17 00:00:00 2001 From: Jaeyun Date: Mon, 11 Jul 2022 14:17:23 +0900 Subject: [PATCH] [Connection] cmd between server and client Set flag to send/receive command and capability between server and client. Signed-off-by: Jaeyun --- .../nnstreamer-edge-internal.c | 156 ++++++++++-------- 1 file changed, 83 insertions(+), 73 deletions(-) diff --git a/src/libnnstreamer-edge/nnstreamer-edge-internal.c b/src/libnnstreamer-edge/nnstreamer-edge-internal.c index 66e6963..8cb8a94 100644 --- a/src/libnnstreamer-edge/nnstreamer-edge-internal.c +++ b/src/libnnstreamer-edge/nnstreamer-edge-internal.c @@ -525,6 +525,7 @@ _nns_edge_remove_connection (gpointer data) if (cdata) { _nns_edge_close_connection (cdata->src_conn); _nns_edge_close_connection (cdata->sink_conn); + cdata->src_conn = cdata->sink_conn = NULL; g_free (cdata); } @@ -619,16 +620,16 @@ done: } /** - * @brief Connect to the destination node. + * @brief Connect to the destination node. (host:sender(sink) - dest:receiver(listener, src)) */ static int -_nns_edge_connect_to (nns_edge_handle_s * eh, const char *ip, int port) +_nns_edge_connect_to (nns_edge_handle_s * eh, int64_t client_id, + const char *ip, int port) { nns_edge_conn_s *conn = NULL; nns_edge_conn_data_s *conn_data; nns_edge_cmd_s cmd; char *host; - int64_t client_id; bool done = false; int ret; @@ -647,46 +648,48 @@ _nns_edge_connect_to (nns_edge_handle_s * eh, const char *ip, int port) goto error; } - /* Get destination capability. */ - _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, eh->client_id); - ret = _nns_edge_cmd_receive (conn, &cmd); - if (ret != NNS_EDGE_ERROR_NONE) { - nns_edge_loge ("Failed to receive capability."); - goto error; - } + if (!eh->is_server) { + /* Receive capability and client ID from server. */ + _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id); + ret = _nns_edge_cmd_receive (conn, &cmd); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to receive capability."); + goto error; + } - if (cmd.info.cmd != _NNS_EDGE_CMD_CAPABILITY) { - nns_edge_loge ("Failed to get capability."); - _nns_edge_cmd_clear (&cmd); - goto error; - } + if (cmd.info.cmd != _NNS_EDGE_CMD_CAPABILITY) { + nns_edge_loge ("Failed to get capability."); + _nns_edge_cmd_clear (&cmd); + goto error; + } - client_id = eh->client_id = cmd.info.client_id; + client_id = eh->client_id = cmd.info.client_id; - /* Check compatibility. */ - ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CAPABILITY, - cmd.mem[0], cmd.info.mem_size[0], NULL); - _nns_edge_cmd_clear (&cmd); + /* Check compatibility. */ + ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CAPABILITY, + cmd.mem[0], cmd.info.mem_size[0], NULL); + _nns_edge_cmd_clear (&cmd); - if (ret != NNS_EDGE_ERROR_NONE) { - nns_edge_loge ("The event returns error, capability is not acceptable."); - _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id); - } else { - /* Send ip and port to destination. */ - _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_HOST_INFO, client_id); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("The event returns error, capability is not acceptable."); + _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id); + } else { + /* Send ip and port to destination. */ + _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_HOST_INFO, client_id); - _get_host_str (eh->recv_ip, eh->recv_port, &host); - cmd.info.num = 1; - cmd.info.mem_size[0] = strlen (host) + 1; - cmd.mem[0] = host; - } + _get_host_str (eh->recv_ip, eh->recv_port, &host); + cmd.info.num = 1; + cmd.info.mem_size[0] = strlen (host) + 1; + cmd.mem[0] = host; + } - ret = _nns_edge_cmd_send (conn, &cmd); - _nns_edge_cmd_clear (&cmd); + ret = _nns_edge_cmd_send (conn, &cmd); + _nns_edge_cmd_clear (&cmd); - if (ret != NNS_EDGE_ERROR_NONE) { - nns_edge_loge ("Failed to send host info."); - goto error; + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to send host info."); + goto error; + } } conn_data = _nns_edge_add_connection (eh, client_id); @@ -730,6 +733,7 @@ _nns_edge_message_handler (void *thread_data) client_id = _tdata->client_id; g_free (_tdata); + conn->running = 1; while (conn->running) { nns_edge_data_h data_h; unsigned int i; @@ -810,7 +814,7 @@ _nns_edge_create_message_thread (nns_edge_handle_s * eh, nns_edge_conn_s * conn, /** Create message receving thread */ pthread_attr_init (&attr); pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_JOINABLE); - conn->running = 1; + thread_data->eh = eh; thread_data->conn = conn; thread_data->client_id = client_id; @@ -822,6 +826,7 @@ _nns_edge_create_message_thread (nns_edge_handle_s * eh, nns_edge_conn_s * conn, if (tid < 0) { nns_edge_loge ("Failed to create message handler thread."); conn->running = 0; + conn->msg_thread = 0; g_free (thread_data); return NNS_EDGE_ERROR_IO; } @@ -878,40 +883,50 @@ _nns_edge_accept_socket_async_cb (GObject * source, GAsyncResult * result, goto error; } - /* Send capability and info to check compatibility. */ client_id = eh->is_server ? g_get_monotonic_time () : eh->client_id; - if (!STR_IS_VALID (eh->caps_str)) { - nns_edge_loge ("Cannot accept socket, invalid capability."); - goto error; - } + /* Send capability and info to check compatibility. */ + if (eh->is_server) { + if (!STR_IS_VALID (eh->caps_str)) { + nns_edge_loge ("Cannot accept socket, invalid capability."); + goto error; + } - _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_CAPABILITY, client_id); - cmd.info.num = 1; - cmd.info.mem_size[0] = strlen (eh->caps_str) + 1; - cmd.mem[0] = eh->caps_str; + _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_CAPABILITY, client_id); + cmd.info.num = 1; + cmd.info.mem_size[0] = strlen (eh->caps_str) + 1; + cmd.mem[0] = eh->caps_str; - ret = _nns_edge_cmd_send (conn, &cmd); - if (ret != NNS_EDGE_ERROR_NONE) { - nns_edge_loge ("Failed to send capability."); - goto error; - } + ret = _nns_edge_cmd_send (conn, &cmd); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to send capability."); + goto error; + } - /* Receive ip and port from destination. */ - ret = _nns_edge_cmd_receive (conn, &cmd); - if (ret != NNS_EDGE_ERROR_NONE) { - nns_edge_loge ("Failed to receive node info."); - goto error; - } + /* Receive ip and port from destination. */ + ret = _nns_edge_cmd_receive (conn, &cmd); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to receive node info."); + goto error; + } - if (cmd.info.cmd != _NNS_EDGE_CMD_HOST_INFO) { - nns_edge_loge ("Failed to get host info."); + if (cmd.info.cmd != _NNS_EDGE_CMD_HOST_INFO) { + nns_edge_loge ("Failed to get host info."); + _nns_edge_cmd_clear (&cmd); + goto error; + } + + _parse_host_str (cmd.mem[0], &connected_ip, &connected_port); _nns_edge_cmd_clear (&cmd); - goto error; - } - _parse_host_str (cmd.mem[0], &connected_ip, &connected_port); - _nns_edge_cmd_clear (&cmd); + /* Connect to client listener. */ + ret = _nns_edge_connect_to (eh, client_id, connected_ip, connected_port); + if (ret != NNS_EDGE_ERROR_NONE) { + nns_edge_loge ("Failed to connect host %s:%d.", + connected_ip, connected_port); + goto error; + } + } ret = _nns_edge_create_message_thread (eh, conn, client_id); if (ret != NNS_EDGE_ERROR_NONE) { @@ -928,11 +943,7 @@ _nns_edge_accept_socket_async_cb (GObject * source, GAsyncResult * result, } error: - if (done) { - if (eh->is_server) { - _nns_edge_connect_to (eh, connected_ip, connected_port); - } - } else { + if (!done) { _nns_edge_close_connection (conn); } @@ -996,7 +1007,7 @@ nns_edge_create_handle (const char *id, const char *topic, nns_edge_h * edge_h) } /** - * @brief Initialize the nnstreamer edge handle. + * @brief Start the nnstreamer edge. */ int nns_edge_start (nns_edge_h edge_h, bool is_server) @@ -1048,8 +1059,6 @@ nns_edge_start (nns_edge_h edge_h, bool is_server) ret = NNS_EDGE_ERROR_CONNECTION_FAILURE; goto error; } - g_object_unref (saddr); - saddr = NULL; g_socket_listener_accept_socket_async (eh->listener, eh->cancellable, (GAsyncReadyCallback) _nns_edge_accept_socket_async_cb, eh); @@ -1174,11 +1183,10 @@ nns_edge_connect (nns_edge_h edge_h, nns_edge_protocol_e protocol, return NNS_EDGE_ERROR_CONNECTION_FAILURE; } - eh->is_server = false; eh->protocol = protocol; /** Connect to info channel. */ - ret = _nns_edge_connect_to (eh, ip, port); + ret = _nns_edge_connect_to (eh, eh->client_id, ip, port); if (ret != NNS_EDGE_ERROR_NONE) { nns_edge_loge ("Failed to connect to %s:%d", ip, port); } @@ -1294,6 +1302,8 @@ nns_edge_request (nns_edge_h edge_h, nns_edge_data_h data_h) } ret = _nns_edge_cmd_send (conn_data->sink_conn, &cmd); + if (ret != NNS_EDGE_ERROR_NONE) + nns_edge_loge ("Failed to request, cannot send edge data."); nns_edge_unlock (eh); return ret; -- 2.34.1