--- /dev/null
+TEST_DATA = \
+ logs/trace.latency.log
+
+all:
+
+logs/trace.latency.log:
+ mkdir -p logs; \
+ GST_DEBUG="GST_TRACER:7" GST_TRACERS=latency GST_DEBUG_FILE=$@ \
+ gst-launch-1.0 -q audiotestsrc num-buffers=10 wave=silence ! audioconvert ! autoaudiosink
+
+check: $(TEST_DATA)
+ python3 -m unittest discover tracer
--- /dev/null
+# Add a Python API for tracer analyzers
+
+The Python framework will parse the tracer log and aggregate information.
+The tool writer will subclass the Analyzer class and override methods such as:
+
+ 'pad_push_buffer_pre'
+
+There is one method for each hook. Each of those methods will receive the
+parsed log line. In addition, the framework will offer some extra API to allow
+one to e.g. write:
+
+ pad.name() # pad name
+ pad.parent().name() # element name
+ pad.peer().parent() # peer element
+ pad.parent().state() # element state
+
+If we don't have full logging, we'd like to print a warning once, but make this
+non-fatal if possible. E.g. if we don't have logging for
+element_{add,remove}_pad, we might not be able to provide pad.parent().
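+
+As a rough sketch of such a tool (the hook name is from the list above, but the
+class and event shape here are made up for illustration, not the actual
+framework API), a subclass could aggregate per-pad data like this:

```python
# Illustrative sketch only: 'pad_push_buffer_pre' is one of the hooks
# mentioned above; the event is modeled here as a plain dict.

class BufferCounter:
    '''Counts buffers pushed per pad name.'''

    def __init__(self):
        self.counts = {}

    def pad_push_buffer_pre(self, event):
        # the framework would pass the parsed log line; for this sketch
        # we assume a dict with a 'pad' field
        pad = event.get('pad', 'unknown')
        self.counts[pad] = self.counts.get(pad, 0) + 1


counter = BufferCounter()
for entry in [{'pad': 'src'}, {'pad': 'src'}, {'pad': 'sink'}]:
    counter.pad_push_buffer_pre(entry)
print(counter.counts)  # {'src': 2, 'sink': 1}
```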
+
+A tool can replay the log multiple times. If it does, it won't work in
+'streaming' mode though. Streaming mode can offer live stats.
+
+## TODO
+Do we want to provide classes like GstBin, GstElement, GstPad, ... to aggregate
+info? We'd also need to provide a way to e.g. add a GstLogAnalyzer that knows
+about data from the log tracer and populates the classes. We need to be able to
+build a pipeline of analyzers, e.g. the analyzer calls GstLogAnalyzer in its
+catch-all handler and then processes some events individually.
+
+Parse the tracer classes. Add a helper that extracts numeric values and
+aggregates min/max/avg. Consider other statistical information (std. deviation)
+and provide a rolling average for the live view.
+
+Think of how analyzer classes can be combined:
+- we'd like to build tools that import other analyzer classes and chain the
+ processing.
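+
+One possible shape for such chaining (purely illustrative, not an existing
+API) is to let each analyzer forward every event to the next one after doing
+its own processing:

```python
# Sketch of chained analyzers: process locally, then pass the event on.

class ChainedAnalyzer:
    def __init__(self, next_analyzer=None):
        self.next = next_analyzer

    def handle(self, event):
        self.process(event)
        if self.next:
            self.next.handle(event)

    def process(self, event):
        pass  # overridden by concrete analyzers


class Collector(ChainedAnalyzer):
    '''Trivial analyzer that just records the events it sees.'''

    def __init__(self, next_analyzer=None):
        super().__init__(next_analyzer)
        self.seen = []

    def process(self, event):
        self.seen.append(event)


tail = Collector()
head = Collector(tail)
head.handle('ev1')
head.handle('ev2')
print(tail.seen)  # ['ev1', 'ev2']
```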
+
+## Examples
+### Sequence chart generator (mscgen)
+
+1.) Write file header
+
+2.) Collect element order
+Replay the log and use pad_link_pre to collect the pad->peer_pad relationship.
+Build a sequence of element names and write it to the msc file.
+
+3.) Collect event processing
+Replay the log and use pad_push_event_pre to output message lines to the msc
+file.
+
+4.) Write footer and run the tool.
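+
+The four steps above could be sketched as follows (element and event names are
+made up; the output follows basic msc syntax):

```python
# Sketch of the mscgen workflow: header, entities from the collected element
# order, one message line per event, footer.

def write_msc(elements, events):
    lines = ['msc {']                                    # 1.) header
    lines.append('  %s;' % ', '.join(elements))          # 2.) element order
    for (src, dst, label) in events:                     # 3.) event lines
        lines.append('  %s->%s [ label="%s" ];' % (src, dst, label))
    lines.append('}')                                    # 4.) footer
    return '\n'.join(lines)


msc = write_msc(['src', 'sink'], [('src', 'sink', 'segment')])
print(msc)
```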
+
+## Latency stats
+
+1.) Collect per-sink latencies and, for each sink, per-source latencies.
+Calculate min, max, avg. Consider a streaming interface where we update the
+stats e.g. once per second.
+
+2.) In non-streaming mode, write the final statistics.
+
+## cpu load stats
+
+Like the latency stats, but for cpu load: process cpu load plus per-thread cpu
+load.
+
+## top
+
+Combine various stats tools into one.
+
+# Improve tracers
+## log
+* the log tracer logs args and results into misc categories
+* issues
+ * not easy/reliable to detect its output among other trace output
+ * not easy to match pre/post lines
+ * uses own do_log method, instead of gst_tracer_record_log
+ * if we also log structures, we need to log the 'function' as the
+ structure-name, also fields would be key=(type)val, instead of key=value
+ * if we switch to gst_tracer_record_log, we'd need to register 27 formats :/
--- /dev/null
+import re
+import sys
+
+from tracer.parser import Parser
+from tracer.structure import Structure
+
+_SCOPE_RELATED_TO = {
+ 'GST_TRACER_VALUE_SCOPE_PAD': 'Pad',
+ 'GST_TRACER_VALUE_SCOPE_ELEMENT': 'Element',
+ 'GST_TRACER_VALUE_SCOPE_THREAD': 'Thread',
+ 'GST_TRACER_VALUE_SCOPE_PROCESS': 'Process',
+}
+
+_NUMERIC_TYPES = ('int', 'uint', 'gint', 'guint', 'gint64', 'guint64')
+
+class Analyzer(object):
+ '''Base class for a gst tracer analyzer.'''
+
+ def __init__(self, log):
+ self.log = log
+ self.records = {}
+ self.data = {}
+
+ def handle_tracer_class(self, event):
+ s = Structure(event[Parser.F_MESSAGE])
+ # TODO only for debugging
+ #print("tracer class:", repr(s))
+ name = s.name[:-len('.class')]
+ record = {
+ 'class': s,
+ 'scope' : {},
+ 'value' : {},
+ }
+ self.records[name] = record
+ for k,v in s.values.items():
+ if v.name == 'scope':
+ # TODO only for debugging
+ #print("scope: [%s]=%s" % (k, v))
+ record['scope'][k] = v
+ elif v.name == 'value':
+ # skip non numeric and those without min/max
+ if (v.values['type'] in _NUMERIC_TYPES and
+ 'min' in v.values and 'max' in v.values):
+ # TODO only for debugging
+ #print("value: [%s]=%s" % (k, v))
+ record['value'][k] = v
+ #else:
+ # TODO only for debugging
+ #print("skipping value: [%s]=%s" % (k, v))
+
+
+    def handle_tracer_entry(self, event):
+        # tracer events have no function field; the structure name in the
+        # message then doubles as the vmethod name
+        s = None
+        if event[Parser.F_FUNCTION]:
+            # TODO: parse params in event[Parser.F_MESSAGE]
+            vmethod_name = event[Parser.F_FUNCTION]
+        else:
+            s = Structure(event[Parser.F_MESSAGE])
+            vmethod_name = s.name
+        record = self.records.get(vmethod_name)
+        if record and s:
+ # aggregate event based on class
+ for sk,sv in record['scope'].items():
+ # look up bin by scope (or create new)
+ key = (_SCOPE_RELATED_TO[sv.values['related-to']] +
+ ":" + str(s.values[sk]))
+ scope = self.data.get(key)
+ if not scope:
+ scope = {}
+ self.data[key] = scope
+ for vk,vv in record['value'].items():
+ key = vmethod_name + "/" + vk
+ data = scope.get(key)
+ if not data:
+ data = {
+ 'num': 0,
+ 'sum': 0,
+ }
+                        if 'max' in vv.values and 'min' in vv.values:
+                            # seed the running min with the declared max (and
+                            # vice versa) so the first value updates both
+                            data['min'] = int(vv.values['max'])
+                            data['max'] = int(vv.values['min'])
+ scope[key] = data
+ # update min/max/sum and count via value
+ dv = int(s.values[vk])
+ data['num'] += 1
+ data['sum'] += dv
+ if 'min' in data:
+ data['min'] = min(dv, data['min'])
+ if 'max' in data:
+ data['max'] = max(dv, data['max'])
+
+        # TODO: check if self has a catch-all handler and call it first (check
+        # this in __init__)
+        # - we can use this to chain filters, although the chained filter
+        #   would be doing the same as below
+        # call the handler method matching the vmethod name, if any
+        vmethod = getattr(self, vmethod_name, None)
+        if callable(vmethod):
+            vmethod(event)
+
+ def is_tracer_class(self, event):
+ return (event[Parser.F_FILENAME] == 'gsttracerrecord.c' and
+ event[Parser.F_CATEGORY] == 'GST_TRACER' and
+ '.class' in event[Parser.F_MESSAGE])
+
+ def is_tracer_entry(self, event):
+ return (not event[Parser.F_LINE] and not event[Parser.F_FILENAME])
+
+ def run(self):
+ for event in self.log:
+ # check if it is a tracer.class or tracer event
+ if self.is_tracer_class(event):
+ self.handle_tracer_class(event)
+ elif self.is_tracer_entry(event):
+ self.handle_tracer_entry(event)
+ #else:
+ # print("unhandled:", repr(event))
--- /dev/null
+import unittest
+
+from tracer.analyzer import Analyzer
+
+TRACER_CLASS = ('0:00:00.036373170', 1788, '0x23bca70', 'TRACE', 'GST_TRACER', 'gsttracerrecord.c', 110, 'gst_tracer_record_build_format', None, r'latency.class, src=(structure)"scope\\,\\ type\\=\\(type\\)gchararray\\,\\ related-to\\=\\(GstTracerValueScope\\)GST_TRACER_VALUE_SCOPE_PAD\\;", sink=(structure)"scope\\,\\ type\\=\\(type\\)gchararray\\,\\ related-to\\=\\(GstTracerValueScope\\)GST_TRACER_VALUE_SCOPE_PAD\\;", time=(structure)"value\\,\\ type\\=\\(type\\)guint64\\,\\ description\\=\\(string\\)\\"time\\\\\\ it\\\\\\ took\\\\\\ for\\\\\\ the\\\\\\ buffer\\\\\\ to\\\\\\ go\\\\\\ from\\\\\\ src\\\\\\ to\\\\\\ sink\\\\\\ ns\\"\\,\\ flags\\=\\(GstTracerValueFlags\\)GST_TRACER_VALUE_FLAGS_AGGREGATED\\,\\ min\\=\\(guint64\\)0\\,\\ max\\=\\(guint64\\)18446744073709551615\\;";')
+
+TRACER_ENTRY = ('0:00:00.142391137', 1788, '0x7f8a201056d0', 'TRACE', 'GST_TRACER', '', 0, '', None, r'latency, src=(string)source_src, sink=(string)pulsesink0_sink, time=(guint64)47091349;')
+
+class TestAnalyzer(unittest.TestCase):
+
+ def test_detect_tracer_class(self):
+ a = Analyzer(None)
+ self.assertTrue(a.is_tracer_class(TRACER_CLASS))
+
+ def test_detect_tracer_entry(self):
+ a = Analyzer(None)
+ self.assertTrue(a.is_tracer_entry(TRACER_ENTRY))
+
--- /dev/null
+import os
+import re
+import sys
+
+# new tracer class
+# 0:00:00.041536066 1788 0x14b2150 TRACE GST_TRACER gsttracerrecord.c:110:gst_tracer_record_build_format: latency.class, src=(structure)"scope\,\ type\=\(GType\)NULL\,\ related-to\=\(GstTracerValueScope\)GST_TRACER_VALUE_SCOPE_PAD\;", sink=(structure)"scope\,\ type\=\(GType\)NULL\,\ related-to\=\(GstTracerValueScope\)GST_TRACER_VALUE_SCOPE_PAD\;", time=(structure)"value\,\ type\=\(GType\)NULL\,\ description\=\(string\)\"time\\\ it\\\ took\\\ for\\\ the\\\ buffer\\\ to\\\ go\\\ from\\\ src\\\ to\\\ sink\\\ ns\"\,\ flags\=\(GstTracerValueFlags\)GST_TRACER_VALUE_FLAGS_AGGREGATED\,\ min\=\(guint64\)0\,\ max\=\(guint64\)18446744073709551615\;";
+
+# tracer log entry
+# 0:00:00.079422574 7664 0x238ac70 TRACE GST_TRACER :0:: thread-rusage, thread-id=(guint64)37268592, ts=(guint64)79416000, average-cpuload=(uint)1000, current-cpuload=(uint)1000, time=(guint64)79418045;
+
+# from log tracer
+# 0:00:00.460486001 18356 0x21de780 TRACE GST_ELEMENT_PADS :0:do_element_add_pad:<GstBaseSink@0x429e880> 0:00:00.460483603, key=val, ...
+def _log_line_regex():
+
+ # "0:00:00.777913000 "
+ TIME = r"(\d+:\d\d:\d\d\.\d+)\s+"
+ # "DEBUG "
+    LEVEL = r"([A-Z]+)\s+"
+ # "0x8165430 "
+ THREAD = r"(0x[0-9a-f]+)\s+"
+ # "GST_REFCOUNTING ", "flacdec "
+    CATEGORY = r"([A-Za-z0-9_-]+)\s+"
+ # " 3089 "
+ PID = r"(\d+)\s*"
+ FILENAME = r"([^:]*):"
+ LINE = r"(\d+):"
+ FUNCTION = r"([A-Za-z0-9_]*):"
+ # FIXME: When non-g(st)object stuff is logged with *_OBJECT (like
+ # buffers!), the address is printed *without* <> brackets!
+ OBJECT = "(?:<([^>]+)>)?"
+ MESSAGE = "(.+)"
+
+ ANSI = "(?:\x1b\\[[0-9;]*m\\s*)*\\s*"
+
+ return [TIME, ANSI, PID, ANSI, THREAD, ANSI, LEVEL, ANSI, CATEGORY,
+ FILENAME, LINE, FUNCTION, ANSI, OBJECT, ANSI, MESSAGE]
+
+
+class Parser(object):
+ '''Helper to parse a tracer log'''
+
+ # record fields
+ F_TIME = 0
+ F_PID = 1
+ F_THREAD = 2
+ F_LEVEL = 3
+ F_CATEGORY = 4
+ F_FILENAME = 5
+ F_LINE = 6
+ F_FUNCTION = 7
+ F_OBJECT = 8
+ F_MESSAGE = 9
+
+ def __init__(self, filename):
+ self.filename = filename
+ self.log_regex = re.compile(''.join(_log_line_regex()))
+ self.file = None
+
+ def __enter__(self):
+ def __is_tracer(line):
+ return 'TRACE' in line
+
+ if self.filename != '-':
+ self.file = open(self.filename, 'rt')
+ else:
+ self.file = sys.stdin
+ self.data = filter(__is_tracer, self.file)
+ return self
+
+ def __exit__(self, *args):
+ if self.filename != '-':
+ self.file.close()
+ self.file = None
+
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ while True:
+ line = next(self.data)
+ match = self.log_regex.match(line)
+ if match:
+ g = list(match.groups())
+ g[Parser.F_PID] = int(g[Parser.F_PID])
+ g[Parser.F_LINE] = int(g[Parser.F_LINE])
+ return g
--- /dev/null
+import sys
+import unittest
+
+from tracer.parser import Parser
+
+TESTFILE = './logs/trace.latency.log'
+
+TEXT_DATA = ['first line', 'second line']
+
+TRACER_LOG_DATA = [
+ '0:00:00.079422574 7664 0x238ac70 TRACE GST_TRACER :0:: thread-rusage, thread-id=(guint64)37268592, ts=(guint64)79416000, average-cpuload=(uint)1000, current-cpuload=(uint)1000, time=(guint64)79418045;'
+]
+TRACER_CLASS_LOG_DATA = [
+ '0:00:00.041536066 1788 0x14b2150 TRACE GST_TRACER gsttracerrecord.c:110:gst_tracer_record_build_format: latency.class, src=(structure)"scope\,\ type\=\(type\)gchararray\,\ related-to\=\(GstTracerValueScope\)GST_TRACER_VALUE_SCOPE_PAD\;", sink=(structure)"scope\,\ type\=\(type\)gchararray\,\ related-to\=\(GstTracerValueScope\)GST_TRACER_VALUE_SCOPE_PAD\;", time=(structure)"value\,\ type\=\(type\)guint64\,\ description\=\(string\)\"time\\\ it\\\ took\\\ for\\\ the\\\ buffer\\\ to\\\ go\\\ from\\\ src\\\ to\\\ sink\\\ ns\"\,\ flags\=\(GstTracerValueFlags\)GST_TRACER_VALUE_FLAGS_AGGREGATED\,\ min\=\(guint64\)0\,\ max\=\(guint64\)18446744073709551615\;";'
+]
+
+class TestParser(unittest.TestCase):
+
+ def test___init__(self):
+ log = Parser(TESTFILE)
+ self.assertIsNone(log.file)
+
+ def test___enter___with_file(self):
+ with Parser(TESTFILE) as log:
+ self.assertIsNotNone(log.file)
+
+ def test___enter___with_stdin(self):
+ sys.stdin = TEXT_DATA
+ with Parser('-') as log:
+ self.assertIsNotNone(log.file)
+
+ def test_random_text_reports_none(self):
+ sys.stdin = TEXT_DATA
+ with Parser('-') as log:
+ with self.assertRaises(StopIteration):
+ next(log)
+
+ def test_log_file_reports_trace_log(self):
+ with Parser(TESTFILE) as log:
+ self.assertIsNotNone(next(log))
+
+ def test_trace_log_parsed(self):
+ sys.stdin = TRACER_LOG_DATA
+ with Parser('-') as log:
+ event = next(log)
+            self.assertEqual(len(event), 10)
--- /dev/null
+import re
+
+class Structure(object):
+ '''Gst Structure parser.'''
+
+ def __init__(self, text):
+ self.text = text
+ self.name = None
+ self.types = {}
+ self.values = {}
+ self.pos = 0
+ self.valid = False
+ try:
+ self._parse(self.text)
+ self.valid = True
+ except ValueError:
+ pass
+
+ def __repr__(self):
+ return self.text
+
+ def _parse(self, s):
+ scan = True
+ # parse id
+ p = s.find(',')
+ if p == -1:
+ p = s.index(';')
+ scan = False
+ self.name = s[:p]
+ s = s[(p + 2):] # skip 'name, '
+ self.pos += p + 2
+ # parse fields
+ while scan:
+ p = s.index('=')
+ k = s[:p]
+ s = s[(p + 1):] # skip 'key='
+ self.pos += p + 1
+ p = s.index('(')
+ s = s[(p + 1):] # skip '('
+ self.pos += p + 1
+ p = s.index(')')
+ t = s[:p]
+ s = s[(p + 1):] # skip 'type)'
+ self.pos += p + 1
+ if t == 'structure':
+ p = s.index('"')
+ s = s[(p + 1):] # skip '"'
+ self.pos += p + 1
+                # find next '"' without a preceding '\'
+ sub = s
+ sublen = 0
+ while True:
+ p = sub.index('"')
+ sublen += p + 1
+ if sub[p - 1] != '\\':
+ sub = None
+                        break
+ sub = sub[(p + 1):]
+ if not sub:
+ sub = s[:(sublen - 1)]
+                # unescape: first collapse '\\' into '\', then drop the
+                # remaining escape backslashes; the second pass handles the
+                # doubled escaping of nested structures
+                sub = re.sub(r'\\\\', r'\\', sub)
+                sub = re.sub(r'(?<!\\)\\(.)', r'\1', sub)
+                sub = re.sub(r'(?<!\\)\\(.)', r'\1', sub)
+ # recurse
+ v = Structure(sub)
+ if s[sublen] == ';':
+ scan = False
+ s = s[(sublen + 2):]
+ self.pos += sublen + 2
+ else:
+ raise ValueError
+ else:
+ p = s.find(',')
+ if p == -1:
+ p = s.index(';')
+ scan = False
+                v = s[:p]
+                s = s[(p + 2):]  # skip the value and ', '
+                self.pos += p + 2
+ if t == 'string' and v[0] == '"':
+ v = v[1:-1]
+ elif t in ['int', 'uint', 'int8', 'uint8', 'int16', 'uint16', 'int32', 'uint32', 'int64', 'uint64' ]:
+ v = int(v)
+ self.types[k] = t
+ self.values[k] = v
+
+ self.valid = True
--- /dev/null
+import unittest
+
+from tracer.structure import Structure
+
+BAD_NAME = r'foo bar'
+BAD_KEY = r'foo, bar'
+BAD_TYPE1 = r'foo, bar=['
+BAD_TYPE2 = r'foo, bar=(int'
+
+EMPTY_STRUCTURE = r'foo;'
+
+SINGLE_VALUE_STRUCTURE = r'foo, key=(string)"value";'
+MISC_TYPES_STRUCTURE = r'foo, key1=(string)"value", key2=(int)5, key3=(boolean)true;'
+
+NESTED_STRUCTURE = r'foo, nested=(structure)"bar\,\ key1\=\(int\)0\,\ key2\=\(int\)5\;";'
+
+class TestStructure(unittest.TestCase):
+
+ def test_handles_bad_name(self):
+ structure = Structure(BAD_NAME)
+ self.assertFalse(structure.valid)
+        self.assertEqual(structure.pos, 0)
+
+ def test_handles_bad_key(self):
+ structure = Structure(BAD_KEY)
+ self.assertFalse(structure.valid)
+        self.assertEqual(structure.pos, 5)
+
+ def test_handles_bad_type1(self):
+ structure = Structure(BAD_TYPE1)
+ self.assertFalse(structure.valid)
+        self.assertEqual(structure.pos, 9)
+
+ def test_handles_bad_type2(self):
+ structure = Structure(BAD_TYPE2)
+ self.assertFalse(structure.valid)
+        self.assertEqual(structure.pos, 10)
+
+ def test_parses_empty_structure(self):
+ structure = Structure(EMPTY_STRUCTURE)
+ self.assertTrue(structure.valid)
+
+ def test_parses_name_in_empty_structure(self):
+ structure = Structure(EMPTY_STRUCTURE)
+        self.assertEqual(structure.name, 'foo')
+
+ def test_parses_single_value_structure(self):
+ structure = Structure(SINGLE_VALUE_STRUCTURE)
+ self.assertTrue(structure.valid)
+
+ def test_parses_name(self):
+ structure = Structure(SINGLE_VALUE_STRUCTURE)
+        self.assertEqual(structure.name, 'foo')
+
+ def test_parses_key(self):
+ structure = Structure(SINGLE_VALUE_STRUCTURE)
+ self.assertIn('key', structure.types)
+ self.assertIn('key', structure.values)
+
+ def test_parses_type(self):
+ structure = Structure(SINGLE_VALUE_STRUCTURE)
+        self.assertEqual(structure.types['key'], 'string')
+
+ def test_parses_string_value(self):
+ structure = Structure(MISC_TYPES_STRUCTURE)
+        self.assertEqual(structure.values['key1'], 'value')
+
+ def test_parses_int_value(self):
+ structure = Structure(MISC_TYPES_STRUCTURE)
+        self.assertEqual(structure.values['key2'], 5)
+
+ def test_parses_nested_structure(self):
+ structure = Structure(NESTED_STRUCTURE)
+ self.assertTrue(structure.valid)
+ sub = structure.values['nested']
+ self.assertTrue(sub.valid)
+
+ def test_nested_structure_has_sub_structure(self):
+ structure = Structure(NESTED_STRUCTURE)
+        self.assertEqual(structure.types['nested'], 'structure')
+ self.assertIsInstance(structure.values['nested'], Structure)