"""
Copyright (c) 2019 Intel Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from collections import OrderedDict

import numpy as np

from ..pipeline_connectors import create_connection_description
from ..utils import get_indexs, find_nearest
from ..adapters import create_adapter
from ..data_readers import create_reader
from ..dataset import Dataset
from ..launcher import create_launcher, InputFeeder
from ..metrics import MetricsExecutor
from ..pipeline_connectors import StageConnectionDescription, Connection
from ..postprocessor import PostprocessingExecutor
from ..preprocessor import PreprocessingExecutor
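
# Collects the pipeline name, the launcher settings (framework, device, tags)
# of the first stage that defines a launcher, and the first stage's dataset name.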
def get_processing_info(pipeline_config):
    name = pipeline_config['name']
    stages = pipeline_config['stages']
    dataset_name = stages[0]['dataset']['name']
    launcher = {}
    for stage in stages:
        if 'launcher' in stage:
            launcher = stage['launcher']
            break
    framework = launcher.get('framework')
    device = launcher.get('device')
    tags = launcher.get('tags')

    return name, framework, device, tags, dataset_name
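
# Splices an InputFeeder in front of every launcher executor and, when the launcher
# config defines an adapter, an adapter right after it; the recorded launcher and
# dataset indices are shifted so they keep pointing at the right executors.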
def create_launcher_attribution(launchers_ids, launchers, datasets_ids, datasets, executors, executor_types):
    launchers_ids = np.array(launchers_ids)
    datasets_ids = np.array(datasets_ids)
    for launcher_id_info, launcher in zip(enumerate(launchers_ids), launchers):
        iteration, launcher_id = launcher_id_info
        input_feeder = InputFeeder(
            launcher.config.get('inputs', []), launcher.get_all_inputs(), launcher.fit_to_input
        )
        # inserting the feeder shifts all later executors by one position,
        # so the stored launcher indices are bumped to match
        launchers_ids[iteration:] += 1
        executors.insert(launcher_id, input_feeder)
        executor_types.insert(launcher_id, 'input_feeder')
        adapter_config = launcher.config.get('adapter')
        dataset_id = find_nearest(datasets_ids, launcher_id, 'less')
        datasets_ids[dataset_id + 1:] += 1
        dataset = datasets[dataset_id] if dataset_id != -1 else None
        if adapter_config:
            adapter = create_adapter(adapter_config, launcher, dataset)
            executors.insert(launcher_id + 1, adapter)
            executor_types.insert(launcher_id + 1, 'adapter')
            # the adapter insertion shifts the remaining dataset and launcher indices as well
            if dataset_id != datasets_ids.size - 1:
                datasets_ids[dataset_id + 1:] += 1
            if iteration != launchers_ids.size - 1:
                launchers_ids[iteration + 1:] += 1
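
# Binds each metrics executor to the metadata of the nearest dataset that
# precedes it in the executor list.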
def set_metrics_dataset(metrics_ids, metrics_executors, datasets_ids, datasets):
    for metrics_id, metric_executor in zip(metrics_ids, metrics_executors):
        dataset_id = find_nearest(datasets_ids, metrics_id, 'less')
        if dataset_id != -1:
            metric_executor.dataset = datasets[dataset_id].metadata
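
# A single pipeline stage: an ordered chain of executors (reader, dataset,
# preprocessing, launcher, postprocessing, metrics, connections) that all
# operate on one shared EvaluationContext.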
class PipeLineStage:
    def __init__(self, evaluation_context, executors):
        self._evaluation_context = evaluation_context
        self.executors = executors

    def run(self):  # method name assumed; invokes each executor on the shared context
        for executor in self.executors:
            executor(self.evaluation_context)

    @classmethod
    def from_configs(cls, stage_name, stage_config):
        config_mapping = {
            'dataset': Dataset,
            'preprocessing': PreprocessingExecutor,
            'launcher': create_launcher,
            'postprocessing': PostprocessingExecutor,
            'metrics': MetricsExecutor,
            'reader': create_reader,
        }

        executors, executor_types = [], []
        for key, config in stage_config.items():
            if key in config_mapping:
                connection = create_connection_description(config, stage_name)
                if connection:
                    executors.append(connection)
                    executor_types.append('connection')
                executor_creator = config_mapping[key]
                executor = executor_creator(config)
                executor_types.append(key)
                executors.append(executor)

        dataset_ids = get_indexs(executor_types, 'dataset')
        datasets = [executors[idx] for idx in dataset_ids]
        launcher_ids = get_indexs(executor_types, 'launcher')
        launchers = [executors[idx] for idx in launcher_ids]
        create_launcher_attribution(launcher_ids, launchers, dataset_ids, datasets, executors, executor_types)

        metrics_executors_id = get_indexs(executor_types, 'metrics')
        dataset_ids = get_indexs(executor_types, 'dataset')
        metrics_executors = [executors[idx] for idx in metrics_executors_id]
        set_metrics_dataset(metrics_executors_id, metrics_executors, dataset_ids, datasets)
        dataset = datasets[0] if datasets else None
        eval_context = EvaluationContext(dataset, metrics_executors, launchers)

        return cls(eval_context, executors)

    @property
    def evaluation_context(self):
        return self._evaluation_context

    @evaluation_context.setter
    def evaluation_context(self, new_context):
        _shared_context = new_context.shared_context
        for field, value in _shared_context.items():
            setattr(self._evaluation_context, field, value)
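
# Mutable state shared by a stage's executors: accumulated annotations and
# predictions, the current batches, and handles to the dataset, launcher(s)
# and metrics executor(s).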
class EvaluationContext:
    def __init__(self, dataset, metric_executor=None, launcher=None):
        self.annotations = []
        self.predictions = []
        self.annotation_batch = []
        self.prediction_batch = []
        self.data_batch = []
        self.metrics_results = []
        self.identifiers_batch = []
        self.metrics_executor = metric_executor
        self.dataset_size = dataset.size if dataset else 0
        self.launcher = launcher
        self.dataset = dataset

    @property
    def shared_context(self):
        _shared_context = {
            'annotations': self.annotations,
            'predictions': self.predictions,
            'annotation_batch': self.annotation_batch,
            'prediction_batch': self.prediction_batch,
            'data_batch': self.data_batch,
            'identifiers_batch': self.identifiers_batch
        }
        return _shared_context
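
# Builds the stages from the pipeline configuration, resolves inter-stage
# connections, and runs the stages over the dataset while reporting progress
# and metrics.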
class PipeLineEvaluator:
    def __init__(self, stages):
        self.stages = stages
        self.create_connectors()
        self.context = next(iter(stages.values())).evaluation_context

    @classmethod
    def from_configs(cls, pipeline_config):
        stages = OrderedDict()
        for stage_config in pipeline_config:
            stage_name = stage_config['stage']
            evaluation_stage = PipeLineStage.from_configs(stage_name, stage_config)
            stages[stage_name] = evaluation_stage

        return cls(stages)

    def create_connectors(self):
        def make_connection(stages, connection_template):
            return Connection(stages, connection_template)

        def replace_connections(stage, all_stages):
            # swap each StageConnectionDescription placeholder for a live Connection
            for executor_id, executor in enumerate(stage.executors):
                if isinstance(executor, StageConnectionDescription):
                    connector = make_connection(all_stages, executor)
                    stage.executors[executor_id] = connector

        for _, stage in self.stages.items():
            replace_connections(stage, self.stages)
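
    # Runs every stage in order, feeding each stage's resulting context into
    # the next, and repeats until the progress reporter reports 100% completion.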
    def process_dataset(self, stored_predictions, progress_reporter, *args, **kwargs):
        self.progress_reporter = progress_reporter
        dataset_size = self.context.dataset_size or 0
        self.progress_reporter.reset(dataset_size)
        iteration = 0
        previous_context = self.context

        while self.progress_reporter.progress != 100:
            for _, stage in self.stages.items():
                stage.evaluation_context = previous_context
                stage.run()
                previous_context = stage.evaluation_context
            iteration += 1
            progress_reporter.update(iteration, len(previous_context.data_batch))
        self.context = previous_context

        if progress_reporter:
            progress_reporter.finish()

    def compute_metrics(self, output_callback=None, ignore_results_formatting=False):
        def eval_metrics(metrics_executor, annotations, predictions):
            for result_presenter, evaluated_metric in metrics_executor.iterate_metrics(annotations, predictions):
                result_presenter.write_result(evaluated_metric, output_callback, ignore_results_formatting)

        for _, stage in self.stages.items():
            metrics_executors = stage.evaluation_context.metrics_executor
            for metrics_executor in metrics_executors:
                eval_context = stage.evaluation_context
                eval_metrics(metrics_executor, eval_context.annotations, eval_context.predictions)

    def release(self):
        for _, stage in self.stages.items():
            for launcher in stage.evaluation_context.launcher:
                launcher.release()
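
# Rough usage sketch (illustrative only): the surrounding framework supplies
# the parsed pipeline configuration and a progress reporter, so the names
# below are assumptions rather than part of this module's API.
#
#   evaluator = PipeLineEvaluator.from_configs(pipeline_config['stages'])
#   evaluator.process_dataset(None, progress_reporter)
#   evaluator.compute_metrics()
#   evaluator.release()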