nntrainer->trainModel();
}
+/**
+ * @brief invoke function
+ * tensor_trainer calls this function to send tensor_data.
+ * In total, (number of train samples + number of valid samples) * epoch
+ * data should be received.
+ * The sub-plugin doesn't keep the dataset across epochs.
+ */
static int nntrainer_model_invoke(const GstTensorTrainerFramework *fw,
const GstTensorTrainerProperties *prop,
void *private_data,
NNTrainer::NNTrainerTrain *nntrainer =
reinterpret_cast<NNTrainer::NNTrainerTrain *>(private_data);
UNUSED(fw);
+ UNUSED(prop);
+ pid_t pid = getpid();
+ pid_t tid = syscall(SYS_gettid);
+
ml_logd("<called>");
+ ml_logd("pid[%d], tid[%d]", pid, tid);
if (!nntrainer) {
ml_loge("Failed get nntrainer");
return -1;
}
- UNUSED(prop);
+
+ nntrainer->num_invoke++;
+ ml_logd("Received data (%d/%d)", nntrainer->num_invoke,
+ nntrainer->total_num_samples);
+ if (nntrainer->total_num_samples < nntrainer->num_invoke) {
+ ml_logd("Already received all data required for the train, "
+ "but invoke is called");
+ return 0;
+ }
if (nntrainer->train_data->push_count == nntrainer->num_train_samples &&
nntrainer->valid_data->push_count == nntrainer->num_valid_samples) {
- ml_logd("data is full");
- return 0;
+ nntrainer->train_data->push_count = nntrainer->valid_data->push_count = 0;
}
if (nntrainer->train_data->push_count < nntrainer->num_train_samples) {
nntrainer->num_labels);
NNTrainer::TensorData tensor_data;
- int64_t idx = 0, i = 0;
- char *p_data = nullptr;
- for (i = 0; i < data->num_inputs; i++) {
- ml_logd("input[%d]:%p, size:%zd\n", i, input[i].data, input[i].size);
- p_data = new char[input[idx].size];
- std::memcpy(p_data, input[idx].data, input[idx].size);
- tensor_data.inputs.emplace_back(p_data);
- ml_logd("input[%d].data = p %p\n", idx, (input[idx].data));
- ml_logd("tensor_data.inputs[%d] = %p\n", idx, tensor_data.inputs[idx]);
+ unsigned int idx = 0, i = 0;
+
+ i = data->queue_rear;
+ ml_logd("Insert, queue_rear : %d", i);
+ for (auto inputs : data->tensor_data[i].inputs) {
+ ml_logd("inputs : %p", inputs);
+ ml_logd("input[%d]:%p, size:%zd\n", idx, input[idx].data, input[idx].size);
+ std::memcpy(inputs, input[idx].data, input[idx].size);
idx++;
}
- for (i = 0; i < data->num_labels; i++) {
- p_data = new char[input[idx].size];
- std::memcpy(p_data, input[idx].data, input[idx].size);
- tensor_data.labels.emplace_back(p_data);
+
+ for (auto labels : data->tensor_data[i].labels) {
+ ml_logd("labels : %p", labels);
+ ml_logd("input[%d]:%p, size:%zd\n", idx, input[idx].data, input[idx].size);
+ std::memcpy(labels, input[idx].data, input[idx].size);
idx++;
}
- data->tensor_data.emplace_back(tensor_data);
+ data->queue_rear++;
data->push_count++;
+ ml_logd("front:%d, rear:%d", data->queue_front, data->queue_rear);
- ml_logd("(pop/push: %d/%d)", data->pop_count, data->push_count);
-
-#if 0
- for (auto data : data->tensor_data) {
- for (auto inputs : data.inputs) {
- ml_logd("##I addr:%p", inputs);
- }
- for (auto labels : data.labels) {
- ml_logd("##L addr:%p", labels);
- }
+ if (data->is_data_wait && data->queue_rear > data->queue_front) {
+ pthread_mutex_lock(&data->mutex);
+ ml_logd("send signal");
+ pthread_cond_signal(&data->data_wait_cond);
+ pthread_mutex_unlock(&data->mutex);
}
-#endif
- if (data->is_mutex_locked && data->push_count > data->pop_count) {
+ if (data->queue_rear == data->queue_size) {
pthread_mutex_lock(&data->mutex);
- ml_logd("send signal");
- pthread_cond_signal(&data->cond);
+ data->is_data_full = TRUE;
+ ml_logd("locked, data is full");
+ pthread_cond_wait(&data->data_full_cond, &data->mutex);
+ ml_logd("unlocked, queue is empty");
pthread_mutex_unlock(&data->mutex);
+ data->is_data_full = FALSE;
+ data->queue_rear = data->queue_front = 0;
}
+ ml_logd("(pop/push: %d/%d)", data->pop_count, data->push_count);
ml_logd("T-pushed:%d/%d, V-pushed:%d/%d\n", nntrainer->train_data->push_count,
nntrainer->num_train_samples, nntrainer->valid_data->push_count,
nntrainer->num_valid_samples);
ml_logd("<called>");
ml_logd("pid[%d], tid[%d]", pid, tid);
- if (data->push_count <= data->pop_count) {
+ ml_logd("front:%d, rear:%d", data->queue_front, data->queue_rear);
+ if (data->queue_rear <= data->queue_front) {
pthread_mutex_lock(&data->mutex);
- data->is_mutex_locked = TRUE;
+ data->is_data_wait = TRUE;
ml_logd("locked, need to wait for more data");
- pthread_cond_wait(&data->cond, &data->mutex);
+ pthread_cond_wait(&data->data_wait_cond, &data->mutex);
ml_logd("unlocked, get data");
pthread_mutex_unlock(&data->mutex);
- data->is_mutex_locked = FALSE;
+ data->is_data_wait = FALSE;
}
ml_logd("num_inputs: %d, num_labels: %d", data->num_inputs, data->num_labels);
- int64_t i = 0;
- int idx = data->pop_count;
- ml_logd("pop idx: %d", idx);
+ unsigned int i = 0;
+ unsigned int idx = data->queue_front;
+ ml_logd("Delete, queue_front: %d", idx);
for (i = 0; i < data->num_inputs; i++) {
ml_logd("memcpy Addr %p, %p, size=%d\n", *(input + i),
}
data->pop_count++;
+ data->queue_front++;
ml_logd("(pop/push: %d/%d)", data->pop_count, data->push_count);
- if (data->pop_count < data->num_samples) {
+ if (data->pop_count < data->num_samples) { // train or valid num samples
*last = false;
} else {
*last = true;
std::shuffle(data->tensor_data.begin(), data->tensor_data.end(), g);
}
+ if (data->is_data_full && data->queue_rear == data->queue_front) {
+ pthread_mutex_lock(&data->mutex);
+ ml_logd("send signal");
+ pthread_cond_signal(&data->data_full_cond);
+ pthread_mutex_unlock(&data->mutex);
+ }
+
ml_logd("<leave>");
return 0;
int64_t _num_inputs,
int64_t _num_labels,
int64_t _tensors_inputsize[]) :
- is_mutex_locked(0),
+ is_data_wait(0),
+ is_data_full(0),
+ queue_front(0),
+ queue_rear(0),
push_count(0),
pop_count(0),
num_samples(_num_samples),
num_labels(_num_labels) {
ml_logd("<called>");
-
- tensor_data.reserve(_num_samples);
+ const int min_queue_size = 30;
+ queue_size = (_num_samples > min_queue_size) ? min_queue_size : _num_samples;
+ ml_logd("queue_size:%d", queue_size);
+ tensor_data.reserve(queue_size);
pthread_mutex_init(&mutex, NULL);
- pthread_cond_init(&cond, NULL);
+ pthread_cond_init(&data_wait_cond, NULL);
+ pthread_cond_init(&data_full_cond, NULL);
int64_t idx = 0, i = 0;
for (i = 0; i < num_inputs; i++) {
ml_logd("label_size[%d]=%d", i, label_size[i]);
}
+ unsigned int cur_queue_size = 0;
+
+ /* make queue */
+ while (cur_queue_size < queue_size) {
+ NNTrainer::TensorData t_data;
+ int i = 0;
+ char *p_data = nullptr;
+ for (i = 0; i < num_inputs; i++) {
+ p_data = new char[input_size[i]];
+ t_data.inputs.emplace_back(p_data);
+ }
+ for (i = 0; i < num_labels; i++) {
+ p_data = new char[label_size[i]];
+ t_data.labels.emplace_back(p_data);
+ }
+
+ tensor_data.emplace_back(t_data);
+ cur_queue_size++;
+ }
ml_logd("<leave>");
}
num_valid_samples = prop->num_valid_samples;
model_save_path = prop->model_save_path;
train_complete_cond = prop->train_complete_cond;
+ num_epoch = 5; // for test
+ is_train_complete = FALSE;
+ total_num_samples = (num_train_samples + num_valid_samples) * num_epoch;
ml_logd("num_inputs: %d", num_inputs);
ml_logd("num_labels: %d", num_labels);
try {
ml_logd("Save_model: %s", model_save_path.c_str());
model->save(model_save_path, ml::train::ModelFormat::MODEL_FORMAT_BIN);
+ is_train_complete = TRUE;
ml_logd("send train_complete_cond signal");
g_cond_signal(train_complete_cond);
NNTrainer::NNTrainerTrain::NNTrainerTrain(
const GstTensorTrainerProperties *prop, const std::string &_model_config) :
- model_config(_model_config) {
+ num_invoke(0), model_config(_model_config) {
ml_logd("<called>");
getNNStreamerProperties(prop);
createDataset();
void *private_data,
GstTensorTrainerFrameworkInfo *fw_info) {
static gchar subplugin_name[] = "nntrainer";
+ NNTrainer::NNTrainerTrain *nntrainer =
+ reinterpret_cast<NNTrainer::NNTrainerTrain *>(private_data);
ml_logd("<called>");
UNUSED(fw);
UNUSED(prop);
- UNUSED(private_data);
fw_info->name = subplugin_name;
+ if (nntrainer)
+ fw_info->train_complete = nntrainer->is_train_complete;
ml_logd("<leave>");
return 0;
}