[Common/Adapter] aggregation from multi clients
authorJaeyun <jy1210.jung@samsung.com>
Fri, 27 Aug 2021 05:37:15 +0000 (14:37 +0900)
committerMyungJoo Ham <myungjoo.ham@samsung.com>
Mon, 6 Sep 2021 15:03:46 +0000 (00:03 +0900)
Use hashtable to aggregate buffers from multiple clients.
- create gst-adapter for each client id
- for normal case, set default adaptor (no meta in gst-buffer)

Signed-off-by: Jaeyun <jy1210.jung@samsung.com>
gst/nnstreamer/tensor_aggregator/tensor_aggregator.c
gst/nnstreamer/tensor_aggregator/tensor_aggregator.h
gst/nnstreamer/tensor_common.h
gst/nnstreamer/tensor_common_pipeline.c
gst/nnstreamer/tensor_converter/tensor_converter.c
gst/nnstreamer/tensor_converter/tensor_converter.h

index 9aab75e..accb2e6 100644 (file)
@@ -32,6 +32,7 @@
 
 #include <string.h>
 #include "tensor_aggregator.h"
+#include "tensor_meta.h"
 #include <nnstreamer_util.h>
 
 /**
@@ -282,7 +283,7 @@ gst_tensor_aggregator_init (GstTensorAggregator * self)
   gst_tensors_config_init (&self->in_config);
   gst_tensors_config_init (&self->out_config);
 
-  self->adapter = gst_adapter_new ();
+  self->adapter_table = gst_tensor_aggregation_init ();
   gst_tensor_aggregator_reset (self);
 }
 
@@ -300,11 +301,7 @@ gst_tensor_aggregator_finalize (GObject * object)
 
   gst_tensors_config_free (&self->in_config);
   gst_tensors_config_free (&self->out_config);
-
-  if (self->adapter) {
-    g_object_unref (self->adapter);
-    self->adapter = NULL;
-  }
+  g_hash_table_destroy (self->adapter_table);
 
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
@@ -518,6 +515,22 @@ gst_tensor_aggregator_src_query (GstPad * pad, GstObject * parent,
 }
 
 /**
+ * @brief Internal function to get adapter.
+ */
+static GstAdapter *
+gst_tensor_aggregator_get_adapter (GstTensorAggregator * self, GstBuffer * buf)
+{
+  GstMetaQuery *meta;
+  const gchar *key = NULL;
+
+  meta = gst_buffer_get_meta_query (buf);
+  if (meta)
+    key = meta->host;
+
+  return gst_tensor_aggregation_get_adapter (self->adapter_table, key);
+}
+
+/**
  * @brief Check tensor dimension and axis to concatenate data.
  * @param self this pointer to GstTensorAggregator
  * @param info tensor info for one frame
@@ -845,7 +858,7 @@ gst_tensor_aggregator_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
     return gst_tensor_aggregator_push (self, buf, frame_size);
   }
 
-  adapter = self->adapter;
+  adapter = gst_tensor_aggregator_get_adapter (self, buf);
   g_assert (adapter != NULL);
 
   duration = GST_BUFFER_DURATION (buf);
@@ -967,9 +980,7 @@ static void
 gst_tensor_aggregator_reset (GstTensorAggregator * self)
 {
   /* remove all buffers from adapter */
-  if (self->adapter) {
-    gst_adapter_clear (self->adapter);
-  }
+  gst_tensor_aggregation_clear_all (self->adapter_table);
 }
 
 /**
index cfbe3f5..bf39394 100644 (file)
@@ -29,7 +29,6 @@
 
 #include <gst/gst.h>
 #include <tensor_common.h>
-#include <gst/base/gstadapter.h>
 
 G_BEGIN_DECLS
 
@@ -64,7 +63,7 @@ struct _GstTensorAggregator
   guint frames_flush; /**< number of frames to flush */
   guint frames_dim; /**< index of frames in tensor dimension */
 
-  GstAdapter *adapter; /**< adapt incoming tensor */
+  GHashTable *adapter_table; /**< adapt incoming tensor */
 
   gboolean tensor_configured; /**< True if already successfully configured tensor metadata */
   GstTensorsConfig in_config; /**< input tensor info */
index a9fa69f..235cc04 100644 (file)
@@ -32,6 +32,7 @@
 #include <glib.h>
 #include <stdint.h>
 #include <gst/gst.h>
+#include <gst/base/gstadapter.h>
 #include <gst/base/gstcollectpads.h>
 
 #include "tensor_typedef.h"
@@ -195,6 +196,37 @@ gst_tensor_pad_possible_caps_from_config (GstPad * pad, const GstTensorsConfig *
 extern gboolean
 gst_tensor_pad_caps_is_flexible (GstPad * pad);
 
+/**
+ * @brief Gets new hash table for tensor aggregation.
+ * @return Newly allocated hash table, caller should release this using g_hash_table_destroy().
+ */
+extern GHashTable *
+gst_tensor_aggregation_init (void);
+
+/**
+ * @brief Clears buffers from adapter.
+ * @param table a hash table instance initialized with gst_tensor_aggregation_init()
+ * @param key the key to look up (set null to get default adapter)
+ */
+extern void
+gst_tensor_aggregation_clear (GHashTable * table, const gchar * key);
+
+/**
+ * @brief Clears buffers from all adapters in hash table.
+ * @param table a hash table instance initialized with gst_tensor_aggregation_init()
+ */
+extern void
+gst_tensor_aggregation_clear_all (GHashTable * table);
+
+/**
+ * @brief Gets adapter from hash table.
+ * @param table a hash table instance initialized with gst_tensor_aggregation_init()
+ * @param key the key to look up (set null to get default adapter)
+ * @return gst-adapter instance. DO NOT release this instance.
+ */
+extern GstAdapter *
+gst_tensor_aggregation_get_adapter (GHashTable * table, const gchar * key);
+
 /******************************************************
  ************ Commonly used debugging macros **********
  ******************************************************
index 0a0d87e..63a6820 100644 (file)
@@ -13,8 +13,9 @@
  *
  */
 
-#include <tensor_common.h>
+#include <nnstreamer_util.h>
 #include <string.h>
+#include <tensor_common.h>
 
 static const gchar *gst_tensor_time_sync_mode_string[] = {
   [SYNC_NOSYNC] = "nosync",
@@ -443,3 +444,157 @@ gst_tensor_time_sync_buffer_from_collectpad (GstCollectPads * collect,
   *is_eos = _gst_tensor_time_sync_is_eos (collect, sync, empty_pad);
   return !(*is_eos);
 }
+
+/**
+ * @brief Internal struct to handle aggregation data in hash table.
+ */
+typedef struct
+{
+  GstAdapter *adapter;
+} gst_tensor_aggregation_data_s;
+
+#define AGGREGATION_DEFAULT_KEY "aggr-default"
+#define AGGREGATION_KEY_IS_EMPTY(k) (k == NULL || k[0] == '\0')
+
+/**
+ * @brief Internal function to free aggregation data.
+ */
+static void
+gst_tensor_aggregation_free_data (gpointer data)
+{
+  gst_tensor_aggregation_data_s *aggr;
+
+  aggr = (gst_tensor_aggregation_data_s *) data;
+  if (aggr) {
+    gst_adapter_clear (aggr->adapter);
+    g_object_unref (aggr->adapter);
+
+    g_free (aggr);
+  }
+}
+
+/**
+ * @brief Internal function to add new aggregation data.
+ */
+static gst_tensor_aggregation_data_s *
+gst_tensor_aggregation_add_data (GHashTable * table, const gchar * key)
+{
+  gst_tensor_aggregation_data_s *aggr;
+  gchar *hashkey;
+
+  g_return_val_if_fail (table != NULL, NULL);
+
+  if (AGGREGATION_KEY_IS_EMPTY (key))
+    hashkey = g_strdup (AGGREGATION_DEFAULT_KEY);
+  else
+    hashkey = g_strdup (key);
+
+  aggr = g_new0 (gst_tensor_aggregation_data_s, 1);
+  aggr->adapter = gst_adapter_new ();
+
+  g_hash_table_insert (table, hashkey, aggr);
+  return aggr;
+}
+
+/**
+ * @brief Internal function to get aggregation data.
+ */
+static gst_tensor_aggregation_data_s *
+gst_tensor_aggregation_get_data (GHashTable * table, const gchar * key)
+{
+  g_return_val_if_fail (table != NULL, NULL);
+
+  return (gst_tensor_aggregation_data_s *) g_hash_table_lookup (table,
+      AGGREGATION_KEY_IS_EMPTY (key) ? AGGREGATION_DEFAULT_KEY : key);
+}
+
+/**
+ * @brief Internal function to remove all buffers from aggregation data.
+ */
+static void
+gst_tensor_aggregation_clear_internal (gpointer key, gpointer value,
+    gpointer user_data)
+{
+  gst_tensor_aggregation_data_s *aggr;
+
+  UNUSED (key);
+  UNUSED (user_data);
+
+  aggr = (gst_tensor_aggregation_data_s *) value;
+  if (aggr) {
+    gst_adapter_clear (aggr->adapter);
+  }
+}
+
+/**
+ * @brief Gets new hash table for tensor aggregation.
+ * @return Newly allocated hash table, caller should release this using g_hash_table_destroy().
+ */
+GHashTable *
+gst_tensor_aggregation_init (void)
+{
+  GHashTable *table;
+
+  table = g_hash_table_new_full (g_str_hash, g_str_equal, g_free,
+      gst_tensor_aggregation_free_data);
+
+  /**
+   * Add default adapter (for the case if buffer has no specific id).
+   * If gst-buffer has tensor-meta which includes client-id,
+   * e.g., aggregation frames from multiple clients on query-server pipeline,
+   * nnstreamer element should parse meta and request adapter with this id.
+   * However, on normal pipeline, gst-buffer does not contain tensor-meta,
+   * then the element may request adapter with null key string.
+   */
+  gst_tensor_aggregation_add_data (table, AGGREGATION_DEFAULT_KEY);
+
+  return table;
+}
+
+/**
+ * @brief Clears buffers from adapter.
+ * @param table a hash table instance initialized with gst_tensor_aggregation_init()
+ * @param key the key to look up (set null to get default adapter)
+ */
+void
+gst_tensor_aggregation_clear (GHashTable * table, const gchar * key)
+{
+  gst_tensor_aggregation_data_s *aggr;
+
+  g_return_if_fail (table != NULL);
+
+  aggr = gst_tensor_aggregation_get_data (table, key);
+  gst_tensor_aggregation_clear_internal (NULL, aggr, NULL);
+}
+
+/**
+ * @brief Clears buffers from all adapters in hash table.
+ * @param table a hash table instance initialized with gst_tensor_aggregation_init()
+ */
+void
+gst_tensor_aggregation_clear_all (GHashTable * table)
+{
+  g_hash_table_foreach (table, gst_tensor_aggregation_clear_internal, NULL);
+}
+
+/**
+ * @brief Gets adapter from hash table.
+ * @param table a hash table instance initialized with gst_tensor_aggregation_init()
+ * @param key the key to look up (set null to get default adapter)
+ * @return gst-adapter instance. DO NOT release this instance.
+ */
+GstAdapter *
+gst_tensor_aggregation_get_adapter (GHashTable * table, const gchar * key)
+{
+  gst_tensor_aggregation_data_s *aggr;
+
+  g_return_val_if_fail (table != NULL, NULL);
+
+  aggr = gst_tensor_aggregation_get_data (table, key);
+  if (!aggr) {
+    /*append new data */
+    aggr = gst_tensor_aggregation_add_data (table, key);
+  }
+
+  return aggr->adapter;
+}
index b2660f0..68efbfc 100644 (file)
@@ -46,6 +46,7 @@
 
 #include <string.h>
 #include "tensor_converter.h"
+#include "tensor_meta.h"
 
 #ifdef NO_VIDEO
 #include "converter-media-info-no-video.h"
@@ -362,7 +363,7 @@ gst_tensor_converter_init (GstTensorConverter * self)
   gst_tensors_config_init (&self->tensors_config);
   self->tensors_configured = FALSE;
 
-  self->adapter = gst_adapter_new ();
+  self->adapter_table = gst_tensor_aggregation_init ();
   gst_tensor_converter_reset (self);
 }
 
@@ -380,11 +381,7 @@ gst_tensor_converter_finalize (GObject * object)
 
   gst_tensors_config_free (&self->tensors_config);
   gst_tensors_info_free (&self->tensors_info);
-
-  if (self->adapter) {
-    g_object_unref (self->adapter);
-    self->adapter = NULL;
-  }
+  g_hash_table_destroy (self->adapter_table);
 
   g_free (self->mode_option);
   g_free (self->ext_fw);
@@ -690,6 +687,22 @@ gst_tensor_converter_sink_query (GstPad * pad, GstObject * parent,
 }
 
 /**
+ * @brief Internal function to get adapter.
+ */
+static GstAdapter *
+gst_tensor_converter_get_adapter (GstTensorConverter * self, GstBuffer * buf)
+{
+  GstMetaQuery *meta;
+  const gchar *key = NULL;
+
+  meta = gst_buffer_get_meta_query (buf);
+  if (meta)
+    key = meta->host;
+
+  return gst_tensor_aggregation_get_adapter (self->adapter_table, key);
+}
+
+/**
  * @brief This function handles src pad query.
  */
 static gboolean
@@ -926,7 +939,7 @@ _gst_tensor_converter_chain_chunk (GstTensorConverter * self,
   gboolean have_framerate;
 
   config = &self->tensors_config;
-  adapter = self->adapter;
+  adapter = gst_tensor_converter_get_adapter (self, inbuf);
   g_assert (adapter != NULL);
 
   have_framerate = (config->rate_n > 0 && config->rate_d > 0);
@@ -1268,9 +1281,7 @@ static void
 gst_tensor_converter_reset (GstTensorConverter * self)
 {
   /* remove all buffers from adapter */
-  if (self->adapter) {
-    gst_adapter_clear (self->adapter);
-  }
+  gst_tensor_aggregation_clear_all (self->adapter_table);
 
   self->have_segment = FALSE;
   self->need_segment = FALSE;
index be28a5d..f6f3f2e 100644 (file)
@@ -35,7 +35,6 @@
 #define __GST_TENSOR_CONVERTER_H__
 
 #include <gst/gst.h>
-#include <gst/base/gstadapter.h>
 #include <tensor_common.h>
 #include "nnstreamer_plugin_api_converter.h"
 #include "tensor_converter_custom.h"
@@ -85,7 +84,7 @@ struct _GstTensorConverter
   guint frames_per_tensor; /**< number of frames in output tensor */
   GstTensorsInfo tensors_info; /**< data structure to get/set tensor info */
 
-  GstAdapter *adapter; /**< adapt incoming media stream */
+  GHashTable *adapter_table; /**< adapt incoming media stream */
 
   media_type in_media_type; /**< incoming media type */
   /** ExternalConverter is used if in_media_type == _NNS_MEDIA_PLUGINS */