if (cdata) {
_nns_edge_close_connection (cdata->src_conn);
_nns_edge_close_connection (cdata->sink_conn);
+ cdata->src_conn = cdata->sink_conn = NULL;
g_free (cdata);
}
}
/**
- * @brief Connect to the destination node.
+ * @brief Connect to the destination node. (host:sender(sink) - dest:receiver(listener, src))
*/
static int
-_nns_edge_connect_to (nns_edge_handle_s * eh, const char *ip, int port)
+_nns_edge_connect_to (nns_edge_handle_s * eh, int64_t client_id,
+ const char *ip, int port)
{
nns_edge_conn_s *conn = NULL;
nns_edge_conn_data_s *conn_data;
nns_edge_cmd_s cmd;
char *host;
- int64_t client_id;
bool done = false;
int ret;
goto error;
}
- /* Get destination capability. */
- _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, eh->client_id);
- ret = _nns_edge_cmd_receive (conn, &cmd);
- if (ret != NNS_EDGE_ERROR_NONE) {
- nns_edge_loge ("Failed to receive capability.");
- goto error;
- }
+ if (!eh->is_server) {
+ /* Receive capability and client ID from server. */
+ _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
+ ret = _nns_edge_cmd_receive (conn, &cmd);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to receive capability.");
+ goto error;
+ }
- if (cmd.info.cmd != _NNS_EDGE_CMD_CAPABILITY) {
- nns_edge_loge ("Failed to get capability.");
- _nns_edge_cmd_clear (&cmd);
- goto error;
- }
+ if (cmd.info.cmd != _NNS_EDGE_CMD_CAPABILITY) {
+ nns_edge_loge ("Failed to get capability.");
+ _nns_edge_cmd_clear (&cmd);
+ goto error;
+ }
- client_id = eh->client_id = cmd.info.client_id;
+ client_id = eh->client_id = cmd.info.client_id;
- /* Check compatibility. */
- ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CAPABILITY,
- cmd.mem[0], cmd.info.mem_size[0], NULL);
- _nns_edge_cmd_clear (&cmd);
+ /* Check compatibility. */
+ ret = _nns_edge_invoke_event_cb (eh, NNS_EDGE_EVENT_CAPABILITY,
+ cmd.mem[0], cmd.info.mem_size[0], NULL);
+ _nns_edge_cmd_clear (&cmd);
- if (ret != NNS_EDGE_ERROR_NONE) {
- nns_edge_loge ("The event returns error, capability is not acceptable.");
- _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
- } else {
- /* Send ip and port to destination. */
- _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_HOST_INFO, client_id);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("The event returns error, capability is not acceptable.");
+ _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_ERROR, client_id);
+ } else {
+ /* Send ip and port to destination. */
+ _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_HOST_INFO, client_id);
- _get_host_str (eh->recv_ip, eh->recv_port, &host);
- cmd.info.num = 1;
- cmd.info.mem_size[0] = strlen (host) + 1;
- cmd.mem[0] = host;
- }
+ _get_host_str (eh->recv_ip, eh->recv_port, &host);
+ cmd.info.num = 1;
+ cmd.info.mem_size[0] = strlen (host) + 1;
+ cmd.mem[0] = host;
+ }
- ret = _nns_edge_cmd_send (conn, &cmd);
- _nns_edge_cmd_clear (&cmd);
+ ret = _nns_edge_cmd_send (conn, &cmd);
+ _nns_edge_cmd_clear (&cmd);
- if (ret != NNS_EDGE_ERROR_NONE) {
- nns_edge_loge ("Failed to send host info.");
- goto error;
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to send host info.");
+ goto error;
+ }
}
conn_data = _nns_edge_add_connection (eh, client_id);
client_id = _tdata->client_id;
g_free (_tdata);
+ conn->running = 1;
while (conn->running) {
nns_edge_data_h data_h;
unsigned int i;
/** Create message receving thread */
pthread_attr_init (&attr);
pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_JOINABLE);
- conn->running = 1;
+
thread_data->eh = eh;
thread_data->conn = conn;
thread_data->client_id = client_id;
if (tid < 0) {
nns_edge_loge ("Failed to create message handler thread.");
conn->running = 0;
+ conn->msg_thread = 0;
g_free (thread_data);
return NNS_EDGE_ERROR_IO;
}
goto error;
}
- /* Send capability and info to check compatibility. */
client_id = eh->is_server ? g_get_monotonic_time () : eh->client_id;
- if (!STR_IS_VALID (eh->caps_str)) {
- nns_edge_loge ("Cannot accept socket, invalid capability.");
- goto error;
- }
+ /* Send capability and info to check compatibility. */
+ if (eh->is_server) {
+ if (!STR_IS_VALID (eh->caps_str)) {
+ nns_edge_loge ("Cannot accept socket, invalid capability.");
+ goto error;
+ }
- _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_CAPABILITY, client_id);
- cmd.info.num = 1;
- cmd.info.mem_size[0] = strlen (eh->caps_str) + 1;
- cmd.mem[0] = eh->caps_str;
+ _nns_edge_cmd_init (&cmd, _NNS_EDGE_CMD_CAPABILITY, client_id);
+ cmd.info.num = 1;
+ cmd.info.mem_size[0] = strlen (eh->caps_str) + 1;
+ cmd.mem[0] = eh->caps_str;
- ret = _nns_edge_cmd_send (conn, &cmd);
- if (ret != NNS_EDGE_ERROR_NONE) {
- nns_edge_loge ("Failed to send capability.");
- goto error;
- }
+ ret = _nns_edge_cmd_send (conn, &cmd);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to send capability.");
+ goto error;
+ }
- /* Receive ip and port from destination. */
- ret = _nns_edge_cmd_receive (conn, &cmd);
- if (ret != NNS_EDGE_ERROR_NONE) {
- nns_edge_loge ("Failed to receive node info.");
- goto error;
- }
+ /* Receive ip and port from destination. */
+ ret = _nns_edge_cmd_receive (conn, &cmd);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to receive node info.");
+ goto error;
+ }
- if (cmd.info.cmd != _NNS_EDGE_CMD_HOST_INFO) {
- nns_edge_loge ("Failed to get host info.");
+ if (cmd.info.cmd != _NNS_EDGE_CMD_HOST_INFO) {
+ nns_edge_loge ("Failed to get host info.");
+ _nns_edge_cmd_clear (&cmd);
+ goto error;
+ }
+
+ _parse_host_str (cmd.mem[0], &connected_ip, &connected_port);
_nns_edge_cmd_clear (&cmd);
- goto error;
- }
- _parse_host_str (cmd.mem[0], &connected_ip, &connected_port);
- _nns_edge_cmd_clear (&cmd);
+ /* Connect to client listener. */
+ ret = _nns_edge_connect_to (eh, client_id, connected_ip, connected_port);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to connect host %s:%d.",
+ connected_ip, connected_port);
+ goto error;
+ }
+ }
ret = _nns_edge_create_message_thread (eh, conn, client_id);
if (ret != NNS_EDGE_ERROR_NONE) {
}
error:
- if (done) {
- if (eh->is_server) {
- _nns_edge_connect_to (eh, connected_ip, connected_port);
- }
- } else {
+ if (!done) {
_nns_edge_close_connection (conn);
}
}
/**
- * @brief Initialize the nnstreamer edge handle.
+ * @brief Start the nnstreamer edge.
*/
int
nns_edge_start (nns_edge_h edge_h, bool is_server)
ret = NNS_EDGE_ERROR_CONNECTION_FAILURE;
goto error;
}
- g_object_unref (saddr);
- saddr = NULL;
g_socket_listener_accept_socket_async (eh->listener, eh->cancellable,
(GAsyncReadyCallback) _nns_edge_accept_socket_async_cb, eh);
return NNS_EDGE_ERROR_CONNECTION_FAILURE;
}
- eh->is_server = false;
eh->protocol = protocol;
/** Connect to info channel. */
- ret = _nns_edge_connect_to (eh, ip, port);
+ ret = _nns_edge_connect_to (eh, eh->client_id, ip, port);
if (ret != NNS_EDGE_ERROR_NONE) {
nns_edge_loge ("Failed to connect to %s:%d", ip, port);
}
}
ret = _nns_edge_cmd_send (conn_data->sink_conn, &cmd);
+ if (ret != NNS_EDGE_ERROR_NONE)
+ nns_edge_loge ("Failed to request, cannot send edge data.");
nns_edge_unlock (eh);
return ret;