void *meta;
} nns_edge_cmd_s;
+/**
+ * @brief Data structure for connection data.
+ */
+typedef struct _nns_edge_conn_data_s nns_edge_conn_data_s;
+
/**
* @brief Data structure for edge connection.
*/
/**
* @brief Data structure for connection data.
*/
-typedef struct
+struct _nns_edge_conn_data_s
{
nns_edge_conn_s *src_conn;
nns_edge_conn_s *sink_conn;
int64_t id;
-} nns_edge_conn_data_s;
+ nns_edge_conn_data_s *next;
+};
/**
* @brief Structures for thread data of message handling.
return true;
}
+/**
+ * @brief Release connection data and its resources.
+ */
+static void
+_nns_edge_release_connection_data (nns_edge_conn_data_s * cdata)
+{
+ if (cdata) {
+ _nns_edge_close_connection (cdata->src_conn);
+ _nns_edge_close_connection (cdata->sink_conn);
+ free (cdata);
+ }
+}
+
/**
* @brief Get nnstreamer-edge connection data.
* @note This function should be called with handle lock.
static nns_edge_conn_data_s *
_nns_edge_get_connection (nns_edge_handle_s * eh, int64_t client_id)
{
- if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
- return NULL;
+ nns_edge_conn_data_s *cdata;
+
+ cdata = (nns_edge_conn_data_s *) eh->connections;
+
+ while (cdata) {
+ if (cdata->id == client_id)
+ return cdata;
+
+ cdata = cdata->next;
}
- return g_hash_table_lookup (eh->conn_table, GUINT_TO_POINTER (client_id));
+ return NULL;
}
/**
static nns_edge_conn_data_s *
_nns_edge_add_connection (nns_edge_handle_s * eh, int64_t client_id)
{
- nns_edge_conn_data_s *data = NULL;
+ nns_edge_conn_data_s *cdata;
- if (!NNS_EDGE_MAGIC_IS_VALID (eh)) {
- nns_edge_loge ("Invalid param, given edge handle is invalid.");
- return NULL;
- }
-
- data = g_hash_table_lookup (eh->conn_table, GUINT_TO_POINTER (client_id));
+ cdata = _nns_edge_get_connection (eh, client_id);
- if (NULL == data) {
- data = (nns_edge_conn_data_s *) calloc (1, sizeof (nns_edge_conn_data_s));
- if (NULL == data) {
+ if (NULL == cdata) {
+ cdata = (nns_edge_conn_data_s *) calloc (1, sizeof (nns_edge_conn_data_s));
+ if (NULL == cdata) {
nns_edge_loge ("Failed to allocate memory for connection data.");
return NULL;
}
- data->id = client_id;
-
- g_hash_table_insert (eh->conn_table, GUINT_TO_POINTER (client_id), data);
+ /* prepend connection data */
+ cdata->id = client_id;
+ cdata->next = eh->connections;
+ eh->connections = cdata;
}
- return data;
+ return cdata;
}
/**
- * @brief Remove nnstreamer-edge connection data. This will be called when removing connection data from hash table.
+ * @brief Remove nnstreamer-edge connection data.
+ * @note This function should be called with handle lock.
*/
static void
-_nns_edge_remove_connection (gpointer data)
+_nns_edge_remove_connection (nns_edge_handle_s * eh, int64_t client_id)
{
- nns_edge_conn_data_s *cdata = (nns_edge_conn_data_s *) data;
+ nns_edge_conn_data_s *cdata, *prev;
- if (cdata) {
- _nns_edge_close_connection (cdata->src_conn);
- _nns_edge_close_connection (cdata->sink_conn);
- cdata->src_conn = cdata->sink_conn = NULL;
+ cdata = (nns_edge_conn_data_s *) eh->connections;
+ prev = NULL;
+
+ while (cdata) {
+ if (cdata->id == client_id) {
+ if (prev)
+ prev->next = cdata->next;
+ else
+ eh->connections = cdata->next;
+
+ _nns_edge_release_connection_data (cdata);
+ return;
+ }
+
+ prev = cdata;
+ cdata = cdata->next;
+ }
+}
+
+/**
+ * @brief Remove all connection data.
+ * @note This function should be called with handle lock.
+ */
+static void
+_nns_edge_remove_all_connection (nns_edge_handle_s * eh)
+{
+ nns_edge_conn_data_s *cdata, *next;
+
+ cdata = (nns_edge_conn_data_s *) eh->connections;
+ eh->connections = NULL;
- SAFE_FREE (cdata);
+ while (cdata) {
+ next = cdata->next;
+
+ _nns_edge_release_connection_data (cdata);
+
+ cdata = next;
}
}
nns_edge_logd
("Received error from client, remove connection of client (ID: %lld).",
(long long) client_id);
- g_hash_table_remove (eh->conn_table, GUINT_TO_POINTER (client_id));
+ _nns_edge_remove_connection (eh, client_id);
}
return NULL;
{
nns_edge_handle_s *eh = (nns_edge_handle_s *) thread_data;
nns_edge_conn_data_s *conn_data;
+ nns_edge_conn_s *conn;
nns_edge_data_h data_h;
- int64_t *client_ids;
+ int64_t client_id;
char *val;
- unsigned int i, length;
int ret;
while (nns_edge_queue_wait_pop (eh->send_queue, 0U, &data_h)) {
if (ret != NNS_EDGE_ERROR_NONE) {
nns_edge_logd
("Cannot find client ID in edge data. Send to all connected nodes.");
- client_ids = (int64_t *) g_hash_table_get_keys_as_array (eh->conn_table,
- &length);
- } else {
- length = 1U;
- client_ids = (int64_t *) malloc (sizeof (int64_t));
- if (client_ids)
- *client_ids = strtoll (val, NULL, 10);
+ conn_data = (nns_edge_conn_data_s *) eh->connections;
+ while (conn_data) {
+ /** @todo update code for each connect type */
+ conn = conn_data->sink_conn;
+ _nns_edge_transfer_data (conn, data_h, conn_data->id);
+ conn_data = conn_data->next;
+ }
+ } else {
+ client_id = (int64_t) strtoll (val, NULL, 10);
SAFE_FREE (val);
- }
- if (!client_ids) {
- nns_edge_loge ("Failed to get client ID, cannot send edge data.");
- goto done;
- }
-
- /** @todo update code for each connect type */
- switch (eh->connect_type) {
- case NNS_EDGE_CONNECT_TYPE_TCP:
- case NNS_EDGE_CONNECT_TYPE_HYBRID:
- for (i = 0; i < length; i++) {
- conn_data = _nns_edge_get_connection (eh, client_ids[i]);
- if (!conn_data) {
- nns_edge_loge
- ("Cannot find connection, invalid client ID or connection closed.");
- goto done;
- }
-
- ret = _nns_edge_transfer_data (conn_data->sink_conn, data_h,
- client_ids[i]);
- if (ret != NNS_EDGE_ERROR_NONE) {
- nns_edge_loge ("Failed to send edge data.");
- goto done;
- }
- }
- break;
- default:
- break;
+ conn_data = _nns_edge_get_connection (eh, client_id);
+ if (conn_data) {
+ conn = conn_data->sink_conn;
+ _nns_edge_transfer_data (conn, data_h, client_id);
+ } else {
+ nns_edge_loge
+ ("Cannot find connection, invalid client ID or connection closed.");
+ }
}
- done:
- SAFE_FREE (client_ids);
nns_edge_data_destroy (data_h);
}
eh->dest_port = 0;
eh->flags = flags;
eh->broker_h = NULL;
+ eh->connections = NULL;
nns_edge_metadata_init (&eh->meta);
eh->listening = false;
eh->listener_fd = -1;
nns_edge_queue_create (&eh->send_queue);
- /* Connection data for each client ID. */
- eh->conn_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
- _nns_edge_remove_connection);
-
*edge_h = eh;
return NNS_EDGE_ERROR_NONE;
}
eh->listener_fd = -1;
}
- g_hash_table_destroy (eh->conn_table);
- eh->conn_table = NULL;
+ _nns_edge_remove_all_connection (eh);
nns_edge_metadata_free (&eh->meta);
SAFE_FREE (eh->id);
return NNS_EDGE_ERROR_INVALID_PARAMETER;
}
- g_hash_table_remove_all (eh->conn_table);
+ _nns_edge_remove_all_connection (eh);
nns_edge_unlock (eh);
return NNS_EDGE_ERROR_NONE;