From: Dongju Chae Date: Wed, 4 Nov 2020 10:36:43 +0000 (+0900) Subject: [gRPC/Sink] Implement gRPC tensor sink X-Git-Tag: accepted/tizen/unified/20201119.125120~10 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=0868e6d4bf32331ff22f2d57728c31bc12eec714;p=platform%2Fupstream%2Fnnstreamer.git [gRPC/Sink] Implement gRPC tensor sink This patch implements gRPC tensor sink as a client. Its gRPC server mode is WIP. Signed-off-by: Dongju Chae --- diff --git a/ext/nnstreamer/meson.build b/ext/nnstreamer/meson.build index 936d94e..c1c7ebf 100644 --- a/ext/nnstreamer/meson.build +++ b/ext/nnstreamer/meson.build @@ -45,6 +45,7 @@ subdir('tensor_decoder') subdir('tensor_filter') subdir('tensor_source') subdir('tensor_converter') +subdir('tensor_sink') if get_option('enable-tizen-sensor') tizensensor_registerer_source_files = ['registerer/tizensensor.c'] @@ -79,7 +80,7 @@ if grpc_support_is_available grpc_lib = shared_library('nnstreamer-grpc', grpc_registerer_sources, - dependencies: [tensor_src_grpc_dep], + dependencies: [tensor_src_grpc_dep, tensor_sink_grpc_dep], install: true, install_dir: plugins_install_dir ) diff --git a/ext/nnstreamer/registerer/grpc.c b/ext/nnstreamer/registerer/grpc.c index d98dbdd..3df76ae 100644 --- a/ext/nnstreamer/registerer/grpc.c +++ b/ext/nnstreamer/registerer/grpc.c @@ -20,6 +20,7 @@ #include #include +#include #define NNSTREAMER_GRPC_INIT(plugin,name,type) \ do { \ @@ -36,6 +37,7 @@ static gboolean gst_nnstreamer_grpc_init (GstPlugin * plugin) { NNSTREAMER_GRPC_INIT (plugin, src_grpc, SRC_GRPC); + NNSTREAMER_GRPC_INIT (plugin, sink_grpc, SINK_GRPC); return TRUE; } diff --git a/ext/nnstreamer/tensor_sink/meson.build b/ext/nnstreamer/tensor_sink/meson.build new file mode 100644 index 0000000..a32d616 --- /dev/null +++ b/ext/nnstreamer/tensor_sink/meson.build @@ -0,0 +1,15 @@ +if grpc_support_is_available + grpc_tensor_sink_source_files = [ + 'tensor_sink_grpc.c' + ] + grpc_tensor_sink_sources = [] + + foreach s : grpc_tensor_sink_source_files + grpc_tensor_sink_sources += join_paths(meson.current_source_dir(), s) + endforeach + + tensor_sink_grpc_dep = declare_dependency( + sources : grpc_tensor_sink_sources, + dependencies : [glib_dep, gst_dep, nnstreamer_dep, grpc_support_deps, protobuf_util_dep] + ) +endif diff --git a/ext/nnstreamer/tensor_sink/tensor_sink_grpc.c b/ext/nnstreamer/tensor_sink/tensor_sink_grpc.c new file mode 100644 index 0000000..2985960 --- /dev/null +++ b/ext/nnstreamer/tensor_sink/tensor_sink_grpc.c @@ -0,0 +1,395 @@ +/* SPDX-License-Identifier: LGPL-2.1-only */ +/** + * GStreamer Tensor_Sink_gRPC + * Copyright (C) 2020 Dongju Chae + */ +/** + * @file tensor_sink_grpc.c + * @date 22 Oct 2020 + * @brief GStreamer plugin to support gRPC tensor sink + * @see http://github.com/nnstreamer/nnstreamer + * @author Dongju Chae + * @bug No known bugs except for NYI items + */ + +/** + * SECTION:element-tensor_sink_grpc + * + * #tensor_sink_grpc extends #gstbasesink sink element to emit gRPC + * messages as either server or client. + * + * + * Example launch line + * |[ + * gst-launch -v -m videotestsrc + * ! video/x-raw,format=RGB,width=640,height=480,framerate=30/1 + * ! tensor_converter ! tensor_sink_grpc + * ]| + * + */ + +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include + +#include +#include +#include + +#include +#include +#include + +#include "tensor_sink_grpc.h" + +/** + * @brief Macro for debug mode. + */ +#ifndef DBG +#define DBG (!self->silent) +#endif + +/** + * @brief Macro for debug message. + */ +#define silent_debug(...) do { \ + if (DBG) { \ + GST_DEBUG_OBJECT (self, __VA_ARGS__); \ + } \ + } while (0) + +GST_DEBUG_CATEGORY_STATIC (gst_tensor_sink_grpc_debug); +#define GST_CAT_DEFAULT gst_tensor_sink_grpc_debug + +/** + * @brief Flag to print minimized log + */ +#define DEFAULT_PROP_SILENT TRUE + +/** + * @brief Default gRPC mode for tensor sink + */ +#define DEFAULT_PROP_SERVER FALSE + +/** + * @brief Default host and port + */ +#define DEFAULT_PROP_HOST "localhost" +#define DEFAULT_PROP_PORT 55115 + +#define CAPS_STRING GST_TENSOR_CAP_DEFAULT "; " GST_TENSORS_CAP_DEFAULT + +static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS (CAPS_STRING)); + +enum +{ + PROP_0, + PROP_SILENT, + PROP_SERVER, + PROP_HOST, + PROP_PORT, + PROP_OUT, +}; + +/** GObject method implementation */ +static void gst_tensor_sink_grpc_finalize (GObject * gobject); +static void gst_tensor_sink_grpc_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec); +static void gst_tensor_sink_grpc_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec); + +/** GstBaseSink method implementation */ +static gboolean gst_tensor_sink_grpc_setcaps (GstBaseSink * sink, + GstCaps * caps); +static GstFlowReturn gst_tensor_sink_grpc_render (GstBaseSink * sink, + GstBuffer * buf); +static gboolean gst_tensor_sink_grpc_start (GstBaseSink * sink); +static gboolean gst_tensor_sink_grpc_stop (GstBaseSink * sink); + +static gboolean gst_tensor_sink_grpc_unlock (GstBaseSink * bsink); + +/** internal functions */ +#define gst_tensor_sink_grpc_parent_class parent_class +G_DEFINE_TYPE (GstTensorSinkGRPC, gst_tensor_sink_grpc, GST_TYPE_BASE_SINK); + +/** + * @brief initialize the tensor_sink_grpc class. + */ +static void +gst_tensor_sink_grpc_class_init (GstTensorSinkGRPCClass * klass) +{ + GObjectClass *gobject_class; + GstElementClass *gstelement_class; + GstBaseSinkClass *gstbasesink_class; + + gobject_class = (GObjectClass *) klass; + gstelement_class = (GstElementClass *) klass; + gstbasesink_class = (GstBaseSinkClass *) klass; + + parent_class = g_type_class_peek_parent (klass); + + gobject_class->set_property = gst_tensor_sink_grpc_set_property; + gobject_class->get_property = gst_tensor_sink_grpc_get_property; + gobject_class->finalize = gst_tensor_sink_grpc_finalize; + + /* install properties */ + g_object_class_install_property (gobject_class, PROP_SILENT, + g_param_spec_boolean ("silent", "Silent", + "Dont' produce verbose output", + DEFAULT_PROP_SILENT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_SERVER, + g_param_spec_boolean ("server", "Server", + "Specify its working mode either server or client", + DEFAULT_PROP_SERVER, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_HOST, + g_param_spec_string ("host", "Host", "The host/IP to send the packets to", + DEFAULT_PROP_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_PORT, + g_param_spec_int ("port", "Port", "The port to send the packets to", + 0, G_MAXUSHORT, DEFAULT_PROP_PORT, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + + g_object_class_install_property (gobject_class, PROP_OUT, + g_param_spec_uint ("out", "Out", + "The number of output messages generated", + 0, G_MAXUINT, 0, + G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); + + gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate); + + gst_element_class_set_static_metadata (gstelement_class, + "TensorSinkGRPC", "Sink/Network", + "Send nnstreamer protocol buffers as gRPC server/client", + "Dongju Chae "); + + /* GstBaseSinkClass */ + gstbasesink_class->start = gst_tensor_sink_grpc_start; + gstbasesink_class->stop = gst_tensor_sink_grpc_stop; + gstbasesink_class->set_caps = gst_tensor_sink_grpc_setcaps; + gstbasesink_class->render = gst_tensor_sink_grpc_render; + gstbasesink_class->unlock = gst_tensor_sink_grpc_unlock; + + GST_DEBUG_CATEGORY_INIT (gst_tensor_sink_grpc_debug, + "tensor_sink_grpc", 0, + "sink element to support protocol buffers as a gRPC server/client"); +} + +/** + * @brief initialize tensor_sink_grpc element. + */ +static void +gst_tensor_sink_grpc_init (GstTensorSinkGRPC * self) +{ + self->silent = DEFAULT_PROP_SILENT; + self->server = DEFAULT_PROP_SERVER; + self->host = g_strdup (DEFAULT_PROP_HOST); + self->port = DEFAULT_PROP_PORT; + self->priv = NULL; + self->out = 0; + + gst_tensors_config_init (&self->config); + + GST_OBJECT_FLAG_UNSET (self, GST_TENSOR_SINK_GRPC_CONFIGURED); + GST_OBJECT_FLAG_UNSET (self, GST_TENSOR_SINK_GRPC_STARTED); +} + +/** + * @brief finalize tensor_sink_grpc element. + */ +static void +gst_tensor_sink_grpc_finalize (GObject * gobject) +{ + GstTensorSinkGRPC *this = GST_TENSOR_SINK_GRPC (gobject); + + g_free (this->host); + this->host = NULL; + + G_OBJECT_CLASS (parent_class)->finalize (gobject); +} + +/** + * @brief set caps of tensor_sink_grpc element. + */ +static gboolean +gst_tensor_sink_grpc_setcaps (GstBaseSink * sink, GstCaps * caps) +{ + GstTensorSinkGRPC * self; + GstStructure * structure; + + self = GST_TENSOR_SINK_GRPC (sink); + + GST_OBJECT_LOCK (self); + + structure = gst_caps_get_structure (caps, 0); + gst_tensors_config_from_structure (&self->config, structure); + + GST_OBJECT_FLAG_SET (self, GST_TENSOR_SINK_GRPC_CONFIGURED); + + grpc_set_config (self, &self->config); + + GST_OBJECT_UNLOCK (self); + + return gst_tensors_config_validate (&self->config); +} + +/** + * @brief render function of tensor_sink_grpc element. + */ +static GstFlowReturn +gst_tensor_sink_grpc_render (GstBaseSink * sink, GstBuffer * buf) +{ + GstTensorSinkGRPC * self = GST_TENSOR_SINK_GRPC (sink); + gboolean ret; + + g_return_val_if_fail ( + GST_OBJECT_FLAG_IS_SET (self, GST_TENSOR_SINK_GRPC_STARTED), + GST_FLOW_FLUSHING); + + ret = grpc_send (self, buf); + + return ret ? GST_FLOW_OK : GST_FLOW_ERROR; +} + +/** + * @brief set properties of tensor_sink_grpc element. + */ +static void +gst_tensor_sink_grpc_set_property (GObject * object, guint prop_id, + const GValue * value, GParamSpec * pspec) +{ + GstTensorSinkGRPC * self; + + g_return_if_fail (GST_IS_TENSOR_SINK_GRPC (object)); + self = GST_TENSOR_SINK_GRPC (object); + + switch (prop_id) { + case PROP_SILENT: + self->silent = g_value_get_boolean (value); + silent_debug ("Set silent = %d", self->silent); + break; + case PROP_SERVER: + self->server = g_value_get_boolean (value); + silent_debug ("Set server = %d", self->server); + break; + case PROP_HOST: + if (!g_value_get_string (value)) { + ml_logw ("host property cannot be NULL"); + break; + } + g_free (self->host); + self->host = g_value_dup_string (value); + silent_debug ("Set host = %s", self->host); + break; + case PROP_PORT: + self->port = g_value_get_int (value); + silent_debug ("Set port = %d", self->port); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +/** + * @brief get properties of tensor_sink_grpc element. + */ +static void +gst_tensor_sink_grpc_get_property (GObject * object, guint prop_id, + GValue * value, GParamSpec * pspec) +{ + GstTensorSinkGRPC * self; + + g_return_if_fail (GST_IS_TENSOR_SINK_GRPC (object)); + self = GST_TENSOR_SINK_GRPC (object); + + switch (prop_id) { + case PROP_SILENT: + g_value_set_boolean (value, self->silent); + break; + case PROP_SERVER: + g_value_set_boolean (value, self->server); + break; + case PROP_HOST: + g_value_set_string (value, self->host); + break; + case PROP_PORT: + g_value_set_int (value, self->port); + break; + case PROP_OUT: + g_value_set_uint (value, self->out); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); + break; + } +} + +/** + * @brief start tensor_sink_grpc element. + */ +static gboolean +gst_tensor_sink_grpc_start (GstBaseSink * sink) +{ + GstTensorSinkGRPC *self = GST_TENSOR_SINK_GRPC (sink); + gboolean ret; + + if (GST_OBJECT_FLAG_IS_SET (self, GST_TENSOR_SINK_GRPC_STARTED)) + return TRUE; + + if (self->priv) + grpc_destroy (self); + + self->priv = grpc_new (self); + if (!self->priv) + return FALSE; + + ret = grpc_start (self); + if (ret) + GST_OBJECT_FLAG_SET (self, GST_TENSOR_SINK_GRPC_STARTED); + + return TRUE; +} + +/** + * @brief stop tensor_sink_grpc element. + */ +static gboolean +gst_tensor_sink_grpc_stop (GstBaseSink * sink) +{ + GstTensorSinkGRPC *self = GST_TENSOR_SINK_GRPC (sink); + + if (!GST_OBJECT_FLAG_IS_SET (self, GST_TENSOR_SINK_GRPC_STARTED)) + return TRUE; + + if (self->priv) + grpc_destroy (self); + self->priv = NULL; + + GST_OBJECT_FLAG_UNSET (self, GST_TENSOR_SINK_GRPC_STARTED); + + return TRUE; +} + +/** + * @brief unlock any blocking operations + */ +static gboolean gst_tensor_sink_grpc_unlock (GstBaseSink * sink) +{ + GstTensorSinkGRPC *self = GST_TENSOR_SINK_GRPC (sink); + + /* notify to gRPC */ + grpc_stop (self); + + return TRUE; +} diff --git a/ext/nnstreamer/tensor_sink/tensor_sink_grpc.h b/ext/nnstreamer/tensor_sink/tensor_sink_grpc.h new file mode 100644 index 0000000..12f732d --- /dev/null +++ b/ext/nnstreamer/tensor_sink/tensor_sink_grpc.h @@ -0,0 +1,84 @@ +/* SPDX-License-Identifier: LGPL-2.1-only */ +/** + * GStreamer Tensor_Sink_gRPC + * Copyright (C) 2020 Dongju Chae + */ +/** + * @file tensor_sink_grpc.h + * @date 22 Oct 2020 + * @brief GStreamer plugin to support gRPC tensor sink + * @see http://github.com/nnstreamer/nnstreamer + * @author Dongju Chae + * @bug No known bugs except for NYI items + */ + +#ifndef __GST_TENSOR_SINK_GRPC_H__ +#define __GST_TENSOR_SINK_GRPC_H__ + +#include +#include + +#include + +G_BEGIN_DECLS + +#define GST_TYPE_TENSOR_SINK_GRPC \ + (gst_tensor_sink_grpc_get_type()) +#define GST_TENSOR_SINK_GRPC(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TENSOR_SINK_GRPC,GstTensorSinkGRPC)) +#define GST_TENSOR_SINK_GRPC_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TENSOR_SINK_GRPC,GstTensorSinkGRPCClass)) +#define GST_IS_TENSOR_SINK_GRPC(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TENSOR_SINK_GRPC)) +#define GST_IS_TENSOR_SINK_GRPC_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TENSOR_SINK_GRPC)) + +typedef struct _GstTensorSinkGRPC GstTensorSinkGRPC; +typedef struct _GstTensorSinkGRPCClass GstTensorSinkGRPCClass; + +typedef enum { + GST_TENSOR_SINK_GRPC_CONFIGURED = (GST_ELEMENT_FLAG_LAST << 0), + GST_TENSOR_SINK_GRPC_STARTED = (GST_ELEMENT_FLAG_LAST << 1), +} GstTensorSinkGRPCFlags; + +/** + * @brief GstTensorSinkGRPC data structure. + * + * GstTensorSinkGRPC inherits GstPushSinkGRPC. + */ +struct _GstTensorSinkGRPC +{ + GstBaseSink element; /**< parent class object */ + + /** Properties saved */ + gboolean silent; /**< true to print minimized log */ + gboolean server; /**< true to enable server mode */ + gint port; /**< gRPC server port number */ + gchar *host; /**< gRPC server host name */ + guint out; /**< number of output messages */ + + /** Working variables */ + GstTensorsConfig config; + + /** private data */ + void *priv; +}; + +/** + * @brief GstTensorSinkGRPCClass data structure. + * + * GstTensorSinkGRPC inherits GstBaseSink. + */ +struct _GstTensorSinkGRPCClass +{ + GstBaseSinkClass parent_class; /**< inherits class object */ +}; + +/** + * @brief Function to get type of tensor_sink_grpc. + */ +GType gst_tensor_sink_grpc_get_type (void); + +G_END_DECLS + +#endif /** __GST_TENSOR_SINK_GRPC_H__ */