#!/usr/bin/env python3
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
15 r"""
16 Generates json trace files viewable using chrome://tracing from binary
17 trace files.
18
19 Example usage:
20 python pw_trace_tokenized/py/trace_tokenized.py -i trace.bin -o trace.json
21 ./out/host_clang_debug/obj/pw_trace_tokenized/bin/trace_tokenized_example_basic
22 """
from enum import IntEnum
import argparse
import logging
import struct
import sys

from pw_tokenizer import database, tokens
from pw_trace import trace

_LOG = logging.getLogger('pw_trace_tokenized')


def varint_decode(encoded):
    # Adapted from pw_tokenizer.decode._decode_signed_integer.
    count = 0
    result = 0
    shift = 0
    for byte in encoded:
        count += 1
        result |= (byte & 0x7f) << shift
        if not byte & 0x80:
            return result, count

        shift += 7
        if shift >= 64:
            break  # Error: varint is too long.
    # Return None for the value so callers can still unpack two values.
    return None, count
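# Example (illustrative): varint_decode(b'\x96\x01') returns (150, 2), since
# byte 0x96 contributes its low seven bits (22) and byte 0x01 adds 1 << 7.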


# Token string: "event_type|flag|module|group|label|<optional data_fmt>"
class TokenIdx(IntEnum):
    EventType = 0
    Flag = 1
    Module = 2
    Group = 3
    Label = 4
    DataFmt = 5  # optional

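# Example (illustrative) token string, split into the fields indexed above:
#   "PW_TRACE_EVENT_TYPE_INSTANT|0|my_module|my_group|my_label"
# No sixth field is present here, so the event carries no data payload.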

_EVENT_TYPES = {
    "PW_TRACE_EVENT_TYPE_INSTANT": trace.TraceType.Instantaneous,
    "PW_TRACE_EVENT_TYPE_INSTANT_GROUP": trace.TraceType.InstantaneousGroup,
    "PW_TRACE_EVENT_TYPE_ASYNC_START": trace.TraceType.AsyncStart,
    "PW_TRACE_EVENT_TYPE_ASYNC_STEP": trace.TraceType.AsyncStep,
    "PW_TRACE_EVENT_TYPE_ASYNC_END": trace.TraceType.AsyncEnd,
    "PW_TRACE_EVENT_TYPE_DURATION_START": trace.TraceType.DurationStart,
    "PW_TRACE_EVENT_TYPE_DURATION_END": trace.TraceType.DurationEnd,
    "PW_TRACE_EVENT_TYPE_DURATION_GROUP_START":
        trace.TraceType.DurationGroupStart,
    "PW_TRACE_EVENT_TYPE_DURATION_GROUP_END": trace.TraceType.DurationGroupEnd,
}


def get_trace_type(type_str):
    return _EVENT_TYPES.get(type_str, trace.TraceType.Invalid)


def has_trace_id(token_string):
    token_values = token_string.split("|")
    return trace.event_has_trace_id(token_values[TokenIdx.EventType])


def has_data(token_string):
    token_values = token_string.split("|")
    return len(token_values) > TokenIdx.DataFmt


def create_trace_event(token_string, timestamp_us, trace_id, data):
    token_values = token_string.split("|")
    return trace.TraceEvent(
        event_type=get_trace_type(token_values[TokenIdx.EventType]),
        module=token_values[TokenIdx.Module],
        label=token_values[TokenIdx.Label],
        timestamp_us=timestamp_us,
        group=token_values[TokenIdx.Group],
        trace_id=trace_id,
        flags=token_values[TokenIdx.Flag],
        has_data=has_data(token_string),
        data_fmt=(token_values[TokenIdx.DataFmt]
                  if has_data(token_string) else ""),
        data=data if has_data(token_string) else b'')


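# Each size-prefixed entry handed to parse_trace_event is laid out as follows
# (derived from the parsing below; the size byte itself is consumed by the
# caller):
#   [4-byte token][varint tick delta][varint trace ID, only for event types
#   that use one][remaining bytes: optional data payload]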
def parse_trace_event(buffer, db, last_time, ticks_per_second=1000):
    """Decodes a single trace entry, returning None if it is invalid."""
    us_per_tick = 1000000 / ticks_per_second
    idx = 0
    # Read the 32-bit little-endian token.
    token = struct.unpack('<I', buffer[idx:idx + 4])[0]
    idx += 4

    # Look the token up in the database.
    if len(db.token_to_entries[token]) == 0:
        _LOG.error("token not found: %08x", token)
        return None
    token_string = str(db.token_to_entries[token][0])

    # Read the varint time delta and convert ticks to microseconds.
    time_delta, time_bytes = varint_decode(buffer[idx:])
    if time_delta is None:
        _LOG.error("invalid time delta varint")
        return None
    timestamp_us = last_time + us_per_tick * time_delta
    idx += time_bytes

    # Trace ID (present only for event types that use one).
    trace_id = None
    if has_trace_id(token_string) and idx < len(buffer):
        trace_id, trace_id_bytes = varint_decode(buffer[idx:])
        idx += trace_id_bytes

    # Any remaining bytes are the data payload.
    data = None
    if has_data(token_string) and idx < len(buffer):
        data = buffer[idx:]

    return create_trace_event(token_string, timestamp_us, trace_id, data)


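# The trace file itself is a flat sequence of size-prefixed entries, mirroring
# the framing read by the loop below (illustrative):
#   [1-byte size N][N-byte entry][1-byte size M][M-byte entry]...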
def get_trace_events_from_file(databases, input_file_name):
    """Decodes the trace events in a binary trace file."""

    db = tokens.Database.merged(*databases)
    last_timestamp = 0
    events = []
    with open(input_file_name, "rb") as input_file:
        bytes_read = input_file.read()
        idx = 0

        while idx + 1 < len(bytes_read):
            # Read the size prefix of the next entry.
            size = bytes_read[idx]
            if idx + 1 + size > len(bytes_read):
                _LOG.error("incomplete file")
                break

            event = parse_trace_event(bytes_read[idx + 1:idx + 1 + size], db,
                                      last_timestamp)
            if event:
                last_timestamp = event.timestamp_us
                events.append(event)
            idx = idx + size + 1
    return events


def _parse_args():
    """Parse and return command line arguments."""

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument(
        'databases',
        nargs='+',
        action=database.LoadTokenDatabases,
        help='Databases (ELF, binary, or CSV) to use to look up tokens.')
    parser.add_argument(
        '-i',
        '--input',
        dest='input_file',
        help='The binary trace input file, generated using trace_to_file.h.')
    parser.add_argument('-o',
                        '--output',
                        dest='output_file',
                        help='The JSON file to which to write the output.')

    return parser.parse_args()


def _main(args):
    events = get_trace_events_from_file(args.databases, args.input_file)
    json_lines = trace.generate_trace_json(events)

    with open(args.output_file, 'w') as output_file:
        for line in json_lines:
            output_file.write("%s,\n" % line)


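# Illustrative invocation (hypothetical paths); the application's ELF acts as
# the positional token database argument:
#   python trace_tokenized.py my_app.elf -i trace.bin -o trace.json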
if __name__ == '__main__':
    if sys.version_info[0] < 3:
        sys.exit('ERROR: The trace decoder tools require Python 3.')
    _main(_parse_args())