From 57a0159bdf105898211e69769d6f3e94f66e7445 Mon Sep 17 00:00:00 2001 From: Dongju Chae Date: Tue, 1 Dec 2020 15:15:25 +0900 Subject: [PATCH] [gRPC] Revise existing build/source codes This patch revises existing build/source codes for gRPC utils refactoring. Signed-off-by: Dongju Chae --- debian/nnstreamer-grpc.install | 3 +- ext/nnstreamer/extra/meson.build | 42 ++++++++--- ext/nnstreamer/include/nnstreamer.fbs | 15 +++- ext/nnstreamer/meson.build | 12 ++++ ext/nnstreamer/tensor_sink/meson.build | 2 +- ext/nnstreamer/tensor_sink/tensor_sink_grpc.c | 48 ++++++++++++- ext/nnstreamer/tensor_sink/tensor_sink_grpc.h | 3 +- ext/nnstreamer/tensor_source/meson.build | 2 +- ext/nnstreamer/tensor_source/tensor_src_grpc.c | 98 ++++++++++++-------------- ext/nnstreamer/tensor_source/tensor_src_grpc.h | 9 +-- packaging/nnstreamer.spec | 3 +- 11 files changed, 162 insertions(+), 75 deletions(-) diff --git a/debian/nnstreamer-grpc.install b/debian/nnstreamer-grpc.install index d595524..3ebaa8e 100644 --- a/debian/nnstreamer-grpc.install +++ b/debian/nnstreamer-grpc.install @@ -1,2 +1,3 @@ /usr/lib/*/gstreamer-1.0/libnnstreamer-grpc.so -/usr/lib/*/libnnstreamer_protobuf_grpc.so +/usr/lib/*/libnnstreamer_grpc_protobuf.so +/usr/lib/*/libnnstreamer_grpc_flatbuf.so diff --git a/ext/nnstreamer/extra/meson.build b/ext/nnstreamer/extra/meson.build index adef960..19786d8 100644 --- a/ext/nnstreamer/extra/meson.build +++ b/ext/nnstreamer/extra/meson.build @@ -1,5 +1,5 @@ if protobuf_support_is_available - # Don't generate .proto file twice + # Don't generate proto files twice nns_protobuf_lib = shared_library('nnstreamer_protobuf', sources: pb_gen_src, dependencies: protobuf_dep, @@ -32,8 +32,12 @@ if grpc_support_is_available error('failed to resolve dependency: gRPC requires libprotobuf.') endif - # Don't generate .proto file twice - nns_protobuf_grpc_lib = shared_library ('nnstreamer_protobuf_grpc', + if not flatbuf_support_is_available + error('failed to resolve dependency: gRPC requires libflatbuf.') + endif + + # Don't generate proto files twice + nns_protobuf_grpc_lib = shared_library ('nnstreamer_grpc_protobuf', sources : grpc_pb_gen_src, dependencies : [grpc_support_deps, nns_protobuf_dep], install: true, @@ -45,17 +49,33 @@ if grpc_support_is_available include_directories: nns_protobuf_grpc_lib.private_dir_include() ) - grpc_protobuf_util_sources = ['nnstreamer_protobuf_grpc.cc'] - grpc_protobuf_util_deps = [nnstreamer_dep, glib_dep, gst_dep, nns_protobuf_grpc_dep] + nns_flatbuf_grpc_lib = shared_library ('nnstreamer_grpc_flatbuf', + sources : grpc_fb_gen_src, + dependencies : [grpc_support_deps, flatbuf_dep], + install: true, + install_dir: nnstreamer_libdir + ) + nns_flatbuf_grpc_dep = declare_dependency( + link_with: nns_flatbuf_grpc_lib, + dependencies : [grpc_support_deps, flatbuf_dep], + include_directories: nns_flatbuf_grpc_lib.private_dir_include() + ) + + grpc_util_sources = [ + 'nnstreamer_grpc_common.cc', + 'nnstreamer_grpc_protobuf.cc', + 'nnstreamer_grpc_flatbuf.cc' + ] + grpc_util_deps = [nns_protobuf_grpc_dep, nns_flatbuf_grpc_dep] - nns_grpc_protobuf_util_sources = [] - foreach s : grpc_protobuf_util_sources - nns_grpc_protobuf_util_sources += join_paths(meson.current_source_dir(), s) + nns_grpc_util_sources = [] + foreach s : grpc_util_sources + nns_grpc_util_sources += join_paths(meson.current_source_dir(), s) endforeach - grpc_protobuf_util_dep = declare_dependency( - sources: nns_grpc_protobuf_util_sources, - dependencies: grpc_protobuf_util_deps, + grpc_util_dep = declare_dependency( + sources: nns_grpc_util_sources, + dependencies: [nnstreamer_dep, glib_dep, gst_dep, grpc_util_deps], include_directories: include_directories('.') ) endif diff --git a/ext/nnstreamer/include/nnstreamer.fbs b/ext/nnstreamer/include/nnstreamer.fbs index bd4ed6e..7eca1c5 100644 --- a/ext/nnstreamer/include/nnstreamer.fbs +++ b/ext/nnstreamer/include/nnstreamer.fbs @@ -9,7 +9,7 @@ namespace nnstreamer.flatbuf; -enum Tensor_type : int { +enum Tensor_type : int { NNS_INT32 = 0, NNS_UINT32, NNS_INT16, @@ -33,7 +33,7 @@ table Tensor { name : string; type : Tensor_type = NNS_END; dimension : [uint32]; // support up to 4th ranks. - data : [ubyte]; + data : [ubyte]; } table Tensors { @@ -43,3 +43,14 @@ table Tensors { } root_type Tensors; + +table Empty { +} + +// clients should initiate RPC calls first but can keep the streaming +rpc_service TensorService { + // client-to-server streaming + SendTensors(Tensors):Empty (streaming: "client"); + // server-to-client streaming + RecvTensors(Empty):Tensors (streaming: "server"); +} diff --git a/ext/nnstreamer/meson.build b/ext/nnstreamer/meson.build index c1c7ebf..92308ba 100644 --- a/ext/nnstreamer/meson.build +++ b/ext/nnstreamer/meson.build @@ -22,6 +22,11 @@ if grpc_support_is_available error('failed to resolve dependency: gRPC requires libprotobuf.') endif + if not flatbuf_support_is_available + error('failed to resolve dependency: gRPC requires libflatbuf.') + endif + + # gRPC/Protobuf prog_which = find_program('which') grpc_cpp_plugin_path = run_command(prog_which, 'grpc_cpp_plugin').stdout().strip() if grpc_cpp_plugin_path == '' @@ -38,6 +43,13 @@ if grpc_support_is_available ] ) grpc_pb_gen_src = grpc_pb_gen.process('./include/nnstreamer.proto') + + # gRPC/Flatbuf + grpc_fb_gen = generator(flatc, + output : ['@BASENAME@_generated.h', '@BASENAME@.grpc.fb.h', '@BASENAME@.grpc.fb.cc'], + arguments : ['--cpp', '--grpc', '-o', '@BUILD_DIR@', '@INPUT@'] + ) + grpc_fb_gen_src = grpc_fb_gen.process('./include/nnstreamer.fbs') endif subdir('extra') diff --git a/ext/nnstreamer/tensor_sink/meson.build b/ext/nnstreamer/tensor_sink/meson.build index a32d616..70e6d44 100644 --- a/ext/nnstreamer/tensor_sink/meson.build +++ b/ext/nnstreamer/tensor_sink/meson.build @@ -10,6 +10,6 @@ if grpc_support_is_available tensor_sink_grpc_dep = declare_dependency( sources : grpc_tensor_sink_sources, - dependencies : [glib_dep, gst_dep, nnstreamer_dep, grpc_support_deps, protobuf_util_dep] + dependencies : grpc_util_dep, ) endif diff --git a/ext/nnstreamer/tensor_sink/tensor_sink_grpc.c b/ext/nnstreamer/tensor_sink/tensor_sink_grpc.c index d3805d5..ae35aaf 100644 --- a/ext/nnstreamer/tensor_sink/tensor_sink_grpc.c +++ b/ext/nnstreamer/tensor_sink/tensor_sink_grpc.c @@ -75,6 +75,11 @@ GST_DEBUG_CATEGORY_STATIC (gst_tensor_sink_grpc_debug); #define DEFAULT_PROP_SERVER FALSE /** + * @brief Default IDL for RPC comm. + */ +#define DEFAULT_PROP_IDL "protobuf" + +/** * @brief Default host and port */ #define DEFAULT_PROP_HOST "localhost" @@ -92,6 +97,7 @@ enum PROP_0, PROP_SILENT, PROP_SERVER, + PROP_IDL, PROP_HOST, PROP_PORT, PROP_OUT, @@ -151,6 +157,11 @@ gst_tensor_sink_grpc_class_init (GstTensorSinkGRPCClass * klass) DEFAULT_PROP_SERVER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_IDL, + g_param_spec_string ("idl", "IDL", + "Specify Interface Description Language (IDL) for communication", + DEFAULT_PROP_IDL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_HOST, g_param_spec_string ("host", "Host", "The host/IP to send the packets to", DEFAULT_PROP_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); @@ -193,6 +204,7 @@ gst_tensor_sink_grpc_init (GstTensorSinkGRPC * self) { self->silent = DEFAULT_PROP_SILENT; self->server = DEFAULT_PROP_SERVER; + self->idl = grpc_get_idl (DEFAULT_PROP_IDL); self->host = g_strdup (DEFAULT_PROP_HOST); self->port = DEFAULT_PROP_PORT; self->priv = NULL; @@ -273,6 +285,7 @@ _check_hostname (gchar * str) return FALSE; } + /** * @brief set properties of tensor_sink_grpc element. */ @@ -294,6 +307,21 @@ gst_tensor_sink_grpc_set_property (GObject * object, guint prop_id, self->server = g_value_get_boolean (value); silent_debug ("Set server = %d", self->server); break; + case PROP_IDL: + { + const gchar * idl_str = g_value_get_string (value); + + if (idl_str) { + grpc_idl idl = grpc_get_idl (idl_str); + if (idl != GRPC_IDL_NONE) { + self->idl = idl; + silent_debug ("Set idl = %s", idl_str); + } else { + ml_loge ("Invalid IDL string provided: %s", idl_str); + } + } + break; + } case PROP_HOST: { gchar * host; @@ -340,6 +368,18 @@ gst_tensor_sink_grpc_get_property (GObject * object, guint prop_id, case PROP_SERVER: g_value_set_boolean (value, self->server); break; + case PROP_IDL: + switch (self->idl) { + case GRPC_IDL_PROTOBUF: + g_value_set_string (value, "protobuf"); + break; + case GRPC_IDL_FLATBUF: + g_value_set_string (value, "flatbuf"); + break; + default: + break; + } + break; case PROP_HOST: g_value_set_string (value, self->host); break; @@ -374,10 +414,14 @@ gst_tensor_sink_grpc_start (GstBaseSink * sink) if (!self->priv) return FALSE; - ret = grpc_start (self, GRPC_DIRECTION_TO_PROTOBUF); - if (ret) + ret = grpc_start (self, GRPC_DIRECTION_TENSORS_TO_BUFFER); + if (ret) { GST_OBJECT_FLAG_SET (self, GST_TENSOR_SINK_GRPC_STARTED); + if (self->server) + g_object_set (self, "port", grpc_get_listening_port (self), NULL); + } + return TRUE; } diff --git a/ext/nnstreamer/tensor_sink/tensor_sink_grpc.h b/ext/nnstreamer/tensor_sink/tensor_sink_grpc.h index 12f732d..e3fbb13 100644 --- a/ext/nnstreamer/tensor_sink/tensor_sink_grpc.h +++ b/ext/nnstreamer/tensor_sink/tensor_sink_grpc.h @@ -18,7 +18,7 @@ #include #include -#include +#include G_BEGIN_DECLS @@ -55,6 +55,7 @@ struct _GstTensorSinkGRPC gboolean server; /**< true to enable server mode */ gint port; /**< gRPC server port number */ gchar *host; /**< gRPC server host name */ + grpc_idl idl; /**< gRPC IDL for comm. */ guint out; /**< number of output messages */ /** Working variables */ diff --git a/ext/nnstreamer/tensor_source/meson.build b/ext/nnstreamer/tensor_source/meson.build index d76daef..524bf1d 100644 --- a/ext/nnstreamer/tensor_source/meson.build +++ b/ext/nnstreamer/tensor_source/meson.build @@ -27,6 +27,6 @@ if grpc_support_is_available tensor_src_grpc_dep = declare_dependency( sources : grpc_tensor_src_sources, - dependencies : grpc_protobuf_util_dep + dependencies : grpc_util_dep ) endif diff --git a/ext/nnstreamer/tensor_source/tensor_src_grpc.c b/ext/nnstreamer/tensor_source/tensor_src_grpc.c index 5f5c95c..a46020e 100644 --- a/ext/nnstreamer/tensor_source/tensor_src_grpc.c +++ b/ext/nnstreamer/tensor_source/tensor_src_grpc.c @@ -73,6 +73,11 @@ GST_DEBUG_CATEGORY_STATIC (gst_tensor_src_grpc_debug); #define DEFAULT_PROP_SERVER TRUE /** + * @brief Default IDL for RPC comm. + */ +#define DEFAULT_PROP_IDL "protobuf" + +/** * @brief Default host and port */ #define DEFAULT_PROP_HOST "localhost" @@ -94,6 +99,7 @@ enum PROP_0, PROP_SILENT, PROP_SERVER, + PROP_IDL, PROP_HOST, PROP_PORT, PROP_OUT, @@ -147,6 +153,11 @@ gst_tensor_src_grpc_class_init (GstTensorSrcGRPCClass * klass) "Specify its working mode either server or client", DEFAULT_PROP_SERVER, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_IDL, + g_param_spec_string ("idl", "IDL", + "Specify Interface Description Language (IDL) for communication", + DEFAULT_PROP_IDL, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + g_object_class_install_property (gobject_class, PROP_HOST, g_param_spec_string ("host", "Host", "The hostname to listen as or connect", @@ -215,6 +226,7 @@ gst_tensor_src_grpc_init (GstTensorSrcGRPC * self) { self->silent = DEFAULT_PROP_SILENT; self->server = DEFAULT_PROP_SERVER; + self->idl = grpc_get_idl (DEFAULT_PROP_IDL); self->port = DEFAULT_PROP_PORT; self->host = g_strdup (DEFAULT_PROP_HOST); self->priv = NULL; @@ -254,57 +266,12 @@ _send_eos_event (GstTensorSrcGRPC * self) } /** - * @brief free tensor memory - */ -static void -_free_memory (GstTensorMemory ** memory) -{ - guint i; - - for (i = 0; i < NNS_TENSOR_SIZE_LIMIT; i++) { - if (memory[i] == NULL) - break; - - g_free (memory[i]->data); - g_free (memory[i]); - } - - g_free (memory); -} - -/** - * @brief free tensor memory and set buffer - */ -static void -_free_memory_and_set_buffer (GstTensorMemory ** memory, GstBuffer ** buffer) -{ - GstBuffer *buf = gst_buffer_new (); - GstMemory *mem; - guint i; - - for (i = 0; i < NNS_TENSOR_SIZE_LIMIT; i++) { - if (memory[i] == NULL) - break; - - mem = gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY, - memory[i]->data, memory[i]->size, 0, memory[i]->size, NULL, NULL); - gst_buffer_append_memory (buf, mem); - - g_free (memory[i]); - } - g_free (memory); - - *buffer = buf; -} - -/** * @brief callback function for gRPC requests */ static void _grpc_callback (void *obj, void *data) { GstTensorSrcGRPC *self; - GstTensorMemory **memory; GstBuffer *buffer; GstClockTime duration; @@ -315,13 +282,13 @@ _grpc_callback (void *obj, void *data) g_return_if_fail (data != NULL); self = GST_TENSOR_SRC_GRPC_CAST (obj); - memory = (GstTensorMemory **) data; + buffer = (GstBuffer *) data; GST_OBJECT_LOCK (self); if (!GST_OBJECT_FLAG_IS_SET (self, GST_TENSOR_SRC_GRPC_STARTED) || !GST_OBJECT_FLAG_IS_SET (self, GST_TENSOR_SRC_GRPC_CONFIGURED)) { - _free_memory (memory); + gst_buffer_unref (buffer); GST_OBJECT_UNLOCK (self); return; @@ -335,8 +302,6 @@ _grpc_callback (void *obj, void *data) timestamp = 0; } - _free_memory_and_set_buffer (memory, &buffer); - GST_BUFFER_DURATION (buffer) = duration; GST_BUFFER_PTS (buffer) = timestamp; @@ -379,10 +344,14 @@ gst_tensor_src_grpc_start (GstBaseSrc * src) grpc_set_callback (self, _grpc_callback); - ret = grpc_start (self, GRPC_DIRECTION_FROM_PROTOBUF); - if (ret) + ret = grpc_start (self, GRPC_DIRECTION_BUFFER_TO_TENSORS); + if (ret) { GST_OBJECT_FLAG_SET (self, GST_TENSOR_SRC_GRPC_STARTED); + if (self->server) + g_object_set (self, "port", grpc_get_listening_port (self), NULL); + } + return ret; } @@ -508,6 +477,21 @@ gst_tensor_src_grpc_set_property (GObject * object, guint prop_id, self->server = g_value_get_boolean (value); silent_debug ("Set server = %d", self->server); break; + case PROP_IDL: + { + const gchar * idl_str = g_value_get_string (value); + + if (idl_str) { + grpc_idl idl = grpc_get_idl (idl_str); + if (idl != GRPC_IDL_NONE) { + self->idl = idl; + silent_debug ("Set idl = %s", idl_str); + } else { + ml_loge ("Invalid IDL string provided: %s", idl_str); + } + } + break; + } case PROP_HOST: { gchar * host; @@ -551,6 +535,18 @@ gst_tensor_src_grpc_get_property (GObject * object, guint prop_id, case PROP_SERVER: g_value_set_boolean (value, self->server); break; + case PROP_IDL: + switch (self->idl) { + case GRPC_IDL_PROTOBUF: + g_value_set_string (value, "protobuf"); + break; + case GRPC_IDL_FLATBUF: + g_value_set_string (value, "flatbuf"); + break; + default: + break; + } + break; case PROP_HOST: g_value_set_string (value, self->host); break; diff --git a/ext/nnstreamer/tensor_source/tensor_src_grpc.h b/ext/nnstreamer/tensor_source/tensor_src_grpc.h index cf5bab0..f6d3c69 100644 --- a/ext/nnstreamer/tensor_source/tensor_src_grpc.h +++ b/ext/nnstreamer/tensor_source/tensor_src_grpc.h @@ -19,7 +19,7 @@ #include #include -#include +#include G_BEGIN_DECLS #define GST_TYPE_TENSOR_SRC_GRPC \ @@ -54,9 +54,10 @@ struct _GstTensorSrcGRPC /** Properties saved */ gboolean silent; /**< true to print minimized log */ gboolean server; /**< true to enable server mode */ - gint port; /**< gRPC server port number */ - gchar *host; /**< gRPC server host name */ - guint out; + gint port; /**< gRPC server port number */ + gchar *host; /**< gRPC server host name */ + guint out; /**< number of output */ + grpc_idl idl; /**< gRPC IDL for comm. */ /** Working variables */ GstDataQueue *queue; /**< data queue to hold input data */ diff --git a/packaging/nnstreamer.spec b/packaging/nnstreamer.spec index 1b46eb6..d0193b9 100644 --- a/packaging/nnstreamer.spec +++ b/packaging/nnstreamer.spec @@ -904,7 +904,8 @@ cp -r result %{buildroot}%{_datadir}/nnstreamer/unittest/ %defattr(-,root,root,-) %manifest nnstreamer.manifest %license LICENSE -%{_libdir}/libnnstreamer_protobuf_grpc.so +%{_libdir}/libnnstreamer_grpc_protobuf.so +%{_libdir}/libnnstreamer_grpc_flatbuf.so %{gstlibdir}/libnnstreamer-grpc.so %endif # grpc_support -- 2.7.4