1 /* SPDX-License-Identifier: LGPL-2.1-only */
3 * Copyright (C) 2021 Samsung Electronics Co., Ltd.
5 * @file tensor_query_server.c
7 * @brief GStreamer plugin to handle meta_query for server elements
8 * @author Junhwan Kim <jejudo.kim@samsung.com>
9 * @see http://github.com/nnstreamer/nnstreamer
17 #include "tensor_query_server.h"
18 #include <tensor_typedef.h>
19 #include <tensor_common.h>
22 * @brief mutex for tensor-query server table.
/* Static GLib lock; every _qs_table access in this file (lookup, insert,
 * remove, create, destroy) is wrapped in G_LOCK/G_UNLOCK on it. */
24 G_LOCK_DEFINE_STATIC (query_server_table);
27 * @brief Table for query server data.
/* Keys are server ids packed with GUINT_TO_POINTER; values are
 * GstTensorQueryServer pointers. Created in init_queryserver() and destroyed
 * in fini_queryserver(). */
29 static GHashTable *_qs_table = NULL;
/* Compiler constructor/destructor attributes: these two functions are run
 * automatically at load/exit time rather than being called explicitly. */
31 static void init_queryserver (void) __attribute__((constructor));
32 static void fini_queryserver (void) __attribute__((destructor));
35 * @brief Internal function to release query server data.
/*
 * Release one table entry. Installed as the value-destroy function of
 * _qs_table in init_queryserver(), so it runs when an entry is removed or
 * when the table is destroyed.
 * @param data  the GstTensorQueryServer stored as the hash-table value.
 * NOTE(review): this listing omits original lines 39, 41-44, 46, 48-49, 51
 * and 54-58, so any NULL guard on data / edge_h, the reset of edge_h and the
 * final free of the entry are not visible here - confirm in the full file.
 */
38 _release_server_data (gpointer data)
40 GstTensorQueryServer *_data = (GstTensorQueryServer *) data;
/* The edge handle is released while holding the per-entry lock, the same
 * lock the other functions in this file take before touching edge_h. */
45 g_mutex_lock (&_data->lock);
47 nns_edge_release_handle (_data->edge_h);
50 g_mutex_unlock (&_data->lock);
/* Unlock first: GLib documents g_mutex_clear() on a locked mutex as
 * undefined behaviour. */
52 g_mutex_clear (&_data->lock);
53 g_cond_clear (&_data->cond);
59 * @brief Get nnstreamer edge server handle.
/*
 * Look up the server entry registered under id.
 * @param id  server id, used as the hash-table key via GUINT_TO_POINTER.
 * The lookup itself runs under the table lock; g_hash_table_lookup() yields
 * NULL when the key is not present.
 * NOTE(review): the return statement is on an omitted line; every caller in
 * this file assigns the result to a GstTensorQueryServer pointer, so it
 * presumably returns `data` - confirm.
 * NOTE(review): the table lock is dropped before the caller uses the entry,
 * so a concurrent gst_tensor_query_server_remove_data() could release the
 * entry while it is still in use - confirm that callers serialize these.
 */
61 static GstTensorQueryServer *
62 gst_tensor_query_server_get_handle (const guint id)
64 GstTensorQueryServer *data;
66 G_LOCK (query_server_table);
67 data = g_hash_table_lookup (_qs_table, GUINT_TO_POINTER (id));
68 G_UNLOCK (query_server_table);
74 * @brief Add nnstreamer edge server handle into hash table.
/*
 * Register a new, not-yet-configured server entry for id.
 * @param id  server id to register.
 * NOTE(review): the return type, the declaration of `ret`, the branch taken
 * when an entry already exists and the return statements are on omitted
 * lines (78, 80-81, 83-87, 89, 91-93, 96, 98, 102-106) - confirm in the full
 * file before relying on the exact return contract.
 */
77 gst_tensor_query_server_add_data (const guint id)
79 GstTensorQueryServer *data;
/* Check whether an entry for this id is already present. */
82 data = gst_tensor_query_server_get_handle (id);
/* g_try_new0() returns NULL instead of aborting on allocation failure and
 * zero-fills the struct, so edge_h starts out as NULL. */
88 data = g_try_new0 (GstTensorQueryServer, 1);
90 nns_loge ("Failed to allocate memory for tensor query server data.");
94 g_mutex_init (&data->lock);
95 g_cond_init (&data->cond);
97 data->configured = FALSE;
/* NOTE(review): the lookup above and this insert are two separate critical
 * sections; if two threads add the same id concurrently, the second insert
 * replaces the first value (running the table's value-destroy function on
 * it) - confirm this cannot happen or is acceptable. */
99 G_LOCK (query_server_table);
100 ret = g_hash_table_insert (_qs_table, GUINT_TO_POINTER (id), data);
101 G_UNLOCK (query_server_table);
107 * @brief Prepare edge connection and its handle.
/*
 * Create (if needed), configure and start the nnstreamer-edge handle of the
 * server entry registered under id.
 * @param id            server id; also formatted as the edge handle id string.
 * @param connect_type  connection type passed to nns_edge_create_handle().
 * @param edge_info     host/port/dest_host/dest_port/topic plus the event
 *                      callback (cb) and its user data (pdata).
 * NOTE(review): the return type, the declaration of `ret`, the handling of a
 * missing entry, the error exits and the final return are on omitted lines;
 * `prepared` is presumably the result - confirm in the full file.
 */
110 gst_tensor_query_server_prepare (const guint id,
111 nns_edge_connect_type_e connect_type, GstTensorQueryEdgeInfo * edge_info)
113 GstTensorQueryServer *data;
114 gchar *port_str, *id_str;
115 gboolean prepared = FALSE;
118 data = gst_tensor_query_server_get_handle (id);
/* All edge-handle work below is done while holding the per-entry lock. */
123 g_mutex_lock (&data->lock);
/* The handle is created only on first use; an existing handle is reused. */
124 if (data->edge_h == NULL) {
/* NOTE(review): id_str is a g_strdup_printf() allocation; its g_free() is
 * presumably on omitted line 129 or 130 - confirm. */
125 id_str = g_strdup_printf ("%u", id);
127 ret = nns_edge_create_handle (id_str, connect_type,
128 NNS_EDGE_NODE_TYPE_QUERY_SERVER, &data->edge_h);
131 if (NNS_EDGE_ERROR_NONE != ret) {
132 GST_ERROR ("Failed to get nnstreamer edge handle.");
/* Only fields that are set (non-NULL strings, ports > 0) are forwarded.
 * NOTE(review): edge_info is dereferenced here; a NULL guard may sit on
 * omitted lines 133-137 - confirm. */
138 if (edge_info->host) {
139 nns_edge_set_info (data->edge_h, "HOST", edge_info->host);
141 if (edge_info->port > 0) {
/* Info values are passed as strings, so the numeric port is formatted first.
 * NOTE(review): the matching g_free (port_str) is presumably on omitted
 * line 144 - confirm. */
142 port_str = g_strdup_printf ("%u", edge_info->port);
143 nns_edge_set_info (data->edge_h, "PORT", port_str);
146 if (edge_info->dest_host) {
147 nns_edge_set_info (data->edge_h, "DEST_HOST", edge_info->dest_host);
149 if (edge_info->dest_port > 0) {
/* NOTE(review): as above, the free of port_str is presumably on omitted
 * line 152 - confirm. */
150 port_str = g_strdup_printf ("%u", edge_info->dest_port);
151 nns_edge_set_info (data->edge_h, "DEST_PORT", port_str);
154 if (edge_info->topic) {
155 nns_edge_set_info (data->edge_h, "TOPIC", edge_info->topic);
/* The event callback is registered before the edge handle is started. */
158 nns_edge_set_event_callback (data->edge_h, edge_info->cb, edge_info->pdata);
160 ret = nns_edge_start (data->edge_h);
161 if (NNS_EDGE_ERROR_NONE != ret) {
/* The logging call that takes this message starts on omitted line 162. */
163 ("Failed to start NNStreamer-edge. Please check server IP and port.");
171 g_mutex_unlock (&data->lock);
176 * @brief Send buffer to connected edge device.
/*
 * Send the tensors of a GstBuffer to the connected edge device through the
 * edge handle of the server entry registered under id.
 * @param id      server id.
 * @param buffer  buffer whose tensor memories are sent; it must carry a
 *                tensor-query meta, from which the client id is taken.
 * NOTE(review): the return type, the declaration of `val`, the failure
 * conditions guarding the log calls, the jumps to cleanup and the final
 * return are on omitted lines; `sent` is presumably the result - confirm in
 * the full file.
 */
179 gst_tensor_query_server_send_buffer (const guint id, GstBuffer * buffer)
181 GstTensorQueryServer *data;
182 GstMetaQuery *meta_query;
183 nns_edge_data_h data_h;
184 guint i, num_tensors = 0;
185 gint ret = NNS_EDGE_ERROR_NONE;
186 GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
187 GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
189 gboolean sent = FALSE;
191 data = gst_tensor_query_server_get_handle (id);
194 nns_loge ("Failed to send buffer, server handle is null.");
198 meta_query = gst_buffer_get_meta_query (buffer);
200 nns_loge ("Failed to send buffer, cannot get tensor query meta.");
204 ret = nns_edge_data_create (&data_h);
205 if (ret != NNS_EDGE_ERROR_NONE) {
206 nns_loge ("Failed to create edge data handle in query server.");
/* Each tensor memory is mapped read-only and its mapped pointer/size is added
 * to the edge data; the mappings are kept until after nns_edge_send() and
 * released in the loop near the end of the function. */
210 num_tensors = gst_tensor_buffer_get_count (buffer);
211 for (i = 0; i < num_tensors; i++) {
212 mem[i] = gst_tensor_buffer_get_nth_memory (buffer, i);
214 if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
215 ml_loge ("Cannot map the %uth memory in gst-buffer.", i);
/* NOTE(review): mem[i] is dropped here without having been mapped; the
 * cleanup loop below iterates up to num_tensors, so omitted lines 217-220
 * presumably shrink num_tensors to i before leaving - confirm. */
216 gst_memory_unref (mem[i]);
221 nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
/* The client id from the query meta is attached as the "client_id" info
 * string. NOTE(review): `val` is a g_strdup_printf() allocation; its free is
 * presumably on omitted line 226 or 227 - confirm. */
224 val = g_strdup_printf ("%lld", (long long) meta_query->client_id);
225 nns_edge_data_set_info (data_h, "client_id", val);
/* Only the send itself is done under the per-entry lock. */
228 g_mutex_lock (&data->lock);
229 ret = nns_edge_send (data->edge_h, data_h);
230 g_mutex_unlock (&data->lock);
232 if (ret != NNS_EDGE_ERROR_NONE) {
233 nns_loge ("Failed to send edge data handle in query server.");
/* Cleanup: unmap and unref every memory taken above, then destroy the edge
 * data handle. */
240 for (i = 0; i < num_tensors; i++) {
241 gst_memory_unmap (mem[i], &map[i]);
242 gst_memory_unref (mem[i]);
245 nns_edge_data_destroy (data_h);
251 * @brief Release nnstreamer edge handle of query server.
/*
 * Release the nnstreamer-edge handle of the server entry registered under id,
 * leaving the table entry itself in place.
 * @param id  server id.
 * NOTE(review): the return type, the handling of a missing entry, any guard
 * on edge_h and any reset of edge_h afterwards are on omitted lines (255,
 * 257, 259-263, 265, 267-268, 270-272) - confirm in the full file.
 */
254 gst_tensor_query_server_release_edge_handle (const guint id)
256 GstTensorQueryServer *data;
258 data = gst_tensor_query_server_get_handle (id);
/* The handle is released under the per-entry lock. */
264 g_mutex_lock (&data->lock);
266 nns_edge_release_handle (data->edge_h);
269 g_mutex_unlock (&data->lock);
273 * @brief Remove GstTensorQueryServer.
/*
 * Remove the server entry registered under id from the table, if present.
 * @param id  server id.
 * Removal runs the table's value-destroy function (_release_server_data, set
 * in init_queryserver()) on the entry. The lookup and the removal happen in
 * one critical section under the table lock.
 */
276 gst_tensor_query_server_remove_data (const guint id)
278 G_LOCK (query_server_table);
279 if (g_hash_table_lookup (_qs_table, GUINT_TO_POINTER (id)))
280 g_hash_table_remove (_qs_table, GUINT_TO_POINTER (id));
281 G_UNLOCK (query_server_table);
285 * @brief Wait until the sink is configured and get server info handle.
/*
 * Block until the server entry registered under id has been marked configured
 * (see gst_tensor_query_server_set_configured()) or the timeout expires.
 * @param id  server id.
 * NOTE(review): the return type, the declaration of `end_time`, the handling
 * of a missing entry and the return statements are on omitted lines (289-290,
 * 292, 294-298, 306-308, 310-314) - confirm the return contract in the full
 * file.
 */
288 gst_tensor_query_server_wait_sink (const guint id)
291 GstTensorQueryServer *data;
293 data = gst_tensor_query_server_get_handle (id);
/* Absolute deadline on the monotonic clock, computed once; the timeout
 * constant is scaled by G_TIME_SPAN_SECOND, i.e. it is given in seconds. */
299 end_time = g_get_monotonic_time () +
300 DEFAULT_QUERY_INFO_TIMEOUT * G_TIME_SPAN_SECOND;
301 g_mutex_lock (&data->lock);
/* Re-check the flag after every wake-up; because the deadline is absolute,
 * repeated wake-ups do not extend the total wait. */
302 while (!data->configured) {
/* g_cond_wait_until() returns FALSE when end_time has passed; the lock is
 * held again at that point, so it is released before logging. */
303 if (!g_cond_wait_until (&data->cond, &data->lock, end_time)) {
304 g_mutex_unlock (&data->lock);
305 ml_loge ("Failed to get server sink info.");
309 g_mutex_unlock (&data->lock);
315 * @brief Mark the query server sink as configured.
/*
 * Mark the server entry registered under id as configured and wake waiters.
 * @param id  server id.
 * NOTE(review): the return type and the handling of a missing entry are on
 * omitted lines (319, 321, 323-327) - confirm in the full file.
 */
318 gst_tensor_query_server_set_configured (const guint id)
320 GstTensorQueryServer *data;
322 data = gst_tensor_query_server_get_handle (id);
/* The flag is set under the same lock that gst_tensor_query_server_wait_sink()
 * holds while testing it; the broadcast wakes every thread waiting on the
 * entry's condition. */
328 g_mutex_lock (&data->lock);
329 data->configured = TRUE;
330 g_cond_broadcast (&data->cond);
331 g_mutex_unlock (&data->lock);
335 * @brief Set the query server caps.
/*
 * Append caps_str to the "CAPS" info stored on the edge handle of the server
 * entry registered under id.
 * @param id        server id.
 * @param caps_str  caps string to append; it is concatenated directly after
 *                  the existing value, with no separator added here.
 * NOTE(review): the return type and the handling of a missing entry are on
 * omitted lines (339, 342, 344-348) - confirm in the full file.
 */
338 gst_tensor_query_server_set_caps (const guint id, const gchar * caps_str)
340 GstTensorQueryServer *data;
341 gchar *prev_caps_str, *new_caps_str;
343 data = gst_tensor_query_server_get_handle (id);
/* The read-modify-write of "CAPS" is done under the per-entry lock. */
349 g_mutex_lock (&data->lock);
351 prev_caps_str = new_caps_str = NULL;
352 nns_edge_get_info (data->edge_h, "CAPS", &prev_caps_str);
/* NOTE(review): the condition guarding this fallback is on omitted line 353;
 * presumably it applies only when no previous caps string was returned -
 * confirm. */
354 prev_caps_str = g_strdup ("");
355 new_caps_str = g_strdup_printf ("%s%s", prev_caps_str, caps_str);
356 nns_edge_set_info (data->edge_h, "CAPS", new_caps_str);
/* Both strings are owned by this function at this point and are freed here. */
358 g_free (prev_caps_str);
359 g_free (new_caps_str);
361 g_mutex_unlock (&data->lock);
365 * @brief Initialize the query server.
/*
 * Create the global server table. Declared with the constructor attribute
 * near the top of the file, so it is run automatically rather than called.
 */
368 init_queryserver (void)
370 G_LOCK (query_server_table);
371 g_assert (NULL == _qs_table); /** Internal error (duplicated init call?) */
/* Keys are integers packed into pointers, hence direct hash/equal and no key
 * destroy function; values are released through _release_server_data. */
372 _qs_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
373 _release_server_data);
374 G_UNLOCK (query_server_table);
378 * @brief Destruct the query server.
/*
 * Destroy the global server table. Declared with the destructor attribute
 * near the top of the file, so it is run automatically rather than called.
 */
381 fini_queryserver (void)
383 G_LOCK (query_server_table);
384 g_assert (_qs_table); /** Internal error (init not called?) */
/* Destroying the table runs the value-destroy function
 * (_release_server_data) on every entry still registered.
 * NOTE(review): line 386 is omitted from this listing; presumably it resets
 * _qs_table to NULL - confirm. */
385 g_hash_table_destroy (_qs_table);
387 G_UNLOCK (query_server_table);