tracer: add new python library to process tracer logs
author     Stefan Sauer <ensonic@users.sf.net>    Mon, 12 Dec 2016 21:38:57 +0000 (22:38 +0100)
committer  Stefan Sauer <ensonic@users.sf.net>    Tue, 20 Dec 2016 07:24:57 +0000 (08:24 +0100)
This is the beginning of a python library for writing tools that process tracer
logs. The library contains a structure parser written in python to avoid the
dependency on gobject introspection (and the slowness and un-pythonic feel that
come with it).

tracer/Makefile [new file with mode: 0644]
tracer/README [new file with mode: 0644]
tracer/tracer/analyzer.py [new file with mode: 0644]
tracer/tracer/analyzer_test.py [new file with mode: 0644]
tracer/tracer/parser.py [new file with mode: 0644]
tracer/tracer/parser_test.py [new file with mode: 0644]
tracer/tracer/structure.py [new file with mode: 0644]
tracer/tracer/structure_test.py [new file with mode: 0644]

diff --git a/tracer/Makefile b/tracer/Makefile
new file mode 100644 (file)
index 0000000..06e2f08
--- /dev/null
@@ -0,0 +1,12 @@
+TEST_DATA = \
+  logs/trace.latency.log
+
+all:
+
+logs/trace.latency.log:
+       mkdir -p logs; \
+       GST_DEBUG="GST_TRACER:7" GST_TRACERS=latency GST_DEBUG_FILE=$@ \
+         gst-launch-1.0 -q audiotestsrc num-buffers=10 wave=silence ! audioconvert ! autoaudiosink
+
+check: $(TEST_DATA)
+       python3 -m unittest discover tracer
diff --git a/tracer/README b/tracer/README
new file mode 100644 (file)
index 0000000..f4c0e5d
--- /dev/null
@@ -0,0 +1,78 @@
+# Add a python api for tracer analyzers
+
+The python framework will parse the tracer log and aggregate information.
+The tool writer will subclass the Analyzer class and override methods like:
+
+  'pad_push_buffer_pre'
+
+There is one method for each hook. Each of those methods will receive the parsed
+log line. In addition the framework will offer some extra api to allow one to
+e.g. write:
+
+  pad.name()             # pad name
+  pad.parent().name()    # element name
+  pad.peer().parent()    # peer element
+  pad.parent().state()   # element state
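+
+As a minimal sketch (not a final api, just what the parser and analyzer in this
+commit already support): for a tracer entry the hook name is the structure name
+(e.g. 'latency'), while entries from the log tracer dispatch on the logged
+function name. A hypothetical tool could look like:
+
+  from tracer.analyzer import Analyzer
+  from tracer.parser import Parser
+  from tracer.structure import Structure
+
+  class LatencyDump(Analyzer):
+      '''Print one line per latency tracer entry.'''
+
+      def latency(self, event):
+          s = Structure(event[Parser.F_MESSAGE])
+          print(s.values['src'], s.values['sink'], s.values['time'])
+
+  with Parser('logs/trace.latency.log') as log:
+      LatencyDump(log).run()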
+
+If we don't have full logging, we'd like to print a warning once, but make this
+non-fatal if possible. E.g. if we don't have logging for
+element_{add,remove}_pad, we might not be able to provide pad.parent().
+
+A tool can replay the log multiple times. If it does, it won't work in
+'streaming' mode though. Streaming mode can offer live stats.
+
+## TODO
+Do we want to provide classes like GstBin, GstElement, GstPad, ... to aggregate
+info? We'd also need to provide a way to e.g. add a GstLogAnalyzer that knows
+about data from the log tracer and populates the classes. We need to be able to
+build a pipeline of analyzers, e.g. the analyzer calls GstLogAnalyzer in its
+catch-all handler and then processes some events individually.
+
+Parse the tracer classes. Add helpers that extract numeric values and
+aggregate min/max/avg. Consider other statistical information (std. deviation)
+and provide a rolling average for the live view.
+
+Think of how analyzer classes can be combined:
+- we'd like to build tools that import other analyzer classes and chain the
+  processing.
+
+## Examples
+### Sequence chart generator (mscgen)
+
+1.) Write the file header.
+
+2.) Collect the element order (see the sketch below):
+Replay the log and use pad_link_pre to collect pad->peer_pad relationships.
+Build a sequence of element names and write it to the msc file.
+
+3.) Collect the event processing:
+Replay the log and use pad_push_event_pre to output message lines to the msc
+file.
+
+4.) Write the footer and run the tool.
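+
+A rough single-pass sketch of the data collection in steps 2.) and 3.); the
+hook names are assumptions for whatever the log tracer reports as function
+names, and the raw message is stored instead of a parsed representation:
+
+  from tracer.analyzer import Analyzer
+  from tracer.parser import Parser
+
+  class MscGen(Analyzer):
+      def __init__(self, log):
+          super().__init__(log)
+          self.links = []    # pad -> peer-pad relationships (element order)
+          self.events = []   # event lines for the msc body
+
+      # assumed hook, called for each pad_link_pre entry
+      def pad_link_pre(self, event):
+          self.links.append(event[Parser.F_MESSAGE])
+
+      # assumed hook, called for each pad_push_event_pre entry
+      def pad_push_event_pre(self, event):
+          self.events.append(event[Parser.F_MESSAGE])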
+
+### Latency stats
+
+1.) Collect per-sink latencies and, for each sink, per-source latencies.
+Calculate min, max, avg. Consider a streaming interface where we update the
+stats e.g. once a second (see the sketch below).
+
+2.) In non-streaming mode write the final statistics.
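+
+A sketch of the per sink/source aggregation with plain dicts (class and field
+names are only illustrative, not a final api):
+
+  from tracer.analyzer import Analyzer
+  from tracer.parser import Parser
+  from tracer.structure import Structure
+
+  class LatencyStats(Analyzer):
+      def __init__(self, log):
+          super().__init__(log)
+          self.stats = {}
+
+      def latency(self, event):
+          s = Structure(event[Parser.F_MESSAGE])
+          key = (s.values['sink'], s.values['src'])
+          st = self.stats.setdefault(
+              key, {'num': 0, 'sum': 0, 'min': None, 'max': None})
+          t = int(s.values['time'])
+          st['num'] += 1
+          st['sum'] += t
+          st['min'] = t if st['min'] is None else min(st['min'], t)
+          st['max'] = t if st['max'] is None else max(st['max'], t)
+          # avg = st['sum'] / st['num']; print once at the end in
+          # non-streaming mode, or e.g. once a second in streaming mode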
+
+### CPU load stats
+
+Like the latency stats, but for CPU load: per-process CPU load plus per-thread
+CPU load.
+
+### top
+
+Combine various stats tools into one.
+
+# Improve tracers
+## log
+* the log tracer logs args and results into misc categories
+* issues
+  * not easy/reliable to detect its output among other trace output
+  * not easy to match pre/post lines
+  * uses its own do_log method instead of gst_tracer_record_log
+    * if we also log structures, we need to log the 'function' as the
+      structure-name; also fields would be key=(type)val instead of key=value
+    * if we switch to gst_tracer_record_log, we'd need to register 27 formats :/
diff --git a/tracer/tracer/analyzer.py b/tracer/tracer/analyzer.py
new file mode 100644 (file)
index 0000000..f9c4b92
--- /dev/null
@@ -0,0 +1,116 @@
+import re
+import sys
+
+from tracer.parser import Parser
+from tracer.structure import Structure
+
+_SCOPE_RELATED_TO = {
+    'GST_TRACER_VALUE_SCOPE_PAD': 'Pad',
+    'GST_TRACER_VALUE_SCOPE_ELEMENT': 'Element',
+    'GST_TRACER_VALUE_SCOPE_THREAD': 'Thread',
+    'GST_TRACER_VALUE_SCOPE_PROCESS': 'Process',
+}
+
+_NUMERIC_TYPES = ('int', 'uint', 'gint', 'guint', 'gint64', 'guint64')
+
+class Analyzer(object):
+    '''Base class for a gst tracer analyzer.'''
+
+    def __init__(self, log):
+        self.log = log
+        self.records = {}
+        self.data = {}
+
+    def handle_tracer_class(self, event):
+        s = Structure(event[Parser.F_MESSAGE])
+        # TODO only for debugging
+        #print("tracer class:", repr(s))
+        name = s.name[:-len('.class')]
+        record = {
+            'class': s,
+            'scope' : {},
+            'value' : {},
+        }
+        self.records[name] = record
+        for k,v in s.values.items():
+            if v.name == 'scope':
+                # TODO only for debugging
+                #print("scope: [%s]=%s" % (k, v))
+                record['scope'][k] = v
+            elif v.name == 'value':
+                # skip non numeric and those without min/max
+                if (v.values['type'] in _NUMERIC_TYPES and
+                      'min' in v.values and 'max' in v.values):
+                    # TODO only for debugging
+                    #print("value: [%s]=%s" % (k, v))
+                    record['value'][k] = v
+                #else:
+                    # TODO only for debugging
+                    #print("skipping value: [%s]=%s" % (k, v))
+
+
+    def handle_tracer_entry(self, event):
+        # dispatch on the function name if set, else on the structure name (first field in the message)
+        if event[Parser.F_FUNCTION]:
+            # TODO: parse params in event[Parser.F_MESSAGE]
+            vmethod_name = event[Parser.F_FUNCTION]
+        else:
+            s = Structure(event[Parser.F_MESSAGE])
+            vmethod_name = s.name
+            record = self.records.get(vmethod_name)
+            if record:
+                # aggregate event based on class
+                for sk,sv in record['scope'].items():
+                    # look up bin by scope (or create new)
+                    key = (_SCOPE_RELATED_TO[sv.values['related-to']] +
+                        ":" + str(s.values[sk]))
+                    scope = self.data.get(key)
+                    if not scope:
+                        scope = {}
+                        self.data[key] = scope
+                    for vk,vv in record['value'].items():
+                        key = vmethod_name + "/" + vk
+                        data = scope.get(key)
+                        if not data:
+                            data = {
+                                'num': 0,
+                                'sum': 0,
+                            }
+                            if 'max' in vv.values and 'min' in vv.values:
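+                                # seed min with the declared max (and vice versa) so the first value updates both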
+                                data['min'] = int(vv.values['max'])
+                                data['max'] = int(vv.values['min'])
+                            scope[key] = data
+                        # update min/max/sum and count via value
+                        dv = int(s.values[vk])
+                        data['num'] += 1
+                        data['sum'] += dv
+                        if 'min' in data:
+                            data['min'] = min(dv, data['min'])
+                        if 'max' in data:
+                            data['max'] = max(dv, data['max'])
+
+        # TODO: check if self has a catch-all handler and call first (check this in init)
+        # - we can use this to chain filters, although the chained filter
+        #   would be doing the same as below
+        # check if self['vmethod'] is a function, if so call
+        vmethod = getattr(self, vmethod_name, None)
+        if callable(vmethod):
+            vmethod(event)
+
+    def is_tracer_class(self, event):
+        return (event[Parser.F_FILENAME] == 'gsttracerrecord.c' and
+                    event[Parser.F_CATEGORY] == 'GST_TRACER' and
+                    '.class' in event[Parser.F_MESSAGE])
+
+    def is_tracer_entry(self, event):
+        return (not event[Parser.F_LINE] and not event[Parser.F_FILENAME])
+
+    def run(self):
+        for event in self.log:
+            # check if it is a tracer.class or tracer event
+            if self.is_tracer_class(event):
+                self.handle_tracer_class(event)
+            elif self.is_tracer_entry(event):
+                self.handle_tracer_entry(event)
+            #else:
+            #    print("unhandled:", repr(event))
diff --git a/tracer/tracer/analyzer_test.py b/tracer/tracer/analyzer_test.py
new file mode 100644 (file)
index 0000000..fac00ee
--- /dev/null
@@ -0,0 +1,18 @@
+import unittest
+
+from tracer.analyzer import Analyzer
+
+TRACER_CLASS = ('0:00:00.036373170', 1788, '0x23bca70', 'TRACE', 'GST_TRACER', 'gsttracerrecord.c', 110, 'gst_tracer_record_build_format', None, r'latency.class, src=(structure)"scope\\,\\ type\\=\\(type\\)gchararray\\,\\ related-to\\=\\(GstTracerValueScope\\)GST_TRACER_VALUE_SCOPE_PAD\\;", sink=(structure)"scope\\,\\ type\\=\\(type\\)gchararray\\,\\ related-to\\=\\(GstTracerValueScope\\)GST_TRACER_VALUE_SCOPE_PAD\\;", time=(structure)"value\\,\\ type\\=\\(type\\)guint64\\,\\ description\\=\\(string\\)\\"time\\\\\\ it\\\\\\ took\\\\\\ for\\\\\\ the\\\\\\ buffer\\\\\\ to\\\\\\ go\\\\\\ from\\\\\\ src\\\\\\ to\\\\\\ sink\\\\\\ ns\\"\\,\\ flags\\=\\(GstTracerValueFlags\\)GST_TRACER_VALUE_FLAGS_AGGREGATED\\,\\ min\\=\\(guint64\\)0\\,\\ max\\=\\(guint64\\)18446744073709551615\\;";')
+
+TRACER_ENTRY = ('0:00:00.142391137', 1788, '0x7f8a201056d0', 'TRACE', 'GST_TRACER', '', 0, '', None, r'latency, src=(string)source_src, sink=(string)pulsesink0_sink, time=(guint64)47091349;')
+
+class TestAnalyzer(unittest.TestCase):
+
+    def test_detect_tracer_class(self):
+        a = Analyzer(None)
+        self.assertTrue(a.is_tracer_class(TRACER_CLASS))
+
+    def test_detect_tracer_entry(self):
+        a = Analyzer(None)
+        self.assertTrue(a.is_tracer_entry(TRACER_ENTRY))
+
diff --git a/tracer/tracer/parser.py b/tracer/tracer/parser.py
new file mode 100644 (file)
index 0000000..eed54cd
--- /dev/null
@@ -0,0 +1,86 @@
+import os
+import re
+import sys
+
+# new tracer class
+# 0:00:00.041536066  1788      0x14b2150 TRACE             GST_TRACER gsttracerrecord.c:110:gst_tracer_record_build_format: latency.class, src=(structure)"scope\,\ type\=\(GType\)NULL\,\ related-to\=\(GstTracerValueScope\)GST_TRACER_VALUE_SCOPE_PAD\;", sink=(structure)"scope\,\ type\=\(GType\)NULL\,\ related-to\=\(GstTracerValueScope\)GST_TRACER_VALUE_SCOPE_PAD\;", time=(structure)"value\,\ type\=\(GType\)NULL\,\ description\=\(string\)\"time\\\ it\\\ took\\\ for\\\ the\\\ buffer\\\ to\\\ go\\\ from\\\ src\\\ to\\\ sink\\\ ns\"\,\ flags\=\(GstTracerValueFlags\)GST_TRACER_VALUE_FLAGS_AGGREGATED\,\ min\=\(guint64\)0\,\ max\=\(guint64\)18446744073709551615\;";
+
+# tracer log entry
+# 0:00:00.079422574  7664      0x238ac70 TRACE             GST_TRACER :0:: thread-rusage, thread-id=(guint64)37268592, ts=(guint64)79416000, average-cpuload=(uint)1000, current-cpuload=(uint)1000, time=(guint64)79418045;
+
+# from log tracer
+# 0:00:00.460486001 18356      0x21de780 TRACE       GST_ELEMENT_PADS :0:do_element_add_pad:<GstBaseSink@0x429e880> 0:00:00.460483603, key=val, ...
+def _log_line_regex():
+
+    # "0:00:00.777913000  "
+    TIME = r"(\d+:\d\d:\d\d\.\d+)\s+"
+    # "DEBUG             "
+    LEVEL = r"([A-Z]+)\s+"
+    # "0x8165430 "
+    THREAD = r"(0x[0-9a-f]+)\s+"
+      # "GST_REFCOUNTING ", "flacdec "
+    CATEGORY = "([A-Za-z0-9_-]+)\s+"
+    # "  3089 "
+    PID = r"(\d+)\s*"
+    FILENAME = r"([^:]*):"
+    LINE = r"(\d+):"
+    FUNCTION = r"([A-Za-z0-9_]*):"
+    # FIXME: When non-g(st)object stuff is logged with *_OBJECT (like
+    # buffers!), the address is printed *without* <> brackets!
+    OBJECT = "(?:<([^>]+)>)?"
+    MESSAGE = "(.+)"
+
+    ANSI = "(?:\x1b\\[[0-9;]*m\\s*)*\\s*"
+
+    return [TIME, ANSI, PID, ANSI, THREAD, ANSI, LEVEL, ANSI, CATEGORY,
+            FILENAME, LINE, FUNCTION, ANSI, OBJECT, ANSI, MESSAGE]
+
+
+class Parser(object):
+    '''Helper to parse a tracer log'''
+
+    # record fields
+    F_TIME = 0
+    F_PID = 1
+    F_THREAD = 2
+    F_LEVEL = 3
+    F_CATEGORY = 4
+    F_FILENAME = 5
+    F_LINE = 6
+    F_FUNCTION = 7
+    F_OBJECT = 8
+    F_MESSAGE = 9
+
+    def __init__(self, filename):
+        self.filename = filename
+        self.log_regex = re.compile(''.join(_log_line_regex()))
+        self.file = None
+
+    def __enter__(self):
+        def __is_tracer(line):
+            return 'TRACE' in line
+
+        if self.filename != '-':
+            self.file = open(self.filename, 'rt')
+        else:
+            self.file = sys.stdin
+        self.data = filter(__is_tracer, self.file)
+        return self
+
+    def __exit__(self, *args):
+        if self.filename != '-':
+            self.file.close()
+            self.file = None
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        while True:
+            line = next(self.data)
+            match = self.log_regex.match(line)
+            if match:
+                g = list(match.groups())
+                g[Parser.F_PID] = int(g[Parser.F_PID])
+                g[Parser.F_LINE] = int(g[Parser.F_LINE])
+                return g
diff --git a/tracer/tracer/parser_test.py b/tracer/tracer/parser_test.py
new file mode 100644 (file)
index 0000000..0a3f6c1
--- /dev/null
@@ -0,0 +1,46 @@
+import sys
+import unittest
+
+from tracer.parser import Parser
+
+TESTFILE = './logs/trace.latency.log'
+
+TEXT_DATA = ['first line', 'second line']
+
+TRACER_LOG_DATA = [
+    '0:00:00.079422574  7664      0x238ac70 TRACE             GST_TRACER :0:: thread-rusage, thread-id=(guint64)37268592, ts=(guint64)79416000, average-cpuload=(uint)1000, current-cpuload=(uint)1000, time=(guint64)79418045;'
+]
+TRACER_CLASS_LOG_DATA = [
+    '0:00:00.041536066  1788      0x14b2150 TRACE             GST_TRACER gsttracerrecord.c:110:gst_tracer_record_build_format: latency.class, src=(structure)"scope\,\ type\=\(type\)gchararray\,\ related-to\=\(GstTracerValueScope\)GST_TRACER_VALUE_SCOPE_PAD\;", sink=(structure)"scope\,\ type\=\(type\)gchararray\,\ related-to\=\(GstTracerValueScope\)GST_TRACER_VALUE_SCOPE_PAD\;", time=(structure)"value\,\ type\=\(type\)guint64\,\ description\=\(string\)\"time\\\ it\\\ took\\\ for\\\ the\\\ buffer\\\ to\\\ go\\\ from\\\ src\\\ to\\\ sink\\\ ns\"\,\ flags\=\(GstTracerValueFlags\)GST_TRACER_VALUE_FLAGS_AGGREGATED\,\ min\=\(guint64\)0\,\ max\=\(guint64\)18446744073709551615\;";'
+]
+
+class TestParser(unittest.TestCase):
+
+    def test___init__(self):
+        log = Parser(TESTFILE)
+        self.assertIsNone(log.file)
+
+    def test___enter___with_file(self):
+        with Parser(TESTFILE) as log:
+            self.assertIsNotNone(log.file)
+
+    def test___enter___with_stdin(self):
+        sys.stdin = TEXT_DATA
+        with Parser('-') as log:
+            self.assertIsNotNone(log.file)
+
+    def test_random_text_reports_none(self):
+        sys.stdin = TEXT_DATA
+        with Parser('-') as log:
+            with self.assertRaises(StopIteration):
+                next(log)
+
+    def test_log_file_reports_trace_log(self):
+        with Parser(TESTFILE) as log:
+            self.assertIsNotNone(next(log))
+
+    def test_trace_log_parsed(self):
+        sys.stdin = TRACER_LOG_DATA
+        with Parser('-') as log:
+            event = next(log)
+            self.assertEquals(len(event), 10)
diff --git a/tracer/tracer/structure.py b/tracer/tracer/structure.py
new file mode 100644 (file)
index 0000000..f41e2e6
--- /dev/null
@@ -0,0 +1,92 @@
+import re
+
+class Structure(object):
+    '''Gst Structure parser.'''
+
+    def __init__(self, text):
+        self.text = text
+        self.name = None
+        self.types = {}
+        self.values = {}
+        self.pos = 0
+        self.valid = False
+        try:
+            self._parse(self.text)
+            self.valid = True
+        except ValueError:
+            pass
+
+    def __repr__(self):
+        return self.text
+
+    def _parse(self, s):
+        scan = True
+        # parse id
+        p = s.find(',')
+        if p == -1:
+            p = s.index(';')
+            scan = False
+        self.name = s[:p]
+        s = s[(p + 2):]  # skip 'name, '
+        self.pos += p + 2
+        # parse fields
+        while scan:
+            p = s.index('=')
+            k = s[:p]
+            s = s[(p + 1):]  # skip 'key='
+            self.pos += p + 1
+            p = s.index('(')
+            s = s[(p + 1):]  # skip '('
+            self.pos += p + 1
+            p = s.index(')')
+            t = s[:p]
+            s = s[(p + 1):]  # skip 'type)'
+            self.pos += p + 1
+            if t == 'structure':
+                p = s.index('"')
+                s = s[(p + 1):]  # skip '"'
+                self.pos += p + 1
+                # find next '"' without a preceding '\'
+                sub = s
+                sublen = 0
+                while True:
+                    p = sub.index('"')
+                    sublen += p + 1
+                    if sub[p - 1] != '\\':
+                        sub = None
+                        break
+                    sub = sub[(p + 1):]
+                if not sub:
+                    sub = s[:(sublen - 1)]
+                    # unescape \., but not \\. (using a backref)
+                    # FIXME: try to combine
+                    # also:
+                    # unescape = re.compile('search')
+                    # unescape.sub('replacement', sub)
+                    sub = re.sub(r'\\\\', r'\\', sub)
+                    sub = re.sub(r'(?<!\\)\\(.)', r'\1', sub)
+                    sub = re.sub(r'(?<!\\)\\(.)', r'\1', sub)
+                    # recurse
+                    v = Structure(sub)
+                    if s[sublen] == ';':
+                        scan = False
+                    s = s[(sublen + 2):]
+                    self.pos += sublen + 2
+                else:
+                    raise ValueError
+            else:
+                p = s.find(',')
+                if p == -1:
+                    p = s.index(';')
+                    scan = False
+                v = s[:p]
+                s = s[(p + 2):]  # skip "value, "
+                self.pos += p + 2
+                if t == 'string' and v[0] == '"':
+                    v = v[1:-1]
+                elif t in ['int', 'uint', 'int8', 'uint8', 'int16', 'uint16', 'int32', 'uint32', 'int64', 'uint64' ]:
+                    v = int(v)
+            self.types[k] = t
+            self.values[k] = v
+
+        self.valid = True
diff --git a/tracer/tracer/structure_test.py b/tracer/tracer/structure_test.py
new file mode 100644 (file)
index 0000000..9db6afe
--- /dev/null
@@ -0,0 +1,81 @@
+import unittest
+
+from tracer.structure import Structure
+
+BAD_NAME = r'foo bar'
+BAD_KEY = r'foo, bar'
+BAD_TYPE1 = r'foo, bar=['
+BAD_TYPE2 = r'foo, bar=(int'
+
+EMPTY_STRUCTURE = r'foo;'
+
+SINGLE_VALUE_STRUCTURE = r'foo, key=(string)"value";'
+MISC_TYPES_STRUCTURE = r'foo, key1=(string)"value", key2=(int)5, key3=(boolean)true;'
+
+NESTED_STRUCTURE = r'foo, nested=(structure)"bar\,\ key1\=\(int\)0\,\ key2\=\(int\)5\;";'
+
+class TestStructure(unittest.TestCase):
+
+    def test_handles_bad_name(self):
+        structure = Structure(BAD_NAME)
+        self.assertFalse(structure.valid)
+        self.assertEquals(structure.pos, 0)
+
+    def test_handles_bad_key(self):
+        structure = Structure(BAD_KEY)
+        self.assertFalse(structure.valid)
+        self.assertEquals(structure.pos, 5)
+
+    def test_handles_bad_type1(self):
+        structure = Structure(BAD_TYPE1)
+        self.assertFalse(structure.valid)
+        self.assertEquals(structure.pos, 9)
+
+    def test_handles_bad_type2(self):
+        structure = Structure(BAD_TYPE2)
+        self.assertFalse(structure.valid)
+        self.assertEquals(structure.pos, 10)
+
+    def test_parses_empty_structure(self):
+        structure = Structure(EMPTY_STRUCTURE)
+        self.assertTrue(structure.valid)
+
+    def test_parses_name_in_empty_structure(self):
+        structure = Structure(EMPTY_STRUCTURE)
+        self.assertEquals(structure.name, 'foo')
+
+    def test_parses_single_value_structure(self):
+        structure = Structure(SINGLE_VALUE_STRUCTURE)
+        self.assertTrue(structure.valid)
+
+    def test_parses_name(self):
+        structure = Structure(SINGLE_VALUE_STRUCTURE)
+        self.assertEquals(structure.name, 'foo')
+
+    def test_parses_key(self):
+        structure = Structure(SINGLE_VALUE_STRUCTURE)
+        self.assertIn('key', structure.types)
+        self.assertIn('key', structure.values)
+
+    def test_parses_type(self):
+        structure = Structure(SINGLE_VALUE_STRUCTURE)
+        self.assertEquals(structure.types['key'], 'string')
+
+    def test_parses_string_value(self):
+        structure = Structure(MISC_TYPES_STRUCTURE)
+        self.assertEquals(structure.values['key1'], 'value')
+
+    def test_parses_int_value(self):
+        structure = Structure(MISC_TYPES_STRUCTURE)
+        self.assertEquals(structure.values['key2'], 5)
+
+    def test_parses_nested_structure(self):
+        structure = Structure(NESTED_STRUCTURE)
+        self.assertTrue(structure.valid)
+        sub = structure.values['nested']
+        self.assertTrue(sub.valid)
+
+    def test_nested_structure_has_sub_structure(self):
+        structure = Structure(NESTED_STRUCTURE)
+        self.assertEquals(structure.types['nested'], 'structure')
+        self.assertIsInstance(structure.values['nested'], Structure)