Publishing 2019 R2 content (#223)
[platform/upstream/dldt.git] / tools / accuracy_checker / accuracy_checker / evaluators / pipeline_evaluator.py
1 """
2 Copyright (c) 2019 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 """
16
17 from collections import OrderedDict
18 import numpy as np
19
20 from ..pipeline_connectors import create_connection_description
21 from ..utils import get_indexs, find_nearest
22 from ..adapters import create_adapter
23 from ..data_readers import create_reader
24 from ..dataset import Dataset
25 from ..launcher import create_launcher, InputFeeder
26 from ..metrics import MetricsExecutor
27 from ..pipeline_connectors import StageConnectionDescription, Connection
28 from ..postprocessor import PostprocessingExecutor
29 from..preprocessor import PreprocessingExecutor
30
31
def get_processing_info(pipeline_config):
    """
    Collect the summary fields that describe a pipeline configuration.

    Args:
        pipeline_config: mapping with a 'name' entry and a 'stages' sequence; the first stage
            must contain a 'dataset' mapping with a 'name' entry.
    Returns:
        tuple (name, framework, device, tags, dataset_name). The launcher related fields are read
        from the first stage that has a 'launcher' entry and are None when no stage has one
        (or when the launcher config lacks the field).
    """
    name = pipeline_config['name']
    stages = pipeline_config['stages']
    dataset_name = stages[0]['dataset']['name']

    # Only the first launcher found in stage order is reported.
    first_launcher = next((stage['launcher'] for stage in stages if 'launcher' in stage), {})
    framework, device, tags = (first_launcher.get(field) for field in ('framework', 'device', 'tags'))

    return name, framework, device, tags, dataset_name
46
47
def create_launcher_attribution(launchers_ids, launchers, datasets_ids, datasets, executors, executor_types):
    """
    Surround every launcher in the executor list with its input feeder and, if configured, its adapter.

    `executors` and `executor_types` are modified in place: an InputFeeder is inserted in front of
    each launcher and, when the launcher config has an 'adapter' entry, an adapter is inserted
    right after it. Nothing is returned.

    Args:
        launchers_ids: positions of the launchers inside `executors`.
        launchers: launcher objects, in the same order as `launchers_ids`.
        datasets_ids: positions of the datasets inside `executors`.
        datasets: dataset objects, in the same order as `datasets_ids`.
        executors: list of stage executors, extended in place.
        executor_types: list of type names parallel to `executors`, extended in place.
    """
    # Work on numpy copies so that whole tails of indices can be shifted in place with `+=`.
    launchers_ids = np.array(launchers_ids)
    datasets_ids = np.array(datasets_ids)
    for launcher_id_info, launcher in zip(enumerate(launchers_ids), launchers):
        iteration, launcher_id = launcher_id_info
        input_feeder = InputFeeder(
            launcher.config.get('inputs', []), launcher.get_all_inputs(), launcher.fit_to_input
        )
        # The feeder goes in at the launcher's current slot, so this launcher and all later ones
        # move one position to the right. `launcher_id` was read before this shift.
        launchers_ids[iteration:] += 1
        executors.insert(launcher_id, input_feeder)
        executor_types.insert(launcher_id, 'input_feeder')
        adapter_config = launcher.config.get('adapter')
        # NOTE(review): presumably find_nearest(..., 'less') returns the index (within datasets_ids)
        # of the closest dataset placed before the launcher, or -1 when there is none - confirm
        # against utils.find_nearest.
        dataset_id = find_nearest(datasets_ids, launcher_id, 'less')
        # Datasets located after the found one are shifted for the inserted feeder
        # (with dataset_id == -1 the slice starts at 0 and every dataset id is shifted).
        datasets_ids[dataset_id + 1:] += 1
        dataset = datasets[dataset_id] if dataset_id != -1 else None
        # After the feeder insertion the launcher itself sits one slot further.
        launcher_id += 1
        if adapter_config:
            adapter = create_adapter(adapter_config, launcher, dataset)
            # The adapter is placed immediately after the launcher.
            executors.insert(launcher_id + 1, adapter)
            executor_types.insert(launcher_id + 1, 'adapter')
            # Account for the extra adapter slot in all following dataset and launcher positions.
            if dataset_id != datasets_ids.size - 1:
                datasets_ids[dataset_id + 1:] += 1
            if iteration != launchers_ids.size - 1:
                launchers_ids[iteration + 1:] += 1
72
73
def set_metrics_dataset(metrics_ids, metrics_executors, datasets_ids, datasets):
    """
    Attach dataset metadata to metric executors.

    For each (position, executor) pair the dataset index is looked up with
    find_nearest(datasets_ids, position, 'less'); when the lookup yields -1 the executor
    is left untouched, otherwise its `dataset` attribute is set to that dataset's `metadata`.

    Args:
        metrics_ids: positions of the metric executors.
        metrics_executors: metric executors, in the same order as `metrics_ids`.
        datasets_ids: positions of the datasets.
        datasets: dataset objects, in the same order as `datasets_ids`.
    """
    for position, executor in zip(metrics_ids, metrics_executors):
        nearest = find_nearest(datasets_ids, position, 'less')
        if nearest == -1:
            continue
        executor.dataset = datasets[nearest].metadata
79
80
class PipeLineStage:
    """
    Single pipeline stage: an ordered list of executors that all operate on one evaluation context.
    """

    def __init__(self, evaluation_context, executors):
        self.executors = executors
        self._evaluation_context = evaluation_context

    def run(self):
        """Call every executor in order, passing it the stage evaluation context."""
        for step in self.executors:
            step(self.evaluation_context)

    @classmethod
    def from_configs(cls, stage_name, stage_config):
        """
        Build a stage from its configuration mapping.

        Entries whose key is one of 'dataset', 'preprocessing', 'launcher', 'postprocessing',
        'metrics' or 'reader' are turned into executors, in config order; other keys are ignored.
        A connection description is put in front of an executor when
        create_connection_description returns a truthy value for its config.

        Args:
            stage_name: name of the stage, forwarded to create_connection_description.
            stage_config: mapping of section name to section config.
        Returns:
            new stage holding the executors and a fresh EvaluationContext.
        """
        creators = {
            'dataset': Dataset,
            'preprocessing': PreprocessingExecutor,
            'launcher': create_launcher,
            'postprocessing': PostprocessingExecutor,
            'metrics': MetricsExecutor,
            'reader': create_reader,
        }

        executors, executor_types = [], []
        for section, section_config in stage_config.items():
            creator = creators.get(section)
            if creator is None:
                continue
            connection = create_connection_description(section_config, stage_name)
            if connection:
                executors.append(connection)
                executor_types.append('connection')
            created = creator(section_config)
            executor_types.append(section)
            executors.append(created)

        dataset_positions = get_indexs(executor_types, 'dataset')
        datasets = [executors[position] for position in dataset_positions]
        launcher_positions = get_indexs(executor_types, 'launcher')
        launchers = [executors[position] for position in launcher_positions]
        create_launcher_attribution(
            launcher_positions, launchers, dataset_positions, datasets, executors, executor_types
        )

        # Positions are looked up again because the call above inserts new executors.
        metrics_positions = get_indexs(executor_types, 'metrics')
        dataset_positions = get_indexs(executor_types, 'dataset')
        metrics_executors = [executors[position] for position in metrics_positions]
        set_metrics_dataset(metrics_positions, metrics_executors, dataset_positions, datasets)

        first_dataset = datasets[0] if datasets else None
        return cls(EvaluationContext(first_dataset, metrics_executors, launchers), executors)

    @property
    def evaluation_context(self):
        return self._evaluation_context

    @evaluation_context.setter
    def evaluation_context(self, new_context):
        # Assignment does not replace the stored context object: truthy shared fields
        # of the new context are copied onto it, falsy ones are skipped.
        for field, value in new_context.shared_context.items():
            if not value:
                continue
            setattr(self._evaluation_context, field, value)
139
140
class EvaluationContext:
    """
    State passed between executors and stages: accumulated and per-batch data plus the
    dataset, metric executors and launchers of the stage.
    """

    def __init__(self, dataset, metric_executor=None, launcher=None):
        # Every list field gets its own, initially empty, list.
        list_fields = (
            'annotations', 'predictions', 'annotation_batch', 'prediction_batch',
            'data_batch', 'metrics_results', 'identifiers_batch',
        )
        for list_field in list_fields:
            setattr(self, list_field, [])
        self.metrics_executor = metric_executor
        self.dataset_size = 0 if not dataset else dataset.size
        self.launcher = launcher
        self.dataset = dataset

    @property
    def shared_context(self):
        """Return a new dict mapping the names of the fields shared between stages to their current values."""
        shared_fields = (
            'annotations', 'predictions', 'annotation_batch',
            'prediction_batch', 'data_batch', 'identifiers_batch',
        )
        return {field: getattr(self, field) for field in shared_fields}
166
167
class PipeLineEvaluator:
    """
    Runs an ordered collection of pipeline stages and reports their metrics.

    `stages` maps stage name to stage object; stages are processed in mapping order and the
    evaluation context of the first one is used as the initial pipeline context.
    """

    def __init__(self, stages):
        self.stages = stages
        self.create_connectors()
        self.context = next(iter(stages.values())).evaluation_context

    @classmethod
    def from_configs(cls, pipeline_config):
        """
        Build an evaluator from a sequence of stage configs.

        Args:
            pipeline_config: iterable of stage config mappings, each with a 'stage' entry
                holding the stage name.
        Returns:
            evaluator whose stages keep the order of `pipeline_config`.
        """
        stages = OrderedDict()
        for stage_config in pipeline_config:
            stage_name = stage_config['stage']
            stages[stage_name] = PipeLineStage.from_configs(stage_name, stage_config)
        return cls(stages)

    def create_connectors(self):
        """Replace, in place, every StageConnectionDescription executor with a Connection over all stages."""
        for stage in self.stages.values():
            for executor_id, executor in enumerate(stage.executors):
                if isinstance(executor, StageConnectionDescription):
                    stage.executors[executor_id] = Connection(self.stages, executor)

    def process_dataset(self, stored_predictions, progress_reporter, *args, **kwargs):
        """
        Run all stages repeatedly until the progress reporter reports 100.

        Args:
            stored_predictions: not used by this evaluator.
            progress_reporter: object providing reset(size), update(iteration, batch_size),
                finish() and a `progress` attribute; it is required.
        """
        self.progress_reporter = progress_reporter
        self.progress_reporter.reset(self.context.dataset_size or 0)
        iteration = 0
        previous_context = self.context
        while self.progress_reporter.progress != 100:
            for stage in self.stages.values():
                # Hand the state produced so far over to the stage, then pick up its result.
                stage.evaluation_context = previous_context
                stage.run()
                previous_context = stage.evaluation_context
            iteration += 1
            progress_reporter.update(iteration, len(previous_context.data_batch))
        self.context = previous_context

        if progress_reporter:
            progress_reporter.finish()

    def compute_metrics(self, output_callback=None, ignore_results_formatting=False):
        """
        Evaluate the metrics of every stage and write the results through their presenters.

        Args:
            output_callback: forwarded to each presenter's write_result.
            ignore_results_formatting: forwarded to each presenter's write_result.
        """
        for stage in self.stages.values():
            eval_context = stage.evaluation_context
            # EvaluationContext defaults metrics_executor to None: treat it as "no metrics".
            for metrics_executor in eval_context.metrics_executor or []:
                evaluated = metrics_executor.iterate_metrics(eval_context.annotations, eval_context.predictions)
                for result_presenter, evaluated_metric in evaluated:
                    result_presenter.write_result(evaluated_metric, output_callback, ignore_results_formatting)

    def release(self):
        """Release the launchers of every stage."""
        for stage in self.stages.values():
            # EvaluationContext defaults launcher to None: treat it as "no launchers".
            for launcher in stage.evaluation_context.launcher or []:
                launcher.release()