From: Jaeyun Date: Fri, 27 Aug 2021 05:37:15 +0000 (+0900) Subject: [Common/Adapter] aggregation from multi clients X-Git-Tag: accepted/tizen/unified/20210915.100110~14 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=450f2b050cf468aeabc38ef9c32bdda0908de4f2;p=platform%2Fupstream%2Fnnstreamer.git [Common/Adapter] aggregation from multi clients Use hashtable to aggregate buffers from multiple clients. - create gst-adapter for each client id - for normal case, set default adaptor (no meta in gst-buffer) Signed-off-by: Jaeyun --- diff --git a/gst/nnstreamer/tensor_aggregator/tensor_aggregator.c b/gst/nnstreamer/tensor_aggregator/tensor_aggregator.c index 9aab75e..accb2e6 100644 --- a/gst/nnstreamer/tensor_aggregator/tensor_aggregator.c +++ b/gst/nnstreamer/tensor_aggregator/tensor_aggregator.c @@ -32,6 +32,7 @@ #include #include "tensor_aggregator.h" +#include "tensor_meta.h" #include /** @@ -282,7 +283,7 @@ gst_tensor_aggregator_init (GstTensorAggregator * self) gst_tensors_config_init (&self->in_config); gst_tensors_config_init (&self->out_config); - self->adapter = gst_adapter_new (); + self->adapter_table = gst_tensor_aggregation_init (); gst_tensor_aggregator_reset (self); } @@ -300,11 +301,7 @@ gst_tensor_aggregator_finalize (GObject * object) gst_tensors_config_free (&self->in_config); gst_tensors_config_free (&self->out_config); - - if (self->adapter) { - g_object_unref (self->adapter); - self->adapter = NULL; - } + g_hash_table_destroy (self->adapter_table); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -518,6 +515,22 @@ gst_tensor_aggregator_src_query (GstPad * pad, GstObject * parent, } /** + * @brief Internal function to get adapter. + */ +static GstAdapter * +gst_tensor_aggregator_get_adapter (GstTensorAggregator * self, GstBuffer * buf) +{ + GstMetaQuery *meta; + const gchar *key = NULL; + + meta = gst_buffer_get_meta_query (buf); + if (meta) + key = meta->host; + + return gst_tensor_aggregation_get_adapter (self->adapter_table, key); +} + +/** * @brief Check tensor dimension and axis to concatenate data. * @param self this pointer to GstTensorAggregator * @param info tensor info for one frame @@ -845,7 +858,7 @@ gst_tensor_aggregator_chain (GstPad * pad, GstObject * parent, GstBuffer * buf) return gst_tensor_aggregator_push (self, buf, frame_size); } - adapter = self->adapter; + adapter = gst_tensor_aggregator_get_adapter (self, buf); g_assert (adapter != NULL); duration = GST_BUFFER_DURATION (buf); @@ -967,9 +980,7 @@ static void gst_tensor_aggregator_reset (GstTensorAggregator * self) { /* remove all buffers from adapter */ - if (self->adapter) { - gst_adapter_clear (self->adapter); - } + gst_tensor_aggregation_clear_all (self->adapter_table); } /** diff --git a/gst/nnstreamer/tensor_aggregator/tensor_aggregator.h b/gst/nnstreamer/tensor_aggregator/tensor_aggregator.h index cfbe3f5..bf39394 100644 --- a/gst/nnstreamer/tensor_aggregator/tensor_aggregator.h +++ b/gst/nnstreamer/tensor_aggregator/tensor_aggregator.h @@ -29,7 +29,6 @@ #include #include -#include G_BEGIN_DECLS @@ -64,7 +63,7 @@ struct _GstTensorAggregator guint frames_flush; /**< number of frames to flush */ guint frames_dim; /**< index of frames in tensor dimension */ - GstAdapter *adapter; /**< adapt incoming tensor */ + GHashTable *adapter_table; /**< adapt incoming tensor */ gboolean tensor_configured; /**< True if already successfully configured tensor metadata */ GstTensorsConfig in_config; /**< input tensor info */ diff --git a/gst/nnstreamer/tensor_common.h b/gst/nnstreamer/tensor_common.h index a9fa69f..235cc04 100644 --- a/gst/nnstreamer/tensor_common.h +++ b/gst/nnstreamer/tensor_common.h @@ -32,6 +32,7 @@ #include #include #include +#include #include #include "tensor_typedef.h" @@ -195,6 +196,37 @@ gst_tensor_pad_possible_caps_from_config (GstPad * pad, const GstTensorsConfig * extern gboolean gst_tensor_pad_caps_is_flexible (GstPad * pad); +/** + * @brief Gets new hash table for tensor aggregation. + * @return Newly allocated hash table, caller should release this using g_hash_table_destroy(). + */ +extern GHashTable * +gst_tensor_aggregation_init (void); + +/** + * @brief Clears buffers from adapter. + * @param table a hash table instance initialized with gst_tensor_aggregation_init() + * @param key the key to look up (set null to get default adapter) + */ +extern void +gst_tensor_aggregation_clear (GHashTable * table, const gchar * key); + +/** + * @brief Clears buffers from all adapters in hash table. + * @param table a hash table instance initialized with gst_tensor_aggregation_init() + */ +extern void +gst_tensor_aggregation_clear_all (GHashTable * table); + +/** + * @brief Gets adapter from hash table. + * @param table a hash table instance initialized with gst_tensor_aggregation_init() + * @param key the key to look up (set null to get default adapter) + * @return gst-adapter instance. DO NOT release this instance. + */ +extern GstAdapter * +gst_tensor_aggregation_get_adapter (GHashTable * table, const gchar * key); + /****************************************************** ************ Commonly used debugging macros ********** ****************************************************** diff --git a/gst/nnstreamer/tensor_common_pipeline.c b/gst/nnstreamer/tensor_common_pipeline.c index 0a0d87e..63a6820 100644 --- a/gst/nnstreamer/tensor_common_pipeline.c +++ b/gst/nnstreamer/tensor_common_pipeline.c @@ -13,8 +13,9 @@ * */ -#include +#include #include +#include static const gchar *gst_tensor_time_sync_mode_string[] = { [SYNC_NOSYNC] = "nosync", @@ -443,3 +444,157 @@ gst_tensor_time_sync_buffer_from_collectpad (GstCollectPads * collect, *is_eos = _gst_tensor_time_sync_is_eos (collect, sync, empty_pad); return !(*is_eos); } + +/** + * @brief Internal struct to handle aggregation data in hash table. + */ +typedef struct +{ + GstAdapter *adapter; +} gst_tensor_aggregation_data_s; + +#define AGGREGATION_DEFAULT_KEY "aggr-default" +#define AGGREGATION_KEY_IS_EMPTY(k) (k == NULL || k[0] == '\0') + +/** + * @brief Internal function to free aggregation data. + */ +static void +gst_tensor_aggregation_free_data (gpointer data) +{ + gst_tensor_aggregation_data_s *aggr; + + aggr = (gst_tensor_aggregation_data_s *) data; + if (aggr) { + gst_adapter_clear (aggr->adapter); + g_object_unref (aggr->adapter); + + g_free (aggr); + } +} + +/** + * @brief Internal function to add new aggregation data. + */ +static gst_tensor_aggregation_data_s * +gst_tensor_aggregation_add_data (GHashTable * table, const gchar * key) +{ + gst_tensor_aggregation_data_s *aggr; + gchar *hashkey; + + g_return_val_if_fail (table != NULL, NULL); + + if (AGGREGATION_KEY_IS_EMPTY (key)) + hashkey = g_strdup (AGGREGATION_DEFAULT_KEY); + else + hashkey = g_strdup (key); + + aggr = g_new0 (gst_tensor_aggregation_data_s, 1); + aggr->adapter = gst_adapter_new (); + + g_hash_table_insert (table, hashkey, aggr); + return aggr; +} + +/** + * @brief Internal function to get aggregation data. + */ +static gst_tensor_aggregation_data_s * +gst_tensor_aggregation_get_data (GHashTable * table, const gchar * key) +{ + g_return_val_if_fail (table != NULL, NULL); + + return (gst_tensor_aggregation_data_s *) g_hash_table_lookup (table, + AGGREGATION_KEY_IS_EMPTY (key) ? AGGREGATION_DEFAULT_KEY : key); +} + +/** + * @brief Internal function to remove all buffers from aggregation data. + */ +static void +gst_tensor_aggregation_clear_internal (gpointer key, gpointer value, + gpointer user_data) +{ + gst_tensor_aggregation_data_s *aggr; + + UNUSED (key); + UNUSED (user_data); + + aggr = (gst_tensor_aggregation_data_s *) value; + if (aggr) { + gst_adapter_clear (aggr->adapter); + } +} + +/** + * @brief Gets new hash table for tensor aggregation. + * @return Newly allocated hash table, caller should release this using g_hash_table_destroy(). + */ +GHashTable * +gst_tensor_aggregation_init (void) +{ + GHashTable *table; + + table = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, + gst_tensor_aggregation_free_data); + + /** + * Add default adapter (for the case if buffer has no specific id). + * If gst-buffer has tensor-meta which includes client-id, + * e.g., aggregation frames from multiple clients on query-server pipeline, + * nnstreamer element should parse meta and request adapter with this id. + * However, on normal pipeline, gst-buffer does not contain tensor-meta, + * then the element may request adapter with null key string. + */ + gst_tensor_aggregation_add_data (table, AGGREGATION_DEFAULT_KEY); + + return table; +} + +/** + * @brief Clears buffers from adapter. + * @param table a hash table instance initialized with gst_tensor_aggregation_init() + * @param key the key to look up (set null to get default adapter) + */ +void +gst_tensor_aggregation_clear (GHashTable * table, const gchar * key) +{ + gst_tensor_aggregation_data_s *aggr; + + g_return_if_fail (table != NULL); + + aggr = gst_tensor_aggregation_get_data (table, key); + gst_tensor_aggregation_clear_internal (NULL, aggr, NULL); +} + +/** + * @brief Clears buffers from all adapters in hash table. + * @param table a hash table instance initialized with gst_tensor_aggregation_init() + */ +void +gst_tensor_aggregation_clear_all (GHashTable * table) +{ + g_hash_table_foreach (table, gst_tensor_aggregation_clear_internal, NULL); +} + +/** + * @brief Gets adapter from hash table. + * @param table a hash table instance initialized with gst_tensor_aggregation_init() + * @param key the key to look up (set null to get default adapter) + * @return gst-adapter instance. DO NOT release this instance. + */ +GstAdapter * +gst_tensor_aggregation_get_adapter (GHashTable * table, const gchar * key) +{ + gst_tensor_aggregation_data_s *aggr; + + g_return_val_if_fail (table != NULL, NULL); + + aggr = gst_tensor_aggregation_get_data (table, key); + if (!aggr) { + /*append new data */ + aggr = gst_tensor_aggregation_add_data (table, key); + } + + return aggr->adapter; +} diff --git a/gst/nnstreamer/tensor_converter/tensor_converter.c b/gst/nnstreamer/tensor_converter/tensor_converter.c index b2660f0..68efbfc 100644 --- a/gst/nnstreamer/tensor_converter/tensor_converter.c +++ b/gst/nnstreamer/tensor_converter/tensor_converter.c @@ -46,6 +46,7 @@ #include #include "tensor_converter.h" +#include "tensor_meta.h" #ifdef NO_VIDEO #include "converter-media-info-no-video.h" @@ -362,7 +363,7 @@ gst_tensor_converter_init (GstTensorConverter * self) gst_tensors_config_init (&self->tensors_config); self->tensors_configured = FALSE; - self->adapter = gst_adapter_new (); + self->adapter_table = gst_tensor_aggregation_init (); gst_tensor_converter_reset (self); } @@ -380,11 +381,7 @@ gst_tensor_converter_finalize (GObject * object) gst_tensors_config_free (&self->tensors_config); gst_tensors_info_free (&self->tensors_info); - - if (self->adapter) { - g_object_unref (self->adapter); - self->adapter = NULL; - } + g_hash_table_destroy (self->adapter_table); g_free (self->mode_option); g_free (self->ext_fw); @@ -690,6 +687,22 @@ gst_tensor_converter_sink_query (GstPad * pad, GstObject * parent, } /** + * @brief Internal function to get adapter. + */ +static GstAdapter * +gst_tensor_converter_get_adapter (GstTensorConverter * self, GstBuffer * buf) +{ + GstMetaQuery *meta; + const gchar *key = NULL; + + meta = gst_buffer_get_meta_query (buf); + if (meta) + key = meta->host; + + return gst_tensor_aggregation_get_adapter (self->adapter_table, key); +} + +/** * @brief This function handles src pad query. */ static gboolean @@ -926,7 +939,7 @@ _gst_tensor_converter_chain_chunk (GstTensorConverter * self, gboolean have_framerate; config = &self->tensors_config; - adapter = self->adapter; + adapter = gst_tensor_converter_get_adapter (self, inbuf); g_assert (adapter != NULL); have_framerate = (config->rate_n > 0 && config->rate_d > 0); @@ -1268,9 +1281,7 @@ static void gst_tensor_converter_reset (GstTensorConverter * self) { /* remove all buffers from adapter */ - if (self->adapter) { - gst_adapter_clear (self->adapter); - } + gst_tensor_aggregation_clear_all (self->adapter_table); self->have_segment = FALSE; self->need_segment = FALSE; diff --git a/gst/nnstreamer/tensor_converter/tensor_converter.h b/gst/nnstreamer/tensor_converter/tensor_converter.h index be28a5d..f6f3f2e 100644 --- a/gst/nnstreamer/tensor_converter/tensor_converter.h +++ b/gst/nnstreamer/tensor_converter/tensor_converter.h @@ -35,7 +35,6 @@ #define __GST_TENSOR_CONVERTER_H__ #include -#include #include #include "nnstreamer_plugin_api_converter.h" #include "tensor_converter_custom.h" @@ -85,7 +84,7 @@ struct _GstTensorConverter guint frames_per_tensor; /**< number of frames in output tensor */ GstTensorsInfo tensors_info; /**< data structure to get/set tensor info */ - GstAdapter *adapter; /**< adapt incoming media stream */ + GHashTable *adapter_table; /**< adapt incoming media stream */ media_type in_media_type; /**< incoming media type */ /** ExternalConverter is used if in_media_type == _NNS_MEDIA_PLUGINS */