[gRPC/Sink] Implement gRPC tensor sink
authorDongju Chae <dongju.chae@samsung.com>
Wed, 4 Nov 2020 10:36:43 +0000 (19:36 +0900)
committerMyungJoo Ham <myungjoo.ham@samsung.com>
Fri, 13 Nov 2020 06:33:35 +0000 (15:33 +0900)
This patch implements gRPC tensor sink as a client.
Its gRPC server mode is WIP.

Signed-off-by: Dongju Chae <dongju.chae@samsung.com>
ext/nnstreamer/meson.build
ext/nnstreamer/registerer/grpc.c
ext/nnstreamer/tensor_sink/meson.build [new file with mode: 0644]
ext/nnstreamer/tensor_sink/tensor_sink_grpc.c [new file with mode: 0644]
ext/nnstreamer/tensor_sink/tensor_sink_grpc.h [new file with mode: 0644]

index 936d94e..c1c7ebf 100644 (file)
@@ -45,6 +45,7 @@ subdir('tensor_decoder')
 subdir('tensor_filter')
 subdir('tensor_source')
 subdir('tensor_converter')
+subdir('tensor_sink')
 
 if get_option('enable-tizen-sensor')
   tizensensor_registerer_source_files = ['registerer/tizensensor.c']
@@ -79,7 +80,7 @@ if grpc_support_is_available
 
   grpc_lib = shared_library('nnstreamer-grpc',
     grpc_registerer_sources,
-    dependencies: [tensor_src_grpc_dep],
+    dependencies: [tensor_src_grpc_dep, tensor_sink_grpc_dep],
     install: true,
     install_dir: plugins_install_dir
   )
index d98dbdd..3df76ae 100644 (file)
@@ -20,6 +20,7 @@
 #include <gst/gst.h>
 
 #include <tensor_source/tensor_src_grpc.h>
+#include <tensor_sink/tensor_sink_grpc.h>
 
 #define NNSTREAMER_GRPC_INIT(plugin,name,type) \
   do { \
@@ -36,6 +37,7 @@ static gboolean
 gst_nnstreamer_grpc_init (GstPlugin * plugin)
 {
   NNSTREAMER_GRPC_INIT (plugin, src_grpc, SRC_GRPC);
+  NNSTREAMER_GRPC_INIT (plugin, sink_grpc, SINK_GRPC);
   return TRUE;
 }
 
diff --git a/ext/nnstreamer/tensor_sink/meson.build b/ext/nnstreamer/tensor_sink/meson.build
new file mode 100644 (file)
index 0000000..a32d616
--- /dev/null
@@ -0,0 +1,15 @@
+if grpc_support_is_available
+  grpc_tensor_sink_source_files = [
+    'tensor_sink_grpc.c'
+  ]
+  grpc_tensor_sink_sources = []
+
+  foreach s : grpc_tensor_sink_source_files
+    grpc_tensor_sink_sources += join_paths(meson.current_source_dir(), s)
+  endforeach
+
+  tensor_sink_grpc_dep = declare_dependency(
+    sources : grpc_tensor_sink_sources,
+    dependencies : [glib_dep, gst_dep, nnstreamer_dep, grpc_support_deps, protobuf_util_dep]
+  )
+endif
diff --git a/ext/nnstreamer/tensor_sink/tensor_sink_grpc.c b/ext/nnstreamer/tensor_sink/tensor_sink_grpc.c
new file mode 100644 (file)
index 0000000..2985960
--- /dev/null
@@ -0,0 +1,395 @@
+/* SPDX-License-Identifier: LGPL-2.1-only */
+/**
+ * GStreamer Tensor_Sink_gRPC
+ * Copyright (C) 2020 Dongju Chae <dongju.chae@samsung.com>
+ */
+/**
+ * @file       tensor_sink_grpc.c
+ * @date       22 Oct 2020
+ * @brief      GStreamer plugin to support gRPC tensor sink
+ * @see                http://github.com/nnstreamer/nnstreamer
+ * @author     Dongju Chae <dongju.chae@samsung.com>
+ * @bug                No known bugs except for NYI items
+ */
+
+/**
+ * SECTION:element-tensor_sink_grpc
+ *
+ * #tensor_sink_grpc extends #gstbasesink sink element to emit gRPC
+ * messages as either server or client.
+ *
+ * <refsect2>
+ * <title>Example launch line</title>
+ * |[
+ * gst-launch -v -m videotestsrc
+ *        ! video/x-raw,format=RGB,width=640,height=480,framerate=30/1
+ *        ! tensor_converter ! tensor_sink_grpc
+ * ]|
+ * </refsect2>
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <string.h>
+#include <errno.h>
+
+#include <gst/gst.h>
+#include <glib.h>
+#include <gmodule.h>
+
+#include <tensor_typedef.h>
+#include <nnstreamer_plugin_api.h>
+#include <nnstreamer_log.h>
+
+#include "tensor_sink_grpc.h"
+
+/**
+ * @brief Macro for debug mode.
+ */
+#ifndef DBG
+#define DBG (!self->silent)
+#endif
+
+/**
+ * @brief Macro for debug message.
+ */
+#define silent_debug(...) do { \
+    if (DBG) { \
+      GST_DEBUG_OBJECT (self, __VA_ARGS__); \
+    } \
+  } while (0)
+
+GST_DEBUG_CATEGORY_STATIC (gst_tensor_sink_grpc_debug);
+#define GST_CAT_DEFAULT gst_tensor_sink_grpc_debug
+
+/**
+ * @brief Flag to print minimized log
+ */
+#define DEFAULT_PROP_SILENT TRUE
+
+/**
+ * @brief Default gRPC mode for tensor sink
+ */
+#define DEFAULT_PROP_SERVER FALSE
+
+/**
+ * @brief Default host and port
+ */
+#define DEFAULT_PROP_HOST  "localhost"
+#define DEFAULT_PROP_PORT  55115
+
+#define CAPS_STRING GST_TENSOR_CAP_DEFAULT "; " GST_TENSORS_CAP_DEFAULT
+
+static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
+    GST_PAD_SINK,
+    GST_PAD_ALWAYS,
+    GST_STATIC_CAPS (CAPS_STRING));
+
+enum
+{
+  PROP_0,
+  PROP_SILENT,
+  PROP_SERVER,
+  PROP_HOST,
+  PROP_PORT,
+  PROP_OUT,
+};
+
+/** GObject method implementation */
+static void gst_tensor_sink_grpc_finalize (GObject * gobject);
+static void gst_tensor_sink_grpc_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec);
+static void gst_tensor_sink_grpc_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec);
+
+/** GstBaseSink method implementation */
+static gboolean gst_tensor_sink_grpc_setcaps (GstBaseSink * sink,
+    GstCaps * caps);
+static GstFlowReturn gst_tensor_sink_grpc_render (GstBaseSink * sink,
+    GstBuffer * buf);
+static gboolean gst_tensor_sink_grpc_start (GstBaseSink * sink);
+static gboolean gst_tensor_sink_grpc_stop (GstBaseSink * sink);
+
+static gboolean gst_tensor_sink_grpc_unlock (GstBaseSink * bsink);
+
+/** internal functions */
+#define gst_tensor_sink_grpc_parent_class parent_class
+G_DEFINE_TYPE (GstTensorSinkGRPC, gst_tensor_sink_grpc, GST_TYPE_BASE_SINK);
+
+/**
+ * @brief initialize the tensor_sink_grpc class.
+ */
+static void
+gst_tensor_sink_grpc_class_init (GstTensorSinkGRPCClass * klass)
+{
+  GObjectClass *gobject_class;
+  GstElementClass *gstelement_class;
+  GstBaseSinkClass *gstbasesink_class;
+
+  gobject_class = (GObjectClass *) klass;
+  gstelement_class = (GstElementClass *) klass;
+  gstbasesink_class = (GstBaseSinkClass *) klass;
+
+  parent_class = g_type_class_peek_parent (klass);
+
+  gobject_class->set_property = gst_tensor_sink_grpc_set_property;
+  gobject_class->get_property = gst_tensor_sink_grpc_get_property;
+  gobject_class->finalize = gst_tensor_sink_grpc_finalize;
+
+  /* install properties */
+  g_object_class_install_property (gobject_class, PROP_SILENT,
+      g_param_spec_boolean ("silent", "Silent",
+        "Dont' produce verbose output",
+        DEFAULT_PROP_SILENT,
+        G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class, PROP_SERVER,
+      g_param_spec_boolean ("server", "Server",
+        "Specify its working mode either server or client",
+        DEFAULT_PROP_SERVER,
+        G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class, PROP_HOST,
+      g_param_spec_string ("host", "Host", "The host/IP to send the packets to",
+          DEFAULT_PROP_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class, PROP_PORT,
+      g_param_spec_int ("port", "Port", "The port to send the packets to",
+          0, G_MAXUSHORT, DEFAULT_PROP_PORT,
+          G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+  g_object_class_install_property (gobject_class, PROP_OUT,
+      g_param_spec_uint ("out", "Out",
+        "The number of output messages generated",
+        0, G_MAXUINT, 0,
+        G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
+
+  gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
+
+  gst_element_class_set_static_metadata (gstelement_class,
+      "TensorSinkGRPC", "Sink/Network",
+      "Send nnstreamer protocol buffers as gRPC server/client",
+      "Dongju Chae <dongju.chae@samsung.com>");
+
+  /* GstBaseSinkClass */
+  gstbasesink_class->start = gst_tensor_sink_grpc_start;
+  gstbasesink_class->stop = gst_tensor_sink_grpc_stop;
+  gstbasesink_class->set_caps = gst_tensor_sink_grpc_setcaps;
+  gstbasesink_class->render = gst_tensor_sink_grpc_render;
+  gstbasesink_class->unlock = gst_tensor_sink_grpc_unlock;
+
+  GST_DEBUG_CATEGORY_INIT (gst_tensor_sink_grpc_debug,
+      "tensor_sink_grpc", 0,
+      "sink element to support protocol buffers as a gRPC server/client");
+}
+
+/**
+ * @brief initialize tensor_sink_grpc element.
+ */
+static void
+gst_tensor_sink_grpc_init (GstTensorSinkGRPC * self)
+{
+  self->silent = DEFAULT_PROP_SILENT;
+  self->server = DEFAULT_PROP_SERVER;
+  self->host = g_strdup (DEFAULT_PROP_HOST);
+  self->port = DEFAULT_PROP_PORT;
+  self->priv = NULL;
+  self->out = 0;
+
+  gst_tensors_config_init (&self->config);
+
+  GST_OBJECT_FLAG_UNSET (self, GST_TENSOR_SINK_GRPC_CONFIGURED);
+  GST_OBJECT_FLAG_UNSET (self, GST_TENSOR_SINK_GRPC_STARTED);
+}
+
+/**
+ * @brief finalize tensor_sink_grpc element.
+ */
+static void
+gst_tensor_sink_grpc_finalize (GObject * gobject)
+{
+  GstTensorSinkGRPC *this = GST_TENSOR_SINK_GRPC (gobject);
+
+  g_free (this->host);
+  this->host = NULL;
+
+  G_OBJECT_CLASS (parent_class)->finalize (gobject);
+}
+
+/**
+ * @brief set caps of tensor_sink_grpc element.
+ */
+static gboolean
+gst_tensor_sink_grpc_setcaps (GstBaseSink * sink, GstCaps * caps)
+{
+  GstTensorSinkGRPC * self;
+  GstStructure * structure;
+
+  self = GST_TENSOR_SINK_GRPC (sink);
+
+  GST_OBJECT_LOCK (self);
+
+  structure = gst_caps_get_structure (caps, 0);
+  gst_tensors_config_from_structure (&self->config, structure);
+
+  GST_OBJECT_FLAG_SET (self, GST_TENSOR_SINK_GRPC_CONFIGURED);
+
+  grpc_set_config (self, &self->config);
+
+  GST_OBJECT_UNLOCK (self);
+
+  return gst_tensors_config_validate (&self->config);
+}
+
+/**
+ * @brief render function of tensor_sink_grpc element.
+ */
+static GstFlowReturn
+gst_tensor_sink_grpc_render (GstBaseSink * sink, GstBuffer * buf)
+{
+  GstTensorSinkGRPC * self = GST_TENSOR_SINK_GRPC (sink);
+  gboolean ret;
+
+  g_return_val_if_fail (
+      GST_OBJECT_FLAG_IS_SET (self, GST_TENSOR_SINK_GRPC_STARTED),
+      GST_FLOW_FLUSHING);
+
+  ret = grpc_send (self, buf);
+
+  return ret ? GST_FLOW_OK : GST_FLOW_ERROR;
+}
+
+/**
+ * @brief set properties of tensor_sink_grpc element.
+ */
+static void
+gst_tensor_sink_grpc_set_property (GObject * object, guint prop_id,
+    const GValue * value, GParamSpec * pspec)
+{
+  GstTensorSinkGRPC * self;
+
+  g_return_if_fail (GST_IS_TENSOR_SINK_GRPC (object));
+  self = GST_TENSOR_SINK_GRPC (object);
+
+  switch (prop_id) {
+    case PROP_SILENT:
+      self->silent = g_value_get_boolean (value);
+      silent_debug ("Set silent = %d", self->silent);
+      break;
+    case PROP_SERVER:
+      self->server = g_value_get_boolean (value);
+      silent_debug ("Set server = %d", self->server);
+      break;
+    case PROP_HOST:
+      if (!g_value_get_string (value)) {
+        ml_logw ("host property cannot be NULL");
+        break;
+      }
+      g_free (self->host);
+      self->host = g_value_dup_string (value);
+      silent_debug ("Set host = %s", self->host);
+      break;
+    case PROP_PORT:
+      self->port = g_value_get_int (value);
+      silent_debug ("Set port = %d", self->port);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+/**
+ * @brief get properties of tensor_sink_grpc element.
+ */
+static void
+gst_tensor_sink_grpc_get_property (GObject * object, guint prop_id,
+    GValue * value, GParamSpec * pspec)
+{
+  GstTensorSinkGRPC * self;
+
+  g_return_if_fail (GST_IS_TENSOR_SINK_GRPC (object));
+  self = GST_TENSOR_SINK_GRPC (object);
+
+  switch (prop_id) {
+    case PROP_SILENT:
+      g_value_set_boolean (value, self->silent);
+      break;
+    case PROP_SERVER:
+      g_value_set_boolean (value, self->server);
+      break;
+    case PROP_HOST:
+      g_value_set_string (value, self->host);
+      break;
+    case PROP_PORT:
+      g_value_set_int (value, self->port);
+      break;
+    case PROP_OUT:
+      g_value_set_uint (value, self->out);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+      break;
+  }
+}
+
+/**
+ * @brief start tensor_sink_grpc element.
+ */
+static gboolean
+gst_tensor_sink_grpc_start (GstBaseSink * sink)
+{
+  GstTensorSinkGRPC *self = GST_TENSOR_SINK_GRPC (sink);
+  gboolean ret;
+
+  if (GST_OBJECT_FLAG_IS_SET (self, GST_TENSOR_SINK_GRPC_STARTED))
+    return TRUE;
+
+  if (self->priv)
+    grpc_destroy (self);
+
+  self->priv = grpc_new (self);
+  if (!self->priv)
+    return FALSE;
+
+  ret = grpc_start (self);
+  if (ret)
+    GST_OBJECT_FLAG_SET (self, GST_TENSOR_SINK_GRPC_STARTED);
+
+  return TRUE;
+}
+
+/**
+ * @brief stop tensor_sink_grpc element.
+ */
+static gboolean
+gst_tensor_sink_grpc_stop (GstBaseSink * sink)
+{
+  GstTensorSinkGRPC *self = GST_TENSOR_SINK_GRPC (sink);
+
+  if (!GST_OBJECT_FLAG_IS_SET (self, GST_TENSOR_SINK_GRPC_STARTED))
+    return TRUE;
+
+  if (self->priv)
+    grpc_destroy (self);
+  self->priv = NULL;
+
+  GST_OBJECT_FLAG_UNSET (self, GST_TENSOR_SINK_GRPC_STARTED);
+
+  return TRUE;
+}
+
+/**
+ * @brief unlock any blocking operations
+ */
+static gboolean gst_tensor_sink_grpc_unlock (GstBaseSink * sink)
+{
+  GstTensorSinkGRPC *self = GST_TENSOR_SINK_GRPC (sink);
+
+  /* notify to gRPC */
+  grpc_stop (self);
+
+  return TRUE;
+}
diff --git a/ext/nnstreamer/tensor_sink/tensor_sink_grpc.h b/ext/nnstreamer/tensor_sink/tensor_sink_grpc.h
new file mode 100644 (file)
index 0000000..12f732d
--- /dev/null
@@ -0,0 +1,84 @@
+/* SPDX-License-Identifier: LGPL-2.1-only */
+/**
+ * GStreamer Tensor_Sink_gRPC
+ * Copyright (C) 2020 Dongju Chae <dongju.chae@samsung.com>
+ */
+/**
+ * @file       tensor_sink_grpc.h
+ * @date       22 Oct 2020
+ * @brief      GStreamer plugin to support gRPC tensor sink
+ * @see                http://github.com/nnstreamer/nnstreamer
+ * @author     Dongju Chae <dongju.chae@samsung.com>
+ * @bug                No known bugs except for NYI items
+ */
+
+#ifndef __GST_TENSOR_SINK_GRPC_H__
+#define __GST_TENSOR_SINK_GRPC_H__
+
+#include <gst/gst.h>
+#include <gst/base/gstbasesink.h>
+
+#include <nnstreamer_protobuf_grpc.h>
+
+G_BEGIN_DECLS
+
+#define GST_TYPE_TENSOR_SINK_GRPC \
+  (gst_tensor_sink_grpc_get_type())
+#define GST_TENSOR_SINK_GRPC(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TENSOR_SINK_GRPC,GstTensorSinkGRPC))
+#define GST_TENSOR_SINK_GRPC_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TENSOR_SINK_GRPC,GstTensorSinkGRPCClass))
+#define GST_IS_TENSOR_SINK_GRPC(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TENSOR_SINK_GRPC))
+#define GST_IS_TENSOR_SINK_GRPC_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TENSOR_SINK_GRPC))
+
+typedef struct _GstTensorSinkGRPC GstTensorSinkGRPC;
+typedef struct _GstTensorSinkGRPCClass GstTensorSinkGRPCClass;
+
+typedef enum {
+  GST_TENSOR_SINK_GRPC_CONFIGURED = (GST_ELEMENT_FLAG_LAST << 0),
+  GST_TENSOR_SINK_GRPC_STARTED = (GST_ELEMENT_FLAG_LAST << 1),
+} GstTensorSinkGRPCFlags;
+
+/**
+ * @brief GstTensorSinkGRPC data structure.
+ *
+ * GstTensorSinkGRPC inherits GstPushSinkGRPC.
+ */
+struct _GstTensorSinkGRPC
+{
+  GstBaseSink element; /**< parent class object */
+
+  /** Properties saved */
+  gboolean silent;      /**< true to print minimized log */
+  gboolean server;      /**< true to enable server mode */
+  gint port;            /**< gRPC server port number */
+  gchar *host;          /**< gRPC server host name */
+  guint out;            /**< number of output messages */
+
+  /** Working variables */
+  GstTensorsConfig config;
+
+  /** private data */
+  void *priv;
+};
+
+/**
+ * @brief GstTensorSinkGRPCClass data structure.
+ *
+ * GstTensorSinkGRPC inherits GstBaseSink.
+ */
+struct _GstTensorSinkGRPCClass
+{
+  GstBaseSinkClass parent_class; /**< inherits class object */
+};
+
+/**
+ * @brief Function to get type of tensor_sink_grpc.
+ */
+GType gst_tensor_sink_grpc_get_type (void);
+
+G_END_DECLS
+
+#endif /** __GST_TENSOR_SINK_GRPC_H__ */