[Query] Add drop prop to query client
[platform/upstream/nnstreamer.git] / gst / nnstreamer / tensor_query / tensor_query_client.c
1 /* SPDX-License-Identifier: LGPL-2.1-only */
2 /**
3  * Copyright (C) 2021 Samsung Electronics Co., Ltd.
4  *
5  * @file    tensor_query_client.c
6  * @date    09 Jul 2021
7  * @brief   GStreamer plugin to handle tensor query client
8  * @author  Junhwan Kim <jejudo.kim@samsung.com>
9  * @see     http://github.com/nnstreamer/nnstreamer
10  * @bug     No known bugs
11  *
12  */
13 #ifdef HAVE_CONFIG_H
14 #include <config.h>
15 #endif
16
17 #include "nnstreamer_util.h"
18 #include "tensor_query_client.h"
19 #include <gio/gio.h>
20 #include <glib.h>
21 #include <string.h>
22 #include "tensor_query_common.h"
23
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <unistd.h>
27
28 /**
29  * @brief Macro for debug mode.
30  */
31 #ifndef DBG
32 #define DBG (!self->silent)
33 #endif
34
35 /**
36  * @brief Properties.
37  */
38 enum
39 {
40   PROP_0,
41   PROP_HOST,
42   PROP_PORT,
43   PROP_DEST_HOST,
44   PROP_DEST_PORT,
45   PROP_CONNECT_TYPE,
46   PROP_TOPIC,
47   PROP_TIMEOUT,
48   PROP_SILENT,
49   PROP_MAX_REQUEST,
50 };
51
52 #define TCP_HIGHEST_PORT        65535
53 #define TCP_DEFAULT_HOST        "localhost"
54 #define TCP_DEFAULT_SRV_SRC_PORT 3000
55 #define TCP_DEFAULT_CLIENT_SRC_PORT 3001
56 #define DEFAULT_CLIENT_TIMEOUT  0
57 #define DEFAULT_SILENT TRUE
58 #define DEFAULT_MAX_REQUEST 2
59
60 GST_DEBUG_CATEGORY_STATIC (gst_tensor_query_client_debug);
61 #define GST_CAT_DEFAULT gst_tensor_query_client_debug
62
63 /**
64  * @brief the capabilities of the inputs.
65  */
66 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
67     GST_PAD_SINK,
68     GST_PAD_ALWAYS,
69     GST_STATIC_CAPS_ANY);
70
71 /**
72  * @brief the capabilities of the outputs.
73  */
74 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
75     GST_PAD_SRC,
76     GST_PAD_ALWAYS,
77     GST_STATIC_CAPS_ANY);
78
79 #define gst_tensor_query_client_parent_class parent_class
80 G_DEFINE_TYPE (GstTensorQueryClient, gst_tensor_query_client, GST_TYPE_ELEMENT);
81
82 static void gst_tensor_query_client_finalize (GObject * object);
83 static void gst_tensor_query_client_set_property (GObject * object,
84     guint prop_id, const GValue * value, GParamSpec * pspec);
85 static void gst_tensor_query_client_get_property (GObject * object,
86     guint prop_id, GValue * value, GParamSpec * pspec);
87
88 static gboolean gst_tensor_query_client_sink_event (GstPad * pad,
89     GstObject * parent, GstEvent * event);
90 static gboolean gst_tensor_query_client_sink_query (GstPad * pad,
91     GstObject * parent, GstQuery * query);
92 static GstFlowReturn gst_tensor_query_client_chain (GstPad * pad,
93     GstObject * parent, GstBuffer * buf);
94 static GstCaps *gst_tensor_query_client_query_caps (GstTensorQueryClient * self,
95     GstPad * pad, GstCaps * filter);
96
97 /**
98  * @brief initialize the class
99  */
100 static void
101 gst_tensor_query_client_class_init (GstTensorQueryClientClass * klass)
102 {
103   GObjectClass *gobject_class;
104   GstElementClass *gstelement_class;
105
106   gobject_class = (GObjectClass *) klass;
107   gstelement_class = (GstElementClass *) klass;
108
109   gobject_class->set_property = gst_tensor_query_client_set_property;
110   gobject_class->get_property = gst_tensor_query_client_get_property;
111   gobject_class->finalize = gst_tensor_query_client_finalize;
112
113   /** install property goes here */
114   g_object_class_install_property (gobject_class, PROP_HOST,
115       g_param_spec_string ("host", "Host",
116           "A host address to receive the packets from query server",
117           TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
118   g_object_class_install_property (gobject_class, PROP_PORT,
119       g_param_spec_uint ("port", "Port",
120           "A port number to receive the packets from query server", 0,
121           TCP_HIGHEST_PORT, TCP_DEFAULT_SRV_SRC_PORT,
122           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
123   g_object_class_install_property (gobject_class, PROP_DEST_HOST,
124       g_param_spec_string ("dest-host", "Destination Host",
125           "A tenor query server host to send the packets",
126           TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
127   g_object_class_install_property (gobject_class, PROP_DEST_PORT,
128       g_param_spec_uint ("dest-port", "Destination Port",
129           "The port of tensor query server to send the packets", 0,
130           TCP_HIGHEST_PORT, TCP_DEFAULT_CLIENT_SRC_PORT,
131           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
132   g_object_class_install_property (gobject_class, PROP_SILENT,
133       g_param_spec_boolean ("silent", "Silent", "Produce verbose output",
134           DEFAULT_SILENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
135   g_object_class_install_property (gobject_class, PROP_CONNECT_TYPE,
136       g_param_spec_enum ("connect-type", "Connect Type",
137           "The connections type between client and server.",
138           GST_TYPE_QUERY_CONNECT_TYPE, DEFAULT_CONNECT_TYPE,
139           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
140   g_object_class_install_property (gobject_class, PROP_TOPIC,
141       g_param_spec_string ("topic", "Topic",
142           "The main topic of the host.",
143           "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
144
145   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
146       g_param_spec_uint ("timeout", "timeout value",
147           "A timeout value (in ms) to wait message from query server after sending buffer to server. 0 means no wait.",
148           0, G_MAXUINT, DEFAULT_CLIENT_TIMEOUT,
149           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
150   g_object_class_install_property (gobject_class, PROP_MAX_REQUEST,
151       g_param_spec_uint ("max-request", "Maximum number of request",
152           "Sets the maximum number of buffers to request to the query server. "
153           "If the processing speed of query server is slower than the query client, the input buffer is dropped. "
154           "Two buffers are requested by default, and 0 means that all buffers are sent to query server without drop. ",
155           0, G_MAXUINT, DEFAULT_MAX_REQUEST,
156           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
157   gst_element_class_add_pad_template (gstelement_class,
158       gst_static_pad_template_get (&sinktemplate));
159   gst_element_class_add_pad_template (gstelement_class,
160       gst_static_pad_template_get (&srctemplate));
161
162   gst_element_class_set_static_metadata (gstelement_class,
163       "TensorQueryClient", "Filter/Tensor/Query",
164       "Handle querying tensor data through the network",
165       "Samsung Electronics Co., Ltd.");
166
167   GST_DEBUG_CATEGORY_INIT (gst_tensor_query_client_debug, "tensor_query_client",
168       0, "Tensor Query Client");
169 }
170
171 /**
172  * @brief initialize the new element
173  */
174 static void
175 gst_tensor_query_client_init (GstTensorQueryClient * self)
176 {
177   /** setup sink pad */
178   self->sinkpad = gst_pad_new_from_static_template (&sinktemplate, "sink");
179   gst_element_add_pad (GST_ELEMENT (self), self->sinkpad);
180   gst_pad_set_event_function (self->sinkpad,
181       GST_DEBUG_FUNCPTR (gst_tensor_query_client_sink_event));
182   gst_pad_set_query_function (self->sinkpad,
183       GST_DEBUG_FUNCPTR (gst_tensor_query_client_sink_query));
184   gst_pad_set_chain_function (self->sinkpad,
185       GST_DEBUG_FUNCPTR (gst_tensor_query_client_chain));
186
187   /** setup src pad */
188   self->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
189   gst_element_add_pad (GST_ELEMENT (self), self->srcpad);
190
191   /* init properties */
192   self->silent = DEFAULT_SILENT;
193   self->connect_type = DEFAULT_CONNECT_TYPE;
194   self->host = g_strdup (TCP_DEFAULT_HOST);
195   self->port = TCP_DEFAULT_CLIENT_SRC_PORT;
196   self->dest_host = g_strdup (TCP_DEFAULT_HOST);
197   self->dest_port = TCP_DEFAULT_SRV_SRC_PORT;
198   self->topic = NULL;
199   self->in_caps_str = NULL;
200   self->timeout = DEFAULT_CLIENT_TIMEOUT;
201   self->edge_h = NULL;
202   self->msg_queue = g_async_queue_new ();
203   self->max_request = DEFAULT_MAX_REQUEST;
204   self->requested_num = 0;
205 }
206
207 /**
208  * @brief finalize the object
209  */
210 static void
211 gst_tensor_query_client_finalize (GObject * object)
212 {
213   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (object);
214   nns_edge_data_h data_h;
215
216   g_free (self->host);
217   self->host = NULL;
218   g_free (self->dest_host);
219   self->dest_host = NULL;
220   g_free (self->topic);
221   self->topic = NULL;
222   g_free (self->in_caps_str);
223   self->in_caps_str = NULL;
224
225   while ((data_h = g_async_queue_try_pop (self->msg_queue))) {
226     nns_edge_data_destroy (data_h);
227   }
228
229   if (self->msg_queue) {
230     g_async_queue_unref (self->msg_queue);
231     self->msg_queue = NULL;
232   }
233
234   if (self->edge_h) {
235     nns_edge_release_handle (self->edge_h);
236     self->edge_h = NULL;
237   }
238
239   G_OBJECT_CLASS (parent_class)->finalize (object);
240 }
241
242 /**
243  * @brief set property
244  */
245 static void
246 gst_tensor_query_client_set_property (GObject * object, guint prop_id,
247     const GValue * value, GParamSpec * pspec)
248 {
249   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (object);
250
251   /** @todo DO NOT update properties (host, port, ..) while pipeline is running. */
252   switch (prop_id) {
253     case PROP_HOST:
254       if (!g_value_get_string (value)) {
255         nns_logw ("Sink host property cannot be NULL");
256         break;
257       }
258       g_free (self->host);
259       self->host = g_value_dup_string (value);
260       break;
261     case PROP_PORT:
262       self->port = g_value_get_uint (value);
263       break;
264     case PROP_DEST_HOST:
265       if (!g_value_get_string (value)) {
266         nns_logw ("Sink host property cannot be NULL");
267         break;
268       }
269       g_free (self->dest_host);
270       self->dest_host = g_value_dup_string (value);
271       break;
272     case PROP_DEST_PORT:
273       self->dest_port = g_value_get_uint (value);
274       break;
275     case PROP_CONNECT_TYPE:
276       self->connect_type = g_value_get_enum (value);
277       break;
278     case PROP_TOPIC:
279       if (!g_value_get_string (value)) {
280         nns_logw ("Topic property cannot be NULL. Query-hybrid is disabled.");
281         break;
282       }
283       g_free (self->topic);
284       self->topic = g_value_dup_string (value);
285       break;
286     case PROP_TIMEOUT:
287       self->timeout = g_value_get_uint (value);
288       break;
289     case PROP_SILENT:
290       self->silent = g_value_get_boolean (value);
291       break;
292     case PROP_MAX_REQUEST:
293       self->max_request = g_value_get_uint (value);
294       break;
295     default:
296       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
297       break;
298   }
299 }
300
301 /**
302  * @brief get property
303  */
304 static void
305 gst_tensor_query_client_get_property (GObject * object, guint prop_id,
306     GValue * value, GParamSpec * pspec)
307 {
308   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (object);
309
310   switch (prop_id) {
311     case PROP_HOST:
312       g_value_set_string (value, self->host);
313       break;
314     case PROP_PORT:
315       g_value_set_uint (value, self->port);
316       break;
317     case PROP_DEST_HOST:
318       g_value_set_string (value, self->dest_host);
319       break;
320     case PROP_DEST_PORT:
321       g_value_set_uint (value, self->dest_port);
322       break;
323     case PROP_CONNECT_TYPE:
324       g_value_set_enum (value, self->connect_type);
325       break;
326     case PROP_TOPIC:
327       g_value_set_string (value, self->topic);
328       break;
329     case PROP_TIMEOUT:
330       g_value_set_uint (value, self->timeout);
331       break;
332     case PROP_SILENT:
333       g_value_set_boolean (value, self->silent);
334       break;
335     case PROP_MAX_REQUEST:
336       g_value_set_uint (value, self->max_request);
337       break;
338     default:
339       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
340       break;
341   }
342 }
343
344 /**
345  * @brief Update src pad caps from tensors config.
346  */
347 static gboolean
348 gst_tensor_query_client_update_caps (GstTensorQueryClient * self,
349     const gchar * caps_str)
350 {
351   GstCaps *curr_caps, *out_caps;
352   gboolean ret = FALSE;
353   out_caps = gst_caps_from_string (caps_str);
354   silent_debug_caps (self, out_caps, "set out-caps");
355
356   /* Update src pad caps if it is different. */
357   curr_caps = gst_pad_get_current_caps (self->srcpad);
358   if (curr_caps == NULL || !gst_caps_is_equal (curr_caps, out_caps)) {
359     if (gst_caps_is_fixed (out_caps)) {
360       ret = gst_pad_set_caps (self->srcpad, out_caps);
361     } else {
362       nns_loge ("out-caps from tensor_query_serversink is not fixed. "
363           "Failed to update client src caps, out-caps: %s", caps_str);
364     }
365   } else {
366     /** Don't need to update when the capability is the same. */
367     ret = TRUE;
368   }
369
370   if (curr_caps)
371     gst_caps_unref (curr_caps);
372
373   gst_caps_unref (out_caps);
374
375   return ret;
376 }
377
378 /**
379  * @brief Parse caps from received event data.
380  */
381 static gchar *
382 _nns_edge_parse_caps (gchar * caps_str, gboolean is_src)
383 {
384   gchar **strv;
385   gint num, i;
386   gchar *find_key = NULL;
387   gchar *ret_str = NULL;
388
389   if (!caps_str)
390     return NULL;
391
392   strv = g_strsplit (caps_str, "@", -1);
393   num = g_strv_length (strv);
394
395   find_key =
396       is_src ==
397       TRUE ? g_strdup ("query_server_src_caps") :
398       g_strdup ("query_server_sink_caps");
399
400   for (i = 1; i < num; i += 2) {
401     if (0 == g_strcmp0 (find_key, strv[i])) {
402       ret_str = g_strdup (strv[i + 1]);
403       break;
404     }
405   }
406
407   g_free (find_key);
408   g_strfreev (strv);
409
410   return ret_str;
411 }
412
413 /**
414  * @brief nnstreamer-edge event callback.
415  */
416 static int
417 _nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
418 {
419   nns_edge_event_e event_type;
420   int ret = NNS_EDGE_ERROR_NONE;
421   GstTensorQueryClient *self = (GstTensorQueryClient *) user_data;
422
423   if (NNS_EDGE_ERROR_NONE != nns_edge_event_get_type (event_h, &event_type)) {
424     nns_loge ("Failed to get event type!");
425     return NNS_EDGE_ERROR_NOT_SUPPORTED;
426   }
427
428   switch (event_type) {
429     case NNS_EDGE_EVENT_CAPABILITY:
430     {
431       GstCaps *server_caps, *client_caps;
432       GstStructure *server_st, *client_st;
433       gboolean result = FALSE;
434       gchar *ret_str, *caps_str;
435
436       nns_edge_event_parse_capability (event_h, &caps_str);
437       ret_str = _nns_edge_parse_caps (caps_str, TRUE);
438       nns_logd ("Received server-src caps: %s", GST_STR_NULL (ret_str));
439       client_caps = gst_caps_from_string ((gchar *) self->in_caps_str);
440       server_caps = gst_caps_from_string (ret_str);
441       g_free (ret_str);
442
443       /** Server framerate may vary. Let's skip comparing the framerate. */
444       gst_caps_set_simple (server_caps, "framerate", GST_TYPE_FRACTION, 0, 1,
445           NULL);
446       gst_caps_set_simple (client_caps, "framerate", GST_TYPE_FRACTION, 0, 1,
447           NULL);
448
449       server_st = gst_caps_get_structure (server_caps, 0);
450       client_st = gst_caps_get_structure (client_caps, 0);
451
452       if (gst_structure_is_tensor_stream (server_st)) {
453         GstTensorsConfig server_config, client_config;
454
455         gst_tensors_config_from_structure (&server_config, server_st);
456         gst_tensors_config_from_structure (&client_config, client_st);
457
458         result = gst_tensors_config_is_equal (&server_config, &client_config);
459       }
460
461       if (result || gst_caps_can_intersect (client_caps, server_caps)) {
462         /** Update client src caps */
463         ret_str = _nns_edge_parse_caps (caps_str, FALSE);
464         nns_logd ("Received server-sink caps: %s", GST_STR_NULL (ret_str));
465         if (!gst_tensor_query_client_update_caps (self, ret_str)) {
466           nns_loge ("Failed to update client source caps.");
467           ret = NNS_EDGE_ERROR_UNKNOWN;
468         }
469         g_free (ret_str);
470       } else {
471         /* respond deny with src caps string */
472         nns_loge ("Query caps is not acceptable!");
473         ret = NNS_EDGE_ERROR_UNKNOWN;
474       }
475
476       gst_caps_unref (server_caps);
477       gst_caps_unref (client_caps);
478       g_free (caps_str);
479       break;
480     }
481     case NNS_EDGE_EVENT_NEW_DATA_RECEIVED:
482     {
483       nns_edge_data_h data;
484
485       nns_edge_event_parse_new_data (event_h, &data);
486       g_async_queue_push (self->msg_queue, data);
487       break;
488     }
489     default:
490       break;
491   }
492
493   return ret;
494 }
495
496 /**
497  * @brief Internal function to create edge handle.
498  */
499 static gboolean
500 gst_tensor_query_client_create_edge_handle (GstTensorQueryClient * self)
501 {
502   gboolean started = FALSE;
503   gchar *prev_caps = NULL;
504   int ret;
505
506   /* Already created, compare caps string. */
507   if (self->edge_h) {
508     ret = nns_edge_get_info (self->edge_h, "CAPS", &prev_caps);
509
510     if (ret != NNS_EDGE_ERROR_NONE || !prev_caps ||
511         !g_str_equal (prev_caps, self->in_caps_str)) {
512       /* Capability is changed, close old handle. */
513       nns_edge_release_handle (self->edge_h);
514       self->edge_h = NULL;
515     } else {
516       return TRUE;
517     }
518   }
519
520   ret = nns_edge_create_handle ("TEMP_ID", self->connect_type,
521       NNS_EDGE_NODE_TYPE_QUERY_CLIENT, &self->edge_h);
522   if (ret != NNS_EDGE_ERROR_NONE)
523     return FALSE;
524
525   nns_edge_set_event_callback (self->edge_h, _nns_edge_event_cb, self);
526
527   if (self->topic)
528     nns_edge_set_info (self->edge_h, "TOPIC", self->topic);
529   if (self->host)
530     nns_edge_set_info (self->edge_h, "HOST", self->host);
531   if (self->port > 0) {
532     gchar *port = g_strdup_printf ("%u", self->port);
533     nns_edge_set_info (self->edge_h, "PORT", port);
534     g_free (port);
535   }
536   nns_edge_set_info (self->edge_h, "CAPS", self->in_caps_str);
537
538   ret = nns_edge_start (self->edge_h);
539   if (ret != NNS_EDGE_ERROR_NONE) {
540     nns_loge
541         ("Failed to start NNStreamer-edge. Please check server IP and port.");
542     goto done;
543   }
544
545   ret = nns_edge_connect (self->edge_h, self->dest_host, self->dest_port);
546   if (ret != NNS_EDGE_ERROR_NONE) {
547     nns_loge ("Failed to connect to edge server!");
548     goto done;
549   }
550
551   started = TRUE;
552
553 done:
554   if (!started) {
555     nns_edge_release_handle (self->edge_h);
556     self->edge_h = NULL;
557   }
558
559   return started;
560 }
561
562 /**
563  * @brief This function handles sink event.
564  */
565 static gboolean
566 gst_tensor_query_client_sink_event (GstPad * pad,
567     GstObject * parent, GstEvent * event)
568 {
569   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent);
570
571   GST_DEBUG_OBJECT (self, "Received %s event: %" GST_PTR_FORMAT,
572       GST_EVENT_TYPE_NAME (event), event);
573
574   switch (GST_EVENT_TYPE (event)) {
575     case GST_EVENT_CAPS:
576     {
577       GstCaps *caps;
578       gboolean ret;
579
580       gst_event_parse_caps (event, &caps);
581       g_free (self->in_caps_str);
582       self->in_caps_str = gst_caps_to_string (caps);
583
584       ret = gst_tensor_query_client_create_edge_handle (self);
585       if (!ret)
586         nns_loge ("Failed to create edge handle, cannot start query client.");
587
588       gst_event_unref (event);
589       return ret;
590     }
591     default:
592       break;
593   }
594
595   return gst_pad_event_default (pad, parent, event);
596 }
597
598 /**
599  * @brief This function handles sink pad query.
600  */
601 static gboolean
602 gst_tensor_query_client_sink_query (GstPad * pad,
603     GstObject * parent, GstQuery * query)
604 {
605   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent);
606
607   GST_DEBUG_OBJECT (self, "Received %s query: %" GST_PTR_FORMAT,
608       GST_QUERY_TYPE_NAME (query), query);
609
610   switch (GST_QUERY_TYPE (query)) {
611     case GST_QUERY_CAPS:
612     {
613       GstCaps *caps;
614       GstCaps *filter;
615
616       gst_query_parse_caps (query, &filter);
617       caps = gst_tensor_query_client_query_caps (self, pad, filter);
618
619       gst_query_set_caps_result (query, caps);
620       gst_caps_unref (caps);
621       return TRUE;
622     }
623     case GST_QUERY_ACCEPT_CAPS:
624     {
625       GstCaps *caps;
626       GstCaps *template_caps;
627       gboolean res = FALSE;
628
629       gst_query_parse_accept_caps (query, &caps);
630       silent_debug_caps (self, caps, "accept-caps");
631
632       if (gst_caps_is_fixed (caps)) {
633         template_caps = gst_pad_get_pad_template_caps (pad);
634
635         res = gst_caps_can_intersect (template_caps, caps);
636         gst_caps_unref (template_caps);
637       }
638
639       gst_query_set_accept_caps_result (query, res);
640       return TRUE;
641     }
642     default:
643       break;
644   }
645
646   return gst_pad_query_default (pad, parent, query);
647 }
648
649 /**
650  * @brief Chain function, this function does the actual processing.
651  */
652 static GstFlowReturn
653 gst_tensor_query_client_chain (GstPad * pad,
654     GstObject * parent, GstBuffer * buf)
655 {
656   GstTensorQueryClient *self = GST_TENSOR_QUERY_CLIENT (parent);
657   GstBuffer *out_buf = NULL;
658   GstFlowReturn res = GST_FLOW_OK;
659   nns_edge_data_h data_h;
660   guint i, num_mems, num_data;
661   int ret;
662   GstMemory *mem[NNS_TENSOR_SIZE_LIMIT];
663   GstMapInfo map[NNS_TENSOR_SIZE_LIMIT];
664   gchar *val;
665   UNUSED (pad);
666
667   ret = nns_edge_data_create (&data_h);
668   if (ret != NNS_EDGE_ERROR_NONE) {
669     nns_loge ("Failed to create data handle in client chain.");
670     return GST_FLOW_ERROR;
671   }
672
673   num_mems = gst_buffer_n_memory (buf);
674   for (i = 0; i < num_mems; i++) {
675     mem[i] = gst_buffer_peek_memory (buf, i);
676     if (!gst_memory_map (mem[i], &map[i], GST_MAP_READ)) {
677       ml_loge ("Cannot map the %uth memory in gst-buffer.", i);
678       num_mems = i;
679       goto done;
680     }
681     nns_edge_data_add (data_h, map[i].data, map[i].size, NULL);
682   }
683
684   nns_edge_get_info (self->edge_h, "client_id", &val);
685   nns_edge_data_set_info (data_h, "client_id", val);
686   g_free (val);
687
688   if (self->requested_num > self->max_request) {
689     nns_logi
690         ("the processing speed of the query server is too slow. Drop the input buffer.");
691   } else {
692     if (NNS_EDGE_ERROR_NONE != nns_edge_send (self->edge_h, data_h)) {
693       nns_logi ("Failed to publish to server node.");
694       goto done;
695     }
696     self->requested_num++;
697   }
698
699   nns_edge_data_destroy (data_h);
700
701   data_h = g_async_queue_timeout_pop (self->msg_queue,
702       self->timeout * G_TIME_SPAN_MILLISECOND);
703   if (data_h) {
704     self->requested_num--;
705     ret = nns_edge_data_get_count (data_h, &num_data);
706     if (ret != NNS_EDGE_ERROR_NONE || num_data == 0) {
707       nns_loge ("Failed to get the number of memories of the edge data.");
708       res = GST_FLOW_ERROR;
709       goto done;
710     }
711
712     out_buf = gst_buffer_new ();
713     for (i = 0; i < num_data; i++) {
714       void *data = NULL;
715       nns_size_t data_len;
716       gpointer new_data;
717
718       nns_edge_data_get (data_h, i, &data, &data_len);
719       new_data = _g_memdup (data, data_len);
720       gst_buffer_append_memory (out_buf,
721           gst_memory_new_wrapped (0, new_data, data_len, 0,
722               data_len, new_data, g_free));
723     }
724     /* metadata from incoming buffer */
725     gst_buffer_copy_into (out_buf, buf, GST_BUFFER_COPY_METADATA, 0, -1);
726
727     res = gst_pad_push (self->srcpad, out_buf);
728   }
729
730 done:
731   if (data_h) {
732     nns_edge_data_destroy (data_h);
733   }
734
735   for (i = 0; i < num_mems; i++)
736     gst_memory_unmap (mem[i], &map[i]);
737
738   gst_buffer_unref (buf);
739   return res;
740 }
741
742 /**
743  * @brief Get pad caps for caps negotiation.
744  */
745 static GstCaps *
746 gst_tensor_query_client_query_caps (GstTensorQueryClient * self, GstPad * pad,
747     GstCaps * filter)
748 {
749   GstCaps *caps;
750
751   caps = gst_pad_get_current_caps (pad);
752   if (!caps) {
753     /** pad don't have current caps. use the template caps */
754     caps = gst_pad_get_pad_template_caps (pad);
755   }
756
757   silent_debug_caps (self, caps, "caps");
758   silent_debug_caps (self, filter, "filter");
759
760   if (filter) {
761     GstCaps *intersection;
762     intersection =
763         gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
764
765     gst_caps_unref (caps);
766     caps = intersection;
767   }
768
769   silent_debug_caps (self, caps, "result");
770   return caps;
771 }