import os
import sys
+import logging
import gi
+
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject
self.loop = None
self.pipeline = None
self.running = False
- self.received = 0
self.current_label_index = -1
self.new_label_index = -1
self.tflite_model = ''
:return: None
"""
if message.type == Gst.MessageType.EOS:
- print('received eos message')
+ logging.info('received eos message')
self.loop.quit()
elif message.type == Gst.MessageType.ERROR:
error, debug = message.parse_error()
- print('[error]', error, debug)
+ logging.warning('[error] %s : %s', error.message, debug)
self.loop.quit()
elif message.type == Gst.MessageType.WARNING:
error, debug = message.parse_warning()
- print('[warning]', error, debug)
+ logging.warning('[warning] %s : %s', error.message, debug)
elif message.type == Gst.MessageType.STREAM_START:
- print('received start message')
+ logging.info('received start message')
elif message.type == Gst.MessageType.QOS:
data_format, processed, dropped = message.parse_qos_stats()
- print('[qos]', data_format, processed, dropped)
+ format_str = Gst.Format.get_name(data_format)
+ logging.debug('[qos] format[%s] processed[%d] dropped[%d]', format_str, processed, dropped)
def on_new_data(self, sink, buffer):
"""Callback for tensor sink signal.
:param buffer: buffer from element
:return: None
"""
- # print progress
- self.received += 1
- print('receiving new data:', self.received)
-
if self.running:
for idx in range(buffer.n_memory()):
mem = buffer.peek_memory(idx)
# check model file exists
self.tflite_model = os.path.join(model_folder, tflite_model)
if not os.path.exists(self.tflite_model):
- print('cannot find tflite model:', self.tflite_model)
+ logging.error('cannot find tflite model [%s]', self.tflite_model)
return False
# load labels
for line in label_file.readlines():
self.tflite_labels.append(line)
except FileNotFoundError:
- print('cannot find tflite label:', label_path)
+ logging.error('cannot find tflite label [%s]', label_path)
return False
- print('finished to load labels, total:', len(self.tflite_labels))
+ logging.info('finished to load labels, total [%d]', len(self.tflite_labels))
return True
def tflite_get_label(self, index):
if max_score > 0:
self.new_label_index = scores.index(max_score)
else:
- print('unexpected data size:', data_size)
+ logging.error('unexpected data size [%d]', data_size)
if __name__ == '__main__':