1 from collections import namedtuple
5 import openvino.inference_engine as ie
7 from ...utils.network_info import NetworkInfo
8 from ...network import Network
10 from ..layer_accuracy_drop_info import LayerAccuracyDropInfo
11 from ..logging import info, debug
12 from ..single_layer_network import SingleLayerNetwork
13 from ..inference_result import InferenceResult
# Immutable (index, layer) record for one layer selected for quantization;
# consumers read the fields as `.index` and `.layer`.
QuantizationLayer = namedtuple('QuantizationLayer', ['index', 'layer'])
class SingleLayerNetworkThread(threading.Thread):
    """Worker thread measuring the accuracy drop caused by quantizing one layer.

    All heavy lifting is delegated back to the calibrator's
    ``collect_in_thread``; the outcome is stored in ``self.result`` so the
    spawning code can ``join()`` the thread and then read it.

    NOTE(review): the ``def __init__``/``def run`` header lines and part of the
    delegated call's argument list were truncated in the reviewed view; they
    are reconstructed here from the visible attribute assignments, the visible
    call site (``SingleLayerNetworkThread(self, statistics, full_network_result,
    network, network_info, quantization_layer)``) and the visible
    ``collect_in_thread`` signature.
    """

    def __init__(
            self,
            base_calibrator,
            statistics: dict,
            full_network_result: InferenceResult,
            network: ie.IENetwork,
            network_info: NetworkInfo,
            quantization_layer: QuantizationLayer
    ):
        threading.Thread.__init__(self)
        self.base_calibrator = base_calibrator
        self.statistics = statistics
        self.full_network_result = full_network_result
        self.network = network
        self.network_info = network_info
        self.quantization_layer = quantization_layer
        # Populated by run(); pre-set so readers never hit AttributeError
        # on a thread that was constructed but not started.
        self.result = None

    def run(self):
        # Argument order matches CollectorByLayer.collect_in_thread.
        self.result = self.base_calibrator.collect_in_thread(
            self.statistics,
            self.full_network_result,
            self.network,
            self.network_info,
            self.quantization_layer)
class CollectorByLayer:
    """Measures, for every quantizable layer, the accuracy drop introduced by
    quantizing that layer alone: a dedicated single-layer network is created
    per candidate layer and inferred in worker threads.

    NOTE(review): several source lines are missing from the reviewed view
    (marked below); all visible tokens were kept unchanged.
    """

    def __init__(self, configuration, plugin, normalizer):
        # configuration: calibration settings (model/weights paths, precision,
        # batch size); normalizer: helper that knows which layer types can be
        # quantized and how to build single-layer networks.
        self._configuration = configuration
        # NOTE(review): `plugin` is referenced later as self._plugin; the
        # `self._plugin = plugin` assignment line is not visible in this view.
        self._normalizer = normalizer

    def collect(self, statistics: dict, full_network_result: InferenceResult) -> list:
        """
        Method get layers which can be quantized and affect on final accuracy. Separate network is created for each layer.
        """
        accuracy_drop_by_layer = list()

        network = ie.IENetwork(self._configuration.model, self._configuration.weights)
        # if self._configuration.batch_size:
        #     # need to use reshape API
        #     network.batch_size = self._configuration.batch_size

        network_info = NetworkInfo(self._configuration.model)

        # 2. go over all layers which affect accuracy and create network basing on it
        quantization_layers = list()

        for layer in network.layers.values():
            if self._normalizer.is_quantization_supported(layer.type):
                layer_info = network_info.get_layer(layer.name)
                # Only layers with exactly one output consumed by exactly one
                # successor input are candidates (simple linear connection).
                if (len(layer_info.outputs) == 1) and (len(layer_info.outputs[0].layer.inputs) == 1):
                    # NOTE(review): `index` and `threads` are defined on lines
                    # not visible in this view (presumably a running counter
                    # and the worker-thread list) — confirm against the full
                    # source.
                    quantization_layer = QuantizationLayer(index, layer)
                    quantization_layers.append(quantization_layer)
                    threads.append(SingleLayerNetworkThread(self, statistics, full_network_result, network, network_info, quantization_layer))

        # Run the per-layer threads in batches of at most `threads_num`.
        threads_num = multiprocessing.cpu_count() * 2
        active_threads = list()
        # NOTE(review): the loop header driving the batches and the iterator
        # `it` over `threads` are not visible in this view.
        active_threads.clear()
        for thread_num in range(threads_num):
            active_thread = next(it, None)
            # NOTE(review): the exhausted-iterator check / thread start lines
            # are not visible in this view.
            active_threads.append(active_thread)

        for active_thread in active_threads:
            # NOTE(review): the loop body (presumably active_thread.join())
            # is not visible in this view.

        # NOTE(review): the two debug() calls below are presumably the
        # "all done" / "batch done" branches of a conditional whose lines are
        # not visible in this view.
        debug("all layer networks were infered")

        debug("all layer networks before #{} were infered".format(active_thread.quantization_layer.index))

        # Each thread stored its LayerAccuracyDropInfo in thread.result.
        for thread in threads:
            accuracy_drop_by_layer.append(thread.result)

        # Most accuracy-affecting layers first.
        accuracy_drop_by_layer.sort(key=lambda accuracy_drop: accuracy_drop.value, reverse=True)
        return accuracy_drop_by_layer

    def collect_in_thread(
            # NOTE(review): the leading parameter lines (`self`, `statistics`)
            # are not visible in this view; `statistics` is used in the body.
            full_network_result: InferenceResult,
            network: ie.IENetwork,
            network_info: NetworkInfo,
            quantization_layer: QuantizationLayer
    ) -> LayerAccuracyDropInfo:
        """Build, quantize and infer a single-layer network for one candidate
        layer and return the resulting accuracy-drop information."""
        index = quantization_layer.index
        layer_to_clone = quantization_layer.layer
        layer_to_clone_info = network_info.get_layer(layer_to_clone.name)
        # The directly-following activation is fused into the single-layer
        # network when the normalizer reports fusing as supported; otherwise
        # activation_layer stays None.
        activation_layer = network.layers[layer_to_clone_info.outputs[0].layer.name] if (len(layer_to_clone_info.outputs) == 1 and self._normalizer.is_quantization_fusing_supported(layer_to_clone_info, layer_to_clone_info.outputs[0].layer)) else None
        # NOTE(review): the two debug() calls below are presumably the
        # if/else branches on `activation_layer` (the first dereferences it);
        # the branch lines are not visible in this view.
        debug("create network #{} for layer {} ({}) -> {} ({})".format(index, layer_to_clone.name, layer_to_clone.type, activation_layer.name, activation_layer.type))
        debug("create network #{} for layer {} ({})".format(index, layer_to_clone.name, layer_to_clone.type))

        layer_network, reference_output_layer_name = self._normalizer.create_network_for_layer(
            self._configuration.weights,
            # NOTE(review): the remaining arguments of this call are not
            # visible in this view.

        # Reshape the isolated network to the configured batch size.
        Network.reshape(layer_network, self._configuration.batch_size)

        # TODO: initialize only neccessary statistic
        # NOTE(review): the `network_stats` dict initialization is not
        # visible in this view.
        for layer_name, node_statistic in statistics.items():
            network_stats[layer_name] = ie.LayerStats(min=tuple(node_statistic.min_outputs), max=tuple(node_statistic.max_outputs))
        layer_network.stats.update(network_stats)

        # Mark the cloned layer for quantization at the requested precision
        # ('INT8' is mapped to the IE quantization level name 'I8').
        params = layer_network.layers[layer_to_clone.name].params
        params["quantization_level"] = 'I8' if self._configuration.precision == 'INT8' else self._configuration.precision
        layer_network.layers[layer_to_clone.name].params = params

        exec_network = self._plugin.load(network=layer_network, config={ "EXCLUSIVE_ASYNC_REQUESTS": "YES" })

        # The generated single-layer network must expose exactly one input.
        if len(layer_network.inputs) != 1:
            raise ValueError("created network has several inputs")

        network_input_layer_name = next(iter(layer_network.inputs.keys()))

        with SingleLayerNetwork(
            network=layer_network,
            exec_network=exec_network,
            input_layer_name=network_input_layer_name,
            layer_name=layer_to_clone.name,
            output_layer_name=layer_to_clone.name + "_",
            reference_output_layer_name=reference_output_layer_name
        ) as single_layer_network:

            debug("single layer #{} {} network infer".format(index, single_layer_network.layer_name))
            accuracy_drop_list = self.infer_single_layer_network(single_layer_network, full_network_result)

            return LayerAccuracyDropInfo(
                layer_name=single_layer_network.layer_name,
                value=LayerAccuracyDropInfo.calculate(accuracy_drop_list))

    def infer_single_layer_network(self, single_layer_network: SingleLayerNetwork, full_network_results: InferenceResult):
        """
        Native infer and compare results
        """
        # Annotation corrected from `list()`: the `.result` access below and
        # the InferenceResult argument passed by collect_in_thread establish
        # the actual parameter type.
        if full_network_results.result is None:
            raise ValueError("output inference results are absent")

        accuracy_drop_list = list()
        for full_network_result in full_network_results.result:
            # Per-result difference between the single-layer network output
            # and the corresponding full-network reference output.
            difference = self._normalizer.infer_single_layer_network(single_layer_network, full_network_result)
            accuracy_drop_list.append(difference)

        return accuracy_drop_list