[Query] Add drop prop to query client
[platform/upstream/nnstreamer.git] / gst / nnstreamer / tensor_query / tensor_query_client.c
index 46bf7c5..ee66d84 100644 (file)
@@ -46,6 +46,7 @@ enum
   PROP_TOPIC,
   PROP_TIMEOUT,
   PROP_SILENT,
+  PROP_MAX_REQUEST,
 };
 
 #define TCP_HIGHEST_PORT        65535
@@ -54,6 +55,7 @@ enum
 #define TCP_DEFAULT_CLIENT_SRC_PORT 3001
 #define DEFAULT_CLIENT_TIMEOUT  0
 #define DEFAULT_SILENT TRUE
+#define DEFAULT_MAX_REQUEST 2
 
 GST_DEBUG_CATEGORY_STATIC (gst_tensor_query_client_debug);
 #define GST_CAT_DEFAULT gst_tensor_query_client_debug
@@ -145,7 +147,13 @@ gst_tensor_query_client_class_init (GstTensorQueryClientClass * klass)
           "A timeout value (in ms) to wait message from query server after sending buffer to server. 0 means no wait.",
           0, G_MAXUINT, DEFAULT_CLIENT_TIMEOUT,
           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
+  g_object_class_install_property (gobject_class, PROP_MAX_REQUEST,
+      g_param_spec_uint ("max-request", "Maximum number of request",
+          "Sets the maximum number of buffers to request to the query server. "
+          "If the processing speed of query server is slower than the query client, the input buffer is dropped. "
+          "Two buffers are requested by default, and 0 means that all buffers are sent to query server without drop. ",
+          0, G_MAXUINT, DEFAULT_MAX_REQUEST,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
   gst_element_class_add_pad_template (gstelement_class,
       gst_static_pad_template_get (&sinktemplate));
   gst_element_class_add_pad_template (gstelement_class,
@@ -192,6 +200,8 @@ gst_tensor_query_client_init (GstTensorQueryClient * self)
   self->timeout = DEFAULT_CLIENT_TIMEOUT;
   self->edge_h = NULL;
   self->msg_queue = g_async_queue_new ();
+  self->max_request = DEFAULT_MAX_REQUEST;
+  self->requested_num = 0;
 }
 
 /**
@@ -279,6 +289,9 @@ gst_tensor_query_client_set_property (GObject * object, guint prop_id,
     case PROP_SILENT:
       self->silent = g_value_get_boolean (value);
       break;
+    case PROP_MAX_REQUEST:
+      self->max_request = g_value_get_uint (value);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -319,6 +332,9 @@ gst_tensor_query_client_get_property (GObject * object, guint prop_id,
     case PROP_SILENT:
       g_value_set_boolean (value, self->silent);
       break;
+    case PROP_MAX_REQUEST:
+      g_value_set_uint (value, self->max_request);
+      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
@@ -669,9 +685,15 @@ gst_tensor_query_client_chain (GstPad * pad,
   nns_edge_data_set_info (data_h, "client_id", val);
   g_free (val);
 
-  if (NNS_EDGE_ERROR_NONE != nns_edge_send (self->edge_h, data_h)) {
-    nns_logi ("Failed to publish to server node.");
-    goto done;
+  if (self->requested_num > self->max_request) {
+    nns_logi
+        ("the processing speed of the query server is too slow. Drop the input buffer.");
+  } else {
+    if (NNS_EDGE_ERROR_NONE != nns_edge_send (self->edge_h, data_h)) {
+      nns_logi ("Failed to publish to server node.");
+      goto done;
+    }
+    self->requested_num++;
   }
 
   nns_edge_data_destroy (data_h);
@@ -679,6 +701,7 @@ gst_tensor_query_client_chain (GstPad * pad,
   data_h = g_async_queue_timeout_pop (self->msg_queue,
       self->timeout * G_TIME_SPAN_MILLISECOND);
   if (data_h) {
+    self->requested_num--;
     ret = nns_edge_data_get_count (data_h, &num_data);
     if (ret != NNS_EDGE_ERROR_NONE || num_data == 0) {
       nns_loge ("Failed to get the number of memories of the edge data.");