[nnstreamer][trainer] Create a queue to construct dataset
authorhyunil park <hyunil46.park@samsung.com>
Thu, 5 Jan 2023 02:52:02 +0000 (11:52 +0900)
committerJijoong Moon <jijoong.moon@samsung.com>
Wed, 22 Mar 2023 10:17:14 +0000 (19:17 +0900)
- It is dynamically allocated as much as initial queue size and
  continues to use the memory until training is finished.
- The maximum size of the queue is 30, and if the number of samples
  is less than 30, size is number of samples.
- Data received through 'invoke' is strored in queue, and the data
  in the queue is used in the 'data gen callback'. the used data
  is not maintained for epochs.
  For epoch, (number of trin samples + number of valid samples) * epoch
  data should be received.

Signed-off-by: hyunil park <hyunil46.park@samsung.com>
nnstreamer/tensor_trainer/tensor_trainer_nntrainer.cc
nnstreamer/tensor_trainer/tensor_trainer_nntrainer.hh

index 130afcbd89cba912b47f457feb74a6f5831dd30c..31c2a3f99567c48bae544c196772ece85387aae3 100644 (file)
@@ -40,6 +40,13 @@ void nntrainer_thread_func(NNTrainer::NNTrainerTrain *nntrainer) {
   nntrainer->trainModel();
 }
 
+/**
+ * @brief invoke function
+ * tensor_trainer call this function to send tensor_data.
+ * For epoch, (number of trin samples + number of valid samples) * epoch
+ * data should be received.
+ * Sub-plugin don't keep dataset for epoch.
+ */
 static int nntrainer_model_invoke(const GstTensorTrainerFramework *fw,
                                   const GstTensorTrainerProperties *prop,
                                   void *private_data,
@@ -48,18 +55,30 @@ static int nntrainer_model_invoke(const GstTensorTrainerFramework *fw,
   NNTrainer::NNTrainerTrain *nntrainer =
     reinterpret_cast<NNTrainer::NNTrainerTrain *>(private_data);
   UNUSED(fw);
+  UNUSED(prop);
+  pid_t pid = getpid();
+  pid_t tid = syscall(SYS_gettid);
+
   ml_logd("<called>");
+  ml_logd("pid[%d], tid[%d]", pid, tid);
 
   if (!nntrainer) {
     ml_loge("Failed get nntrainer");
     return -1;
   }
-  UNUSED(prop);
+
+  nntrainer->num_invoke++;
+  ml_logd("Received data (%d/%d)", nntrainer->num_invoke,
+          nntrainer->total_num_samples);
+  if (nntrainer->total_num_samples < nntrainer->num_invoke) {
+    ml_logd("Already received all data required for the train, "
+            "but invoke is called");
+    return 0;
+  }
 
   if (nntrainer->train_data->push_count == nntrainer->num_train_samples &&
       nntrainer->valid_data->push_count == nntrainer->num_valid_samples) {
-    ml_logd("data is full");
-    return 0;
+    nntrainer->train_data->push_count = nntrainer->valid_data->push_count = 0;
   }
 
   if (nntrainer->train_data->push_count < nntrainer->num_train_samples) {
@@ -74,47 +93,47 @@ static int nntrainer_model_invoke(const GstTensorTrainerFramework *fw,
           nntrainer->num_labels);
 
   NNTrainer::TensorData tensor_data;
-  int64_t idx = 0, i = 0;
-  char *p_data = nullptr;
-  for (i = 0; i < data->num_inputs; i++) {
-    ml_logd("input[%d]:%p, size:%zd\n", i, input[i].data, input[i].size);
-    p_data = new char[input[idx].size];
-    std::memcpy(p_data, input[idx].data, input[idx].size);
-    tensor_data.inputs.emplace_back(p_data);
-    ml_logd("input[%d].data = p %p\n", idx, (input[idx].data));
-    ml_logd("tensor_data.inputs[%d] = %p\n", idx, tensor_data.inputs[idx]);
+  unsigned int idx = 0, i = 0;
+
+  i = data->queue_rear;
+  ml_logd("Insert, queue_rear : %d", i);
+  for (auto inputs : data->tensor_data[i].inputs) {
+    ml_logd("inputs : %p", inputs);
+    ml_logd("input[%d]:%p, size:%zd\n", idx, input[idx].data, input[idx].size);
+    std::memcpy(inputs, input[idx].data, input[idx].size);
     idx++;
   }
-  for (i = 0; i < data->num_labels; i++) {
-    p_data = new char[input[idx].size];
-    std::memcpy(p_data, input[idx].data, input[idx].size);
-    tensor_data.labels.emplace_back(p_data);
+
+  for (auto labels : data->tensor_data[i].labels) {
+    ml_logd("labels : %p", labels);
+    ml_logd("input[%d]:%p, size:%zd\n", idx, input[idx].data, input[idx].size);
+    std::memcpy(labels, input[idx].data, input[idx].size);
     idx++;
   }
 
-  data->tensor_data.emplace_back(tensor_data);
+  data->queue_rear++;
   data->push_count++;
+  ml_logd("front:%d, rear:%d", data->queue_front, data->queue_rear);
 
-  ml_logd("(pop/push: %d/%d)", data->pop_count, data->push_count);
-
-#if 0
-  for (auto data : data->tensor_data) {
-    for (auto inputs : data.inputs) {
-      ml_logd("##I addr:%p", inputs);
-    }
-    for (auto labels : data.labels) {
-      ml_logd("##L addr:%p", labels);
-    }
+  if (data->is_data_wait && data->queue_rear > data->queue_front) {
+    pthread_mutex_lock(&data->mutex);
+    ml_logd("send signal");
+    pthread_cond_signal(&data->data_wait_cond);
+    pthread_mutex_unlock(&data->mutex);
   }
-#endif
 
-  if (data->is_mutex_locked && data->push_count > data->pop_count) {
+  if (data->queue_rear == data->queue_size) {
     pthread_mutex_lock(&data->mutex);
-    ml_logd("send signal");
-    pthread_cond_signal(&data->cond);
+    data->is_data_full = TRUE;
+    ml_logd("locked, data is full");
+    pthread_cond_wait(&data->data_full_cond, &data->mutex);
+    ml_logd("unlocked, queue is empty");
     pthread_mutex_unlock(&data->mutex);
+    data->is_data_full = FALSE;
+    data->queue_rear = data->queue_front = 0;
   }
 
+  ml_logd("(pop/push: %d/%d)", data->pop_count, data->push_count);
   ml_logd("T-pushed:%d/%d, V-pushed:%d/%d\n", nntrainer->train_data->push_count,
           nntrainer->num_train_samples, nntrainer->valid_data->push_count,
           nntrainer->num_valid_samples);
@@ -135,21 +154,22 @@ int getSample(float **input, float **label, bool *last, void *user_data) {
   ml_logd("<called>");
   ml_logd("pid[%d], tid[%d]", pid, tid);
 
-  if (data->push_count <= data->pop_count) {
+  ml_logd("front:%d, rear:%d", data->queue_front, data->queue_rear);
+  if (data->queue_rear <= data->queue_front) {
     pthread_mutex_lock(&data->mutex);
-    data->is_mutex_locked = TRUE;
+    data->is_data_wait = TRUE;
     ml_logd("locked, need to wait for more data");
-    pthread_cond_wait(&data->cond, &data->mutex);
+    pthread_cond_wait(&data->data_wait_cond, &data->mutex);
     ml_logd("unlocked, get data");
     pthread_mutex_unlock(&data->mutex);
-    data->is_mutex_locked = FALSE;
+    data->is_data_wait = FALSE;
   }
 
   ml_logd("num_inputs: %d, num_labels: %d", data->num_inputs, data->num_labels);
 
-  int64_t i = 0;
-  int idx = data->pop_count;
-  ml_logd("pop idx: %d", idx);
+  unsigned int i = 0;
+  unsigned int idx = data->queue_front;
+  ml_logd("Delete, queue_front: %d", idx);
 
   for (i = 0; i < data->num_inputs; i++) {
     ml_logd("memcpy Addr %p, %p, size=%d\n", *(input + i),
@@ -165,10 +185,11 @@ int getSample(float **input, float **label, bool *last, void *user_data) {
   }
 
   data->pop_count++;
+  data->queue_front++;
 
   ml_logd("(pop/push: %d/%d)", data->pop_count, data->push_count);
 
-  if (data->pop_count < data->num_samples) {
+  if (data->pop_count < data->num_samples) { // train or valid num samples
     *last = false;
   } else {
     *last = true;
@@ -179,6 +200,13 @@ int getSample(float **input, float **label, bool *last, void *user_data) {
     std::shuffle(data->tensor_data.begin(), data->tensor_data.end(), g);
   }
 
+  if (data->is_data_full && data->queue_rear == data->queue_front) {
+    pthread_mutex_lock(&data->mutex);
+    ml_logd("send signal");
+    pthread_cond_signal(&data->data_full_cond);
+    pthread_mutex_unlock(&data->mutex);
+  }
+
   ml_logd("<leave>");
 
   return 0;
@@ -208,7 +236,10 @@ NNTrainer::InputTensorsInfo::InputTensorsInfo(int64_t _num_samples,
                                               int64_t _num_inputs,
                                               int64_t _num_labels,
                                               int64_t _tensors_inputsize[]) :
-  is_mutex_locked(0),
+  is_data_wait(0),
+  is_data_full(0),
+  queue_front(0),
+  queue_rear(0),
   push_count(0),
   pop_count(0),
   num_samples(_num_samples),
@@ -216,10 +247,13 @@ NNTrainer::InputTensorsInfo::InputTensorsInfo(int64_t _num_samples,
   num_labels(_num_labels) {
 
   ml_logd("<called>");
-
-  tensor_data.reserve(_num_samples);
+  const int min_queue_size = 30;
+  queue_size = (_num_samples > min_queue_size) ? min_queue_size : _num_samples;
+  ml_logd("queue_size:%d", queue_size);
+  tensor_data.reserve(queue_size);
   pthread_mutex_init(&mutex, NULL);
-  pthread_cond_init(&cond, NULL);
+  pthread_cond_init(&data_wait_cond, NULL);
+  pthread_cond_init(&data_full_cond, NULL);
 
   int64_t idx = 0, i = 0;
   for (i = 0; i < num_inputs; i++) {
@@ -231,6 +265,25 @@ NNTrainer::InputTensorsInfo::InputTensorsInfo(int64_t _num_samples,
     ml_logd("label_size[%d]=%d", i, label_size[i]);
   }
 
+  unsigned int cur_queue_size = 0;
+
+  /* make queue */
+  while (cur_queue_size < queue_size) {
+    NNTrainer::TensorData t_data;
+    int i = 0;
+    char *p_data = nullptr;
+    for (i = 0; i < num_inputs; i++) {
+      p_data = new char[input_size[i]];
+      t_data.inputs.emplace_back(p_data);
+    }
+    for (i = 0; i < num_labels; i++) {
+      p_data = new char[label_size[i]];
+      t_data.labels.emplace_back(p_data);
+    }
+
+    tensor_data.emplace_back(t_data);
+    cur_queue_size++;
+  }
   ml_logd("<leave>");
 }
 
@@ -275,6 +328,9 @@ void NNTrainer::NNTrainerTrain::getNNStreamerProperties(
   num_valid_samples = prop->num_valid_samples;
   model_save_path = prop->model_save_path;
   train_complete_cond = prop->train_complete_cond;
+  num_epoch = 5; // for test
+  is_train_complete = FALSE;
+  total_num_samples = (num_train_samples + num_valid_samples) * num_epoch;
 
   ml_logd("num_inputs: %d", num_inputs);
   ml_logd("num_labels: %d", num_labels);
@@ -345,6 +401,7 @@ void NNTrainer::NNTrainerTrain::trainModel() {
   try {
     ml_logd("Save_model: %s", model_save_path.c_str());
     model->save(model_save_path, ml::train::ModelFormat::MODEL_FORMAT_BIN);
+    is_train_complete = TRUE;
     ml_logd("send train_complete_cond signal");
     g_cond_signal(train_complete_cond);
 
@@ -385,7 +442,7 @@ void NNTrainer::NNTrainerTrain::createModel() {
 
 NNTrainer::NNTrainerTrain::NNTrainerTrain(
   const GstTensorTrainerProperties *prop, const std::string &_model_config) :
-  model_config(_model_config) {
+  num_invoke(0), model_config(_model_config) {
   ml_logd("<called>");
   getNNStreamerProperties(prop);
   createDataset();
@@ -431,12 +488,15 @@ static int nntrainer_getFrameworkInfo(const GstTensorTrainerFramework *fw,
                                       void *private_data,
                                       GstTensorTrainerFrameworkInfo *fw_info) {
   static gchar subplugin_name[] = "nntrainer";
+  NNTrainer::NNTrainerTrain *nntrainer =
+    reinterpret_cast<NNTrainer::NNTrainerTrain *>(private_data);
   ml_logd("<called>");
   UNUSED(fw);
   UNUSED(prop);
-  UNUSED(private_data);
 
   fw_info->name = subplugin_name;
+  if (nntrainer)
+    fw_info->train_complete = nntrainer->is_train_complete;
   ml_logd("<leave>");
   return 0;
 }
index 77a28f6d8f89d7ccea295ee870eeb5a72fd62edd..58f1d3d78b17d69f8f4a33e553840f2c2cdf74e3 100644 (file)
@@ -76,7 +76,9 @@ public:
    * @brief Nntrainer dataset
    */
   std::shared_ptr<ml::train::Dataset> dataset_train, dataset_valid;
+
   float training_loss, validation_loss;
+  bool is_train_complete;
 
   int64_t tensors_inputsize[NNS_TENSOR_SIZE_LIMIT];
   int64_t num_tensors;
@@ -84,6 +86,9 @@ public:
   int64_t num_labels;
   int64_t num_train_samples;
   int64_t num_valid_samples;
+  int64_t total_num_samples;
+  int64_t num_epoch;
+  int64_t num_invoke;
   std::string model_config;
   std::string model_save_path;
 
@@ -113,7 +118,11 @@ public:
    */
   ~InputTensorsInfo();
 
-  bool is_mutex_locked;
+  bool is_data_wait;
+  bool is_data_full;
+  unsigned int queue_size;
+  unsigned int queue_front;
+  unsigned int queue_rear;
   int64_t push_count;
   int64_t pop_count;
   int64_t input_size[NNS_TENSOR_SIZE_LIMIT]; // feature size * data type
@@ -124,6 +133,7 @@ public:
 
   std::vector<TensorData> tensor_data;
   pthread_mutex_t mutex;
-  pthread_cond_t cond;
+  pthread_cond_t data_wait_cond;
+  pthread_cond_t data_full_cond;
 };
 } // namespace NNTrainer