ret = _nns_edge_cmd_send (conn, &cmd);
SAFE_FREE (cmd.meta);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to send edge data to destination (%s:%d).",
+ conn->host, conn->port);
+ }
+
return ret;
}
return NNS_EDGE_ERROR_NONE;
}
+/**
+ * @brief Thread to send data.
+ */
+static void *
+_nns_edge_send_thread (void *thread_data)
+{
+ nns_edge_handle_s *eh = (nns_edge_handle_s *) thread_data;
+ nns_edge_conn_data_s *conn_data;
+ nns_edge_data_h data_h;
+ int64_t *client_ids;
+ char *val;
+ unsigned int i, length;
+ int ret;
+
+ while (nns_edge_queue_wait_pop (eh->send_queue, 0U, &data_h)) {
+ /* Send data to destination */
+ ret = nns_edge_data_get_info (data_h, "client_id", &val);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_logd
+ ("Cannot find client ID in edge data. Send to all connected nodes.");
+ client_ids = (int64_t *) g_hash_table_get_keys_as_array (eh->conn_table,
+ &length);
+ } else {
+ length = 1U;
+
+ client_ids = (int64_t *) malloc (sizeof (int64_t));
+ if (client_ids)
+ *client_ids = strtoll (val, NULL, 10);
+
+ SAFE_FREE (val);
+ }
+
+ if (!client_ids) {
+ nns_edge_loge ("Failed to get client ID, cannot send edge data.");
+ goto done;
+ }
+
+ /** @todo update code for each connect type */
+ switch (eh->connect_type) {
+ case NNS_EDGE_CONNECT_TYPE_TCP:
+ case NNS_EDGE_CONNECT_TYPE_HYBRID:
+ for (i = 0; i < length; i++) {
+ conn_data = _nns_edge_get_connection (eh, client_ids[i]);
+ if (!conn_data) {
+ nns_edge_loge
+ ("Cannot find connection, invalid client ID or connection closed.");
+ goto done;
+ }
+
+ ret = _nns_edge_transfer_data (conn_data->sink_conn, data_h,
+ client_ids[i]);
+ if (ret != NNS_EDGE_ERROR_NONE) {
+ nns_edge_loge ("Failed to send edge data.");
+ goto done;
+ }
+ }
+ break;
+ default:
+ break;
+ }
+
+ done:
+ SAFE_FREE (client_ids);
+ nns_edge_data_destroy (data_h);
+ }
+
+ return NULL;
+}
+
+/**
+ * @brief Create thread to send data.
+ */
+static int
+_nns_edge_create_send_thread (nns_edge_handle_s * eh)
+{
+ pthread_attr_t attr;
+ int tid;
+
+ pthread_attr_init (&attr);
+ pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_JOINABLE);
+
+ tid = pthread_create (&eh->send_thread, &attr, _nns_edge_send_thread, eh);
+ pthread_attr_destroy (&attr);
+
+ if (tid < 0) {
+ nns_edge_loge ("Failed to create sender thread.");
+ eh->send_thread = 0;
+ return NNS_EDGE_ERROR_IO;
+ }
+
+ return NNS_EDGE_ERROR_NONE;
+}
+
/**
* @brief Accept socket and create message thread in socket listener thread.
*/
nns_edge_metadata_init (&eh->meta);
eh->listening = false;
eh->listener_fd = -1;
+ nns_edge_queue_create (&eh->send_queue);
/* Connection data for each client ID. */
eh->conn_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
goto error;
}
+ ret = _nns_edge_create_send_thread (eh);
+
error:
nns_edge_unlock (eh);
return ret;
eh->event_cb = NULL;
eh->user_data = NULL;
+ nns_edge_queue_destroy (eh->send_queue);
+ eh->send_queue = NULL;
+
+ if (eh->send_thread) {
+ pthread_join (eh->send_thread, NULL);
+ eh->send_thread = 0;
+ }
+
if (eh->listener_thread) {
if (eh->listening) {
pthread_cancel (eh->listener_thread);
nns_edge_send (nns_edge_h edge_h, nns_edge_data_h data_h)
{
nns_edge_handle_s *eh;
- nns_edge_conn_data_s *conn_data;
- int64_t client_id;
- char *val;
- int ret = NNS_EDGE_ERROR_NONE;
+ nns_edge_data_h new_data_h;
eh = (nns_edge_handle_s *) edge_h;
if (!eh) {
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- ret = nns_edge_data_get_info (data_h, "client_id", &val);
- if (ret != NNS_EDGE_ERROR_NONE) {
- nns_edge_loge ("Cannot find client ID in edge data.");
- return NNS_EDGE_ERROR_INVALID_PARAMETER;
- }
-
- client_id = strtoll (val, NULL, 10);
- SAFE_FREE (val);
-
nns_edge_lock (eh);
if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- /** @todo update code for each connect type */
- switch (eh->connect_type) {
- case NNS_EDGE_CONNECT_TYPE_TCP:
- case NNS_EDGE_CONNECT_TYPE_HYBRID:
- conn_data = _nns_edge_get_connection (eh, client_id);
- if (!conn_data) {
- nns_edge_loge
- ("Cannot find connection, invalid client ID or connection closed.");
- ret = NNS_EDGE_ERROR_INVALID_PARAMETER;
- break;
- }
+ if (!eh->send_thread) {
+ nns_edge_loge ("Invalid state, start edge before sending a data.");
+ nns_edge_unlock (eh);
+ return NNS_EDGE_ERROR_IO;
+ }
- ret = _nns_edge_transfer_data (conn_data->sink_conn, data_h, client_id);
- if (ret != NNS_EDGE_ERROR_NONE)
- nns_edge_loge ("Failed to send edge data.");
- break;
- default:
- break;
+ /* Create new data handle and push it into send-queue. */
+ nns_edge_data_copy (data_h, &new_data_h);
+
+ if (!nns_edge_queue_push (eh->send_queue, new_data_h,
+ (nns_edge_queue_data_destroy_cb) nns_edge_data_destroy)) {
+ nns_edge_loge ("Failed to send data, cannot push data into queue.");
+ nns_edge_unlock (eh);
+ return NNS_EDGE_ERROR_IO;
}
nns_edge_unlock (eh);
- return ret;
+ return NNS_EDGE_ERROR_NONE;
}
/**