[CodeClean] handle hashtable function
[platform/upstream/nnstreamer.git] / gst / nnstreamer / tensor_query / tensor_query_server.c
1 /* SPDX-License-Identifier: LGPL-2.1-only */
2 /**
3  * Copyright (C) 2021 Samsung Electronics Co., Ltd.
4  *
5  * @file    tensor_query_server.c
6  * @date    03 Aug 2021
7  * @brief   GStreamer plugin to handle meta_query for server elements
8  * @author  Junhwan Kim <jejudo.kim@samsung.com>
9  * @see     http://github.com/nnstreamer/nnstreamer
10  * @bug     No known bugs
11  *
12  */
13 #ifdef HAVE_CONFIG_H
14 #include "config.h"
15 #endif
16
17 #include "tensor_query_server.h"
18 #include <tensor_typedef.h>
19 #include <tensor_common.h>
20
21 /**
22  * @brief mutex for tensor-query server table.
23  */
24 G_LOCK_DEFINE_STATIC (query_server_table);
25
26 /**
27  * @brief Table for query server data.
28  */
29 static GHashTable *_qs_table = NULL;
30
31 static void init_queryserver (void) __attribute__((constructor));
32 static void fini_queryserver (void) __attribute__((destructor));
33
34 /**
35  * @brief Internal function to release query server data.
36  */
37 static void
38 _release_server_data (gpointer data)
39 {
40   GstTensorQueryServer *_data = (GstTensorQueryServer *) data;
41
42   if (!_data)
43     return;
44
45   g_mutex_lock (&_data->lock);
46   if (_data->edge_h) {
47     nns_edge_release_handle (_data->edge_h);
48     _data->edge_h = NULL;
49   }
50   g_mutex_unlock (&_data->lock);
51
52   g_mutex_clear (&_data->lock);
53   g_cond_clear (&_data->cond);
54
55   g_free (_data);
56 }
57
58 /**
59  * @brief Get nnstreamer edge server handle.
60  */
61 static GstTensorQueryServer *
62 gst_tensor_query_server_get_handle (const guint id)
63 {
64   GstTensorQueryServer *data;
65
66   G_LOCK (query_server_table);
67   data = g_hash_table_lookup (_qs_table, GUINT_TO_POINTER (id));
68   G_UNLOCK (query_server_table);
69
70   return data;
71 }
72
73 /**
74  * @brief Add nnstreamer edge server handle into hash table.
75  */
76 gboolean
77 gst_tensor_query_server_add_data (const guint id)
78 {
79   GstTensorQueryServer *data;
80   gboolean ret;
81
82   data = gst_tensor_query_server_get_handle (id);
83
84   if (NULL != data) {
85     return TRUE;
86   }
87
88   data = g_try_new0 (GstTensorQueryServer, 1);
89   if (NULL == data) {
90     nns_loge ("Failed to allocate memory for tensor query server data.");
91     return FALSE;
92   }
93
94   g_mutex_init (&data->lock);
95   g_cond_init (&data->cond);
96   data->id = id;
97   data->configured = FALSE;
98
99   G_LOCK (query_server_table);
100   ret = g_hash_table_insert (_qs_table, GUINT_TO_POINTER (id), data);
101   G_UNLOCK (query_server_table);
102
103   return ret;
104 }
105
106 /**
107  * @brief Prepare edge connection and its handle.
108  */
109 gboolean
110 gst_tensor_query_server_prepare (const guint id,
111     nns_edge_connect_type_e connect_type, GstTensorQueryEdgeInfo * edge_info)
112 {
113   GstTensorQueryServer *data;
114   gchar *port_str, *id_str;
115   gboolean prepared = FALSE;
116   gint ret;
117
118   data = gst_tensor_query_server_get_handle (id);
119   if (NULL == data) {
120     return FALSE;
121   }
122
123   g_mutex_lock (&data->lock);
124   if (data->edge_h == NULL) {
125     id_str = g_strdup_printf ("%u", id);
126
127     ret = nns_edge_create_handle (id_str, connect_type,
128         NNS_EDGE_NODE_TYPE_QUERY_SERVER, &data->edge_h);
129     g_free (id_str);
130
131     if (NNS_EDGE_ERROR_NONE != ret) {
132       GST_ERROR ("Failed to get nnstreamer edge handle.");
133       goto done;
134     }
135   }
136
137   if (edge_info) {
138     if (edge_info->host) {
139       nns_edge_set_info (data->edge_h, "HOST", edge_info->host);
140     }
141     if (edge_info->port > 0) {
142       port_str = g_strdup_printf ("%u", edge_info->port);
143       nns_edge_set_info (data->edge_h, "PORT", port_str);
144       g_free (port_str);
145     }
146     if (edge_info->dest_host) {
147       nns_edge_set_info (data->edge_h, "DEST_HOST", edge_info->dest_host);
148     }
149     if (edge_info->dest_port > 0) {
150       port_str = g_strdup_printf ("%u", edge_info->dest_port);
151       nns_edge_set_info (data->edge_h, "DEST_PORT", port_str);
152       g_free (port_str);
153     }
154     if (edge_info->topic) {
155       nns_edge_set_info (data->edge_h, "TOPIC", edge_info->topic);
156     }
157
158     nns_edge_set_event_callback (data->edge_h, edge_info->cb, edge_info->pdata);
159
160     ret = nns_edge_start (data->edge_h);
161     if (NNS_EDGE_ERROR_NONE != ret) {
162       nns_loge
163           ("Failed to start NNStreamer-edge. Please check server IP and port.");
164       goto done;
165     }
166   }
167
168   prepared = TRUE;
169
170 done:
171   g_mutex_unlock (&data->lock);
172   return prepared;
173 }
174
175 /**
176  * @brief Send buffer to connected edge device.
177  */
178 gboolean
179 gst_tensor_query_server_send_buffer (const guint id, GstBuffer * buffer)
180 {
181   GstTensorQueryServer *data;
182   GstMetaQuery *meta_query;
183   nns_edge_data_h data_h;
184   guint i, num_tensors = 0;
185   gint ret = NNS_EDGE_ERROR_NONE;
186   GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
187   GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
188   gchar *val;
189   gboolean sent = FALSE;
190
191   data = gst_tensor_query_server_get_handle (id);
192
193   if (NULL == data) {
194     nns_loge ("Failed to send buffer, server handle is null.");
195     return FALSE;
196   }
197
198   meta_query = gst_buffer_get_meta_query (buffer);
199   if (!meta_query) {
200     nns_loge ("Failed to send buffer, cannot get tensor query meta.");
201     return FALSE;
202   }
203
204   ret = nns_edge_data_create (&data_h);
205   if (ret != NNS_EDGE_ERROR_NONE) {
206     nns_loge ("Failed to create edge data handle in query server.");
207     return FALSE;
208   }
209
210   num_tensors = gst_tensor_buffer_get_count (buffer);
211   for (i = 0; i < num_tensors; i++) {
212     mem[i] = gst_tensor_buffer_get_nth_memory (buffer, i);
213
214     if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
215       ml_loge ("Cannot map the %uth memory in gst-buffer.", i);
216       gst_memory_unref (mem[i]);
217       num_tensors = i;
218       goto done;
219     }
220
221     nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
222   }
223
224   val = g_strdup_printf ("%lld", (long long) meta_query->client_id);
225   nns_edge_data_set_info (data_h, "client_id", val);
226   g_free (val);
227
228   g_mutex_lock (&data->lock);
229   ret = nns_edge_send (data->edge_h, data_h);
230   g_mutex_unlock (&data->lock);
231
232   if (ret != NNS_EDGE_ERROR_NONE) {
233     nns_loge ("Failed to send edge data handle in query server.");
234     goto done;
235   }
236
237   sent = TRUE;
238
239 done:
240   for (i = 0; i < num_tensors; i++) {
241     gst_memory_unmap (mem[i], &map[i]);
242     gst_memory_unref (mem[i]);
243   }
244
245   nns_edge_data_destroy (data_h);
246
247   return sent;
248 }
249
250 /**
251  * @brief Release nnstreamer edge handle of query server.
252  */
253 void
254 gst_tensor_query_server_release_edge_handle (const guint id)
255 {
256   GstTensorQueryServer *data;
257
258   data = gst_tensor_query_server_get_handle (id);
259
260   if (NULL == data) {
261     return;
262   }
263
264   g_mutex_lock (&data->lock);
265   if (data->edge_h) {
266     nns_edge_release_handle (data->edge_h);
267     data->edge_h = NULL;
268   }
269   g_mutex_unlock (&data->lock);
270 }
271
272 /**
273  * @brief Remove GstTensorQueryServer.
274  */
275 void
276 gst_tensor_query_server_remove_data (const guint id)
277 {
278   G_LOCK (query_server_table);
279   if (g_hash_table_lookup (_qs_table, GUINT_TO_POINTER (id)))
280     g_hash_table_remove (_qs_table, GUINT_TO_POINTER (id));
281   G_UNLOCK (query_server_table);
282 }
283
284 /**
285  * @brief Wait until the sink is configured and get server info handle.
286  */
287 gboolean
288 gst_tensor_query_server_wait_sink (const guint id)
289 {
290   gint64 end_time;
291   GstTensorQueryServer *data;
292
293   data = gst_tensor_query_server_get_handle (id);
294
295   if (NULL == data) {
296     return FALSE;
297   }
298
299   end_time = g_get_monotonic_time () +
300       DEFAULT_QUERY_INFO_TIMEOUT * G_TIME_SPAN_SECOND;
301   g_mutex_lock (&data->lock);
302   while (!data->configured) {
303     if (!g_cond_wait_until (&data->cond, &data->lock, end_time)) {
304       g_mutex_unlock (&data->lock);
305       ml_loge ("Failed to get server sink info.");
306       return FALSE;
307     }
308   }
309   g_mutex_unlock (&data->lock);
310
311   return TRUE;
312 }
313
314 /**
315  * @brief set query server sink configured.
316  */
317 void
318 gst_tensor_query_server_set_configured (const guint id)
319 {
320   GstTensorQueryServer *data;
321
322   data = gst_tensor_query_server_get_handle (id);
323
324   if (NULL == data) {
325     return;
326   }
327
328   g_mutex_lock (&data->lock);
329   data->configured = TRUE;
330   g_cond_broadcast (&data->cond);
331   g_mutex_unlock (&data->lock);
332 }
333
334 /**
335  * @brief set query server caps.
336  */
337 void
338 gst_tensor_query_server_set_caps (const guint id, const gchar * caps_str)
339 {
340   GstTensorQueryServer *data;
341   gchar *prev_caps_str, *new_caps_str;
342
343   data = gst_tensor_query_server_get_handle (id);
344
345   if (NULL == data) {
346     return;
347   }
348
349   g_mutex_lock (&data->lock);
350
351   prev_caps_str = new_caps_str = NULL;
352   nns_edge_get_info (data->edge_h, "CAPS", &prev_caps_str);
353   if (!prev_caps_str)
354     prev_caps_str = g_strdup ("");
355   new_caps_str = g_strdup_printf ("%s%s", prev_caps_str, caps_str);
356   nns_edge_set_info (data->edge_h, "CAPS", new_caps_str);
357
358   g_free (prev_caps_str);
359   g_free (new_caps_str);
360
361   g_mutex_unlock (&data->lock);
362 }
363
364 /**
365  * @brief Initialize the query server.
366  */
367 static void
368 init_queryserver (void)
369 {
370   G_LOCK (query_server_table);
371   g_assert (NULL == _qs_table); /** Internal error (duplicated init call?) */
372   _qs_table = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
373       _release_server_data);
374   G_UNLOCK (query_server_table);
375 }
376
377 /**
378  * @brief Destruct the query server.
379  */
380 static void
381 fini_queryserver (void)
382 {
383   G_LOCK (query_server_table);
384   g_assert (_qs_table); /** Internal error (init not called?) */
385   g_hash_table_destroy (_qs_table);
386   _qs_table = NULL;
387   G_UNLOCK (query_server_table);
388 }