PROP_TOPIC,
PROP_TIMEOUT,
PROP_SILENT,
+ PROP_MAX_REQUEST,
};
#define TCP_HIGHEST_PORT 65535
#define TCP_DEFAULT_CLIENT_SRC_PORT 3001
#define DEFAULT_CLIENT_TIMEOUT 0
#define DEFAULT_SILENT TRUE
+#define DEFAULT_MAX_REQUEST 2
GST_DEBUG_CATEGORY_STATIC (gst_tensor_query_client_debug);
#define GST_CAT_DEFAULT gst_tensor_query_client_debug
"A timeout value (in ms) to wait message from query server after sending buffer to server. 0 means no wait.",
0, G_MAXUINT, DEFAULT_CLIENT_TIMEOUT,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
-
+ g_object_class_install_property (gobject_class, PROP_MAX_REQUEST,
+ g_param_spec_uint ("max-request", "Maximum number of request",
+ "Sets the maximum number of buffers to request to the query server. "
+ "If the processing speed of query server is slower than the query client, the input buffer is dropped. "
+ "Two buffers are requested by default, and 0 means that all buffers are sent to query server without drop. ",
+ 0, G_MAXUINT, DEFAULT_MAX_REQUEST,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&sinktemplate));
gst_element_class_add_pad_template (gstelement_class,
self->timeout = DEFAULT_CLIENT_TIMEOUT;
self->edge_h = NULL;
self->msg_queue = g_async_queue_new ();
+ self->max_request = DEFAULT_MAX_REQUEST;
+ self->requested_num = 0;
}
/**
case PROP_SILENT:
self->silent = g_value_get_boolean (value);
break;
+ case PROP_MAX_REQUEST:
+ self->max_request = g_value_get_uint (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
case PROP_SILENT:
g_value_set_boolean (value, self->silent);
break;
+ case PROP_MAX_REQUEST:
+ g_value_set_uint (value, self->max_request);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
nns_edge_data_set_info (data_h, "client_id", val);
g_free (val);
- if (NNS_EDGE_ERROR_NONE != nns_edge_send (self->edge_h, data_h)) {
- nns_logi ("Failed to publish to server node.");
- goto done;
+ if (self->requested_num > self->max_request) {
+ nns_logi
+ ("the processing speed of the query server is too slow. Drop the input buffer.");
+ } else {
+ if (NNS_EDGE_ERROR_NONE != nns_edge_send (self->edge_h, data_h)) {
+ nns_logi ("Failed to publish to server node.");
+ goto done;
+ }
+ self->requested_num++;
}
nns_edge_data_destroy (data_h);
data_h = g_async_queue_timeout_pop (self->msg_queue,
self->timeout * G_TIME_SPAN_MILLISECOND);
if (data_h) {
+ self->requested_num--;
ret = nns_edge_data_get_count (data_h, &num_data);
if (ret != NNS_EDGE_ERROR_NONE || num_data == 0) {
nns_loge ("Failed to get the number of memories of the edge data.");