#!/usr/bin/env python3
# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Tests for detokenize."""

import base64
import datetime as dt
import io
import os
from pathlib import Path
import struct
import tempfile
import unittest
from unittest import mock

from pw_tokenizer import database
from pw_tokenizer import detokenize
from pw_tokenizer import elf_reader
from pw_tokenizer import tokens


# This function is not part of this test. It was used to generate the binary
# strings for EMPTY_ELF and ELF_WITH_TOKENIZER_SECTIONS. It takes a path and
# returns a Python byte string suitable for copying into Python source code.
def path_to_byte_string(path):
    with open(path, 'rb') as fd:
        data = fd.read()

    output = []
    indices = iter(range(len(data)))

    while True:
        # Build a source line of up to ~70 characters of escaped bytes.
        line = ''

        while len(line) < 70:
            try:
                i = next(indices)
            except StopIteration:
                break

            line += repr(data[i:i + 1])[2:-1].replace("'", r'\'')

        if not line:
            return ''.join(output)

        output.append("    b'{}'\n".format(line))
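

# For example, the EMPTY_ELF literal below could have been generated with a
# call along these lines (the input path here is hypothetical):
#
#   print('EMPTY_ELF = (')
#   print(path_to_byte_string('empty_test_binary.elf'), end='')
#   print(')')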


# This is an empty ELF file. It was created from the ELF file for
# tokenize_test.cc with the command:
#
#   arm-none-eabi-objcopy -S --only-section NO_SECTIONS_PLEASE <ELF> <OUTPUT>
#
# The resulting ELF was converted to a Python binary string using the
# path_to_byte_string function above.
EMPTY_ELF = (
    b'\x7fELF\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00(\x00\x01'
    b'\x00\x00\x00\xd1\x83\x00\x084\x00\x00\x00\xe0\x00\x00\x00\x00\x04\x00\x05'
    b'4\x00 \x00\x05\x00(\x00\x02\x00\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\x00'
    b'\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00'
    b'\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x01\x00\x01\x00'
    b'\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\xd4\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    b'\x06\x00\x00\x00\x00\x00\x01\x00\x00.shstrtab\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01'
    b'\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xd4\x00\x00'
    b'\x00\x0b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00'
)


# This is an ELF file with only the pw_tokenizer sections. It was created
# from a tokenize_test binary built for the STM32F429i Discovery board. The
# pw_tokenizer sections were extracted with this command:
#
#   arm-none-eabi-objcopy -S --only-section ".pw_tokenizer*" <ELF> <OUTPUT>
#
ELF_WITH_TOKENIZER_SECTIONS = Path(__file__).parent.joinpath(
    'example_binary_with_tokenized_strings.elf').read_bytes()

# Number of tokenized string entries in the ELF above.
TOKENS_IN_ELF = 22

# 0x2e668cd6 is 'Jello, world!' (which is also used in database_test.py).
JELLO_WORLD_TOKEN = b'\xd6\x8c\x66\x2e'
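
# The token is the little-endian encoding of the string's 32-bit hash:
assert JELLO_WORLD_TOKEN == struct.pack('<I', 0x2e668cd6)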


class DetokenizeTest(unittest.TestCase):
    """Tests the detokenize.Detokenizer."""
    def test_simple(self):
        detok = detokenize.Detokenizer(
            tokens.Database([
                tokens.TokenizedStringEntry(0xcdab,
                                            '%02d %s %c%%',
                                            date_removed=dt.datetime.now())
            ]))
        self.assertEqual(str(detok.detokenize(b'\xab\xcd\0\0\x02\x03Two\x66')),
                         '01 Two 3%')
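
        # A binary tokenized message is the 4-byte little-endian token
        # followed by the encoded arguments: above, b'\xab\xcd\0\0' is token
        # 0xcdab, b'\x02' zigzag-decodes to 1 (rendered '01' by %02d),
        # b'\x03Two' is the 3-byte string 'Two', and b'\x66' zigzag-decodes
        # to 51, the character '3', for %c.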

    def test_detokenize_extra_data_is_unsuccessful(self):
        detok = detokenize.Detokenizer(
            tokens.Database([
                tokens.TokenizedStringEntry(1,
                                            'no args',
                                            date_removed=dt.datetime(1, 1, 1))
            ]))

        result = detok.detokenize(b'\x01\0\0\0\x04args')
        self.assertEqual(len(result.failures), 1)
        string, args, remaining = result.failures[0]
        self.assertEqual('no args', string)
        self.assertFalse(args)
        self.assertEqual(b'\x04args', remaining)
        self.assertEqual('no args', str(result))

    def test_detokenize_missing_data_is_unsuccessful(self):
        detok = detokenize.Detokenizer(
            tokens.Database([
                tokens.TokenizedStringEntry(2,
                                            '%s',
                                            date_removed=dt.datetime(1, 1, 1))
            ]))

        result = detok.detokenize(b'\x02\0\0\0')
        string, args, remaining = result.failures[0]
        self.assertEqual('%s', string)
        self.assertEqual(len(args), 1)
        self.assertEqual(b'', remaining)
        self.assertEqual(len(result.failures), 1)
        self.assertEqual('%s', str(result))

    def test_detokenize_missing_data_with_errors_is_unsuccessful(self):
        detok = detokenize.Detokenizer(tokens.Database([
            tokens.TokenizedStringEntry(2,
                                        '%s',
                                        date_removed=dt.datetime(1, 1, 1))
        ]),
                                       show_errors=True)

        result = detok.detokenize(b'\x02\0\0\0')
        string, args, remaining = result.failures[0]
        self.assertIn('%s MISSING', string)
        self.assertEqual(len(args), 1)
        self.assertEqual(b'', remaining)
        self.assertEqual(len(result.failures), 1)
        self.assertIn('%s MISSING', str(result))

    def test_unparsed_data(self):
        detok = detokenize.Detokenizer(
            tokens.Database([
                tokens.TokenizedStringEntry(1,
                                            'no args',
                                            date_removed=dt.datetime(
                                                100, 1, 1)),
            ]))
        result = detok.detokenize(b'\x01\0\0\0o_o')
        self.assertFalse(result.ok())
        self.assertEqual('no args', str(result))
        self.assertIn('o_o', repr(result))
        self.assertIn('decoding failed', result.error_message())

    def test_empty_db(self):
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))
        self.assertFalse(detok.detokenize(b'\x12\x34\0\0').ok())
        self.assertIn('unknown token',
                      detok.detokenize(b'1234').error_message())
        self.assertIn('unknown token', repr(detok.detokenize(b'1234')))
        self.assertEqual('', str(detok.detokenize(b'1234')))

        self.assertIsNone(detok.detokenize(b'').token)

    def test_empty_db_show_errors(self):
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)
        self.assertFalse(detok.detokenize(b'\x12\x34\0\0').ok())
        self.assertIn('unknown token',
                      detok.detokenize(b'1234').error_message())
        self.assertIn('unknown token', repr(detok.detokenize(b'1234')))
        self.assertIn('unknown token', str(detok.detokenize(b'1234')))

        self.assertIsNone(detok.detokenize(b'').token)

    def test_missing_token_show_errors(self):
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF), show_errors=True)
        self.assertIn('missing token', detok.detokenize(b'').error_message())
        self.assertIn('missing token', str(detok.detokenize(b'')))
        self.assertIn('missing token', repr(detok.detokenize(b'123')))

        self.assertIn('missing token', detok.detokenize(b'1').error_message())
        self.assertIn('missing token', str(detok.detokenize(b'1')))
        self.assertIn('missing token', repr(detok.detokenize(b'1')))

        self.assertIn('missing token',
                      detok.detokenize(b'123').error_message())
        self.assertIn('missing token', str(detok.detokenize(b'123')))
        self.assertIn('missing token', repr(detok.detokenize(b'123')))

    def test_missing_token(self):
        detok = detokenize.Detokenizer(io.BytesIO(EMPTY_ELF))
        self.assertIn('missing token', detok.detokenize(b'').error_message())
        self.assertEqual('', str(detok.detokenize(b'')))
        self.assertIn('missing token', repr(detok.detokenize(b'123')))

        self.assertIn('missing token', detok.detokenize(b'1').error_message())
        self.assertEqual('', str(detok.detokenize(b'1')))
        self.assertIn('missing token', repr(detok.detokenize(b'1')))

        self.assertIn('missing token',
                      detok.detokenize(b'123').error_message())
        self.assertEqual('', str(detok.detokenize(b'123')))
        self.assertIn('missing token', repr(detok.detokenize(b'123')))

    def test_decode_from_elf_data(self):
        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))

        self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
        self.assertEqual(str(detok.detokenize(JELLO_WORLD_TOKEN)),
                         'Jello, world!')

        undecoded_args = detok.detokenize(JELLO_WORLD_TOKEN + b'some junk')
        self.assertFalse(undecoded_args.ok())
        self.assertEqual(str(undecoded_args), 'Jello, world!')

        self.assertTrue(detok.detokenize(b'\0\0\0\0').ok())
        self.assertEqual(str(detok.detokenize(b'\0\0\0\0')), '')

    def test_decode_from_elf_file(self):
        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
        expected_tokens = frozenset(detok.database.token_to_entries.keys())

        elf = tempfile.NamedTemporaryFile('wb', delete=False)
        try:
            elf.write(ELF_WITH_TOKENIZER_SECTIONS)
            elf.close()

            # Open ELF by file object
            with open(elf.name, 'rb') as fd:
                detok = detokenize.Detokenizer(fd)

            self.assertEqual(expected_tokens,
                             frozenset(detok.database.token_to_entries.keys()))

            # Open ELF by path
            detok = detokenize.Detokenizer(elf.name)
            self.assertEqual(expected_tokens,
                             frozenset(detok.database.token_to_entries.keys()))

            # Open ELF by elf_reader.Elf
            with open(elf.name, 'rb') as fd:
                detok = detokenize.Detokenizer(elf_reader.Elf(fd))

            self.assertEqual(expected_tokens,
                             frozenset(detok.database.token_to_entries.keys()))
        finally:
            os.unlink(elf.name)

    def test_decode_from_csv_file(self):
        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
        expected_tokens = frozenset(detok.database.token_to_entries.keys())

        csv_database = str(detok.database)
        self.assertEqual(len(csv_database.splitlines()), TOKENS_IN_ELF)

        csv_file = tempfile.NamedTemporaryFile('w', delete=False)
        try:
            csv_file.write(csv_database)
            csv_file.close()

            # Open CSV by path
            detok = detokenize.Detokenizer(csv_file.name)
            self.assertEqual(expected_tokens,
                             frozenset(detok.database.token_to_entries.keys()))

            # Open CSV by file object
            with open(csv_file.name) as fd:
                detok = detokenize.Detokenizer(fd)

            self.assertEqual(expected_tokens,
                             frozenset(detok.database.token_to_entries.keys()))
        finally:
            os.unlink(csv_file.name)

    def test_create_detokenizer_with_token_database(self):
        detok = detokenize.Detokenizer(io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
        expected_tokens = frozenset(detok.database.token_to_entries.keys())

        detok = detokenize.Detokenizer(detok.database)
        self.assertEqual(expected_tokens,
                         frozenset(detok.database.token_to_entries.keys()))


class DetokenizeWithCollisions(unittest.TestCase):
    """Tests collision resolution."""
    def setUp(self):
        token = 0xbaad

        # Database with several conflicting tokens.
        self.detok = detokenize.Detokenizer(tokens.Database([
            tokens.TokenizedStringEntry(
                token, 'REMOVED', date_removed=dt.datetime(9, 1, 1)),
            tokens.TokenizedStringEntry(token, 'newer'),
            tokens.TokenizedStringEntry(
                token, 'A: %d', date_removed=dt.datetime(30, 5, 9)),
            tokens.TokenizedStringEntry(
                token, 'B: %c', date_removed=dt.datetime(30, 5, 10)),
            tokens.TokenizedStringEntry(token, 'C: %s'),
            tokens.TokenizedStringEntry(token, '%d%u'),
            tokens.TokenizedStringEntry(token, '%s%u %d'),
            tokens.TokenizedStringEntry(1, '%s'),
            tokens.TokenizedStringEntry(1, '%d'),
            tokens.TokenizedStringEntry(2, 'Three %s %s %s'),
            tokens.TokenizedStringEntry(2, 'Five %d %d %d %d %s'),
        ]))

    def test_collision_no_args_favors_most_recently_present(self):
        no_args = self.detok.detokenize(b'\xad\xba\0\0')
        self.assertFalse(no_args.ok())
        self.assertEqual(len(no_args.successes), 2)
        self.assertEqual(len(no_args.failures), 5)
        self.assertEqual(len(no_args.matches()), 7)
        self.assertEqual(str(no_args), 'newer')
        self.assertEqual(len(no_args.best_result()[1]), 0)
        self.assertEqual(no_args.best_result()[0], 'newer')

    def test_collision_one_integer_arg_favors_most_recently_present(self):
        multiple_correct = self.detok.detokenize(b'\xad\xba\0\0\x7a')
        self.assertFalse(multiple_correct.ok())
        self.assertIn('ERROR', repr(multiple_correct))
        self.assertEqual(len(multiple_correct.successes), 2)
        self.assertEqual(len(multiple_correct.failures), 5)
        self.assertEqual(len(multiple_correct.matches()), 7)
        self.assertEqual(str(multiple_correct), 'B: =')

    def test_collision_one_integer_arg_favors_successful_decode(self):
        # One string decodes successfully, since the arg is out of range for
        # %c.
        int_arg = self.detok.detokenize(b'\xad\xba\0\0\xfe\xff\xff\xff\x0f')
        self.assertTrue(int_arg.ok())
        self.assertEqual(str(int_arg), 'A: 2147483647')
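
        # For reference: b'\xfe\xff\xff\xff\x0f' is the varint 0xfffffffe,
        # which zigzag-decodes to 2147483647 (0x7fffffff); that value is out
        # of range for %c but valid for %d, so only 'A: %d' decodes.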

    def test_collision_one_string_arg_favors_successful_decode(self):
        # One string decodes successfully, since decoding the argument as an
        # integer does not decode all the data.
        string_arg = self.detok.detokenize(b'\xad\xba\0\0\x02Hi')
        self.assertTrue(string_arg.ok())
        self.assertEqual(str(string_arg), 'C: Hi')

    def test_collision_one_string_arg_favors_decoding_all_data(self):
        result = self.detok.detokenize(b'\1\0\0\0\x83hi')
        self.assertEqual(len(result.failures), 2)
        # Should resolve to the string since %d would leave one byte behind.
        self.assertEqual(str(result), '%s')

    def test_collision_multiple_args_favors_decoding_more_arguments(self):
        result = self.detok.detokenize(b'\2\0\0\0\1\2\1\4\5')
        self.assertEqual(len(result.matches()), 2)
        self.assertEqual(result.matches()[0][0], 'Five -1 1 -1 2 %s')
        self.assertEqual(result.matches()[1][0], 'Three \2 \4 %s')

    def test_collision_multiple_args_favors_decoding_all_arguments(self):
        unambiguous = self.detok.detokenize(b'\xad\xba\0\0\x01#\x00\x01')
        self.assertTrue(unambiguous.ok())
        self.assertEqual(len(unambiguous.matches()), 7)
        self.assertEqual('#0 -1', str(unambiguous))
        self.assertIn('#0 -1', repr(unambiguous))


@mock.patch('os.path.getmtime')
class AutoUpdatingDetokenizerTest(unittest.TestCase):
    """Tests the AutoUpdatingDetokenizer class."""
    def test_update(self, mock_getmtime):
        """Tests the update command."""
        db = database.load_token_database(
            io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
        self.assertEqual(len(db), TOKENS_IN_ELF)

        the_time = [100]

        def move_back_time_if_file_exists(path):
            if os.path.exists(path):
                the_time[0] -= 1
                return the_time[0]

            raise FileNotFoundError

        mock_getmtime.side_effect = move_back_time_if_file_exists
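
        # Each getmtime() call returns an earlier time, so from the
        # detokenizer's perspective the database file always appears newly
        # modified, and it reloads the file on every poll.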

        file = tempfile.NamedTemporaryFile('wb', delete=False)
        try:
            file.close()

            detok = detokenize.AutoUpdatingDetokenizer(file.name,
                                                       min_poll_period_s=0)
            self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())

            with open(file.name, 'wb') as fd:
                tokens.write_binary(db, fd)

            self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
        finally:
            os.unlink(file.name)

        # The database stays around if the file is deleted.
        self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())

    def test_no_update_if_time_is_same(self, mock_getmtime):
        mock_getmtime.return_value = 100

        file = tempfile.NamedTemporaryFile('wb', delete=False)
        try:
            tokens.write_csv(
                database.load_token_database(
                    io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS)), file)
            file.close()

            detok = detokenize.AutoUpdatingDetokenizer(file.name,
                                                       min_poll_period_s=0)
            self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())

            # Empty the database, but keep the mock modified time the same.
            with open(file.name, 'wb'):
                pass

            self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())
            self.assertTrue(detok.detokenize(JELLO_WORLD_TOKEN).ok())

            # Move back time so the now-empty file is reloaded.
            mock_getmtime.return_value = 50
            self.assertFalse(detok.detokenize(JELLO_WORLD_TOKEN).ok())
        finally:
            os.unlink(file.name)


def _next_char(message: bytes) -> bytes:
    return bytes(b + 1 for b in message)
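

# For example, _next_char(b'abc') == b'bcd'. The tests below use it as the
# transform applied to each prefixed message.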


class PrefixedMessageDecoderTest(unittest.TestCase):

    def setUp(self):
        self.decode = detokenize.PrefixedMessageDecoder('$', 'abcdefg')
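
    # A message starts at the prefix character '$' and continues for as long
    # as the following bytes are in the charset 'abcdefg'; transform() passes
    # each such message (prefix included) through the given function, as the
    # cases below exercise.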

    def test_transform_single_message(self):
        self.assertEqual(
            b'%bcde',
            b''.join(self.decode.transform(io.BytesIO(b'$abcd'), _next_char)))

    def test_transform_message_amidst_other_only_affects_message(self):
        self.assertEqual(
            b'%%WHAT?%bcd%WHY? is this %ok %', b''.join(
                self.decode.transform(
                    io.BytesIO(b'$$WHAT?$abc$WHY? is this $ok $'),
                    _next_char)))

    def test_transform_empty_message(self):
        self.assertEqual(
            b'%1%',
            b''.join(self.decode.transform(io.BytesIO(b'$1$'), _next_char)))

    def test_transform_sequential_messages(self):
        self.assertEqual(
            b'%bcd%efghh', b''.join(
                self.decode.transform(io.BytesIO(b'$abc$defgh'), _next_char)))


class DetokenizeBase64(unittest.TestCase):
    """Tests detokenizing Base64 messages."""

    JELLO = b'$' + base64.b64encode(JELLO_WORLD_TOKEN)

    RECURSION_STRING = f'The secret message is "{JELLO.decode()}"'
    RECURSION = b'$' + base64.b64encode(
        struct.pack('<I', tokens.default_hash(RECURSION_STRING)))

    RECURSION_STRING_2 = f"'{RECURSION.decode()}', said the spy."
    RECURSION_2 = b'$' + base64.b64encode(
        struct.pack('<I', tokens.default_hash(RECURSION_STRING_2)))
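
    # Each Base64 message is the prefix '$' followed by the Base64-encoded
    # binary message. RECURSION detokenizes to RECURSION_STRING, which itself
    # contains the JELLO message, so full detokenization requires recursively
    # detokenizing the result.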

    TEST_CASES = (
        (b'', b''),
        (b'nothing here', b'nothing here'),
        (JELLO, b'Jello, world!'),
        (JELLO + b'a', b'Jello, world!a'),
        (JELLO + b'abc', b'Jello, world!abc'),
        (JELLO + b'abc=', b'Jello, world!abc='),
        (b'$a' + JELLO + b'a', b'$aJello, world!a'),
        (b'Hello ' + JELLO + b'?', b'Hello Jello, world!?'),
        (b'$' + JELLO, b'$Jello, world!'),
        (JELLO + JELLO, b'Jello, world!Jello, world!'),
        (JELLO + b'$' + JELLO, b'Jello, world!$Jello, world!'),
        (JELLO + b'$a' + JELLO + b'bcd', b'Jello, world!$aJello, world!bcd'),
        (b'$3141', b'$3141'),
        (JELLO + b'$3141', b'Jello, world!$3141'),
        (RECURSION, b'The secret message is "Jello, world!"'),
        (RECURSION_2,
         b'\'The secret message is "Jello, world!"\', said the spy.'),
    )

    def setUp(self):
        super().setUp()
        db = database.load_token_database(
            io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
        db.add(
            tokens.TokenizedStringEntry(tokens.default_hash(s), s)
            for s in [self.RECURSION_STRING, self.RECURSION_STRING_2])
        self.detok = detokenize.Detokenizer(db)

    def test_detokenize_base64_live(self):
        for data, expected in self.TEST_CASES:
            output = io.BytesIO()
            detokenize.detokenize_base64_live(self.detok, io.BytesIO(data),
                                              output, '$')

            self.assertEqual(expected, output.getvalue())

    def test_detokenize_base64_to_file(self):
        for data, expected in self.TEST_CASES:
            output = io.BytesIO()
            detokenize.detokenize_base64_to_file(self.detok, data, output, '$')

            self.assertEqual(expected, output.getvalue())

    def test_detokenize_base64(self):
        for data, expected in self.TEST_CASES:
            self.assertEqual(
                expected, detokenize.detokenize_base64(self.detok, data, b'$'))


class DetokenizeBase64InfiniteRecursion(unittest.TestCase):
    """Tests that infinite Base64 token recursion resolves."""

    def setUp(self):
        self.detok = detokenize.Detokenizer(
            tokens.Database([
                tokens.TokenizedStringEntry(0, '$AAAAAA=='),  # token for 0
                tokens.TokenizedStringEntry(1, '$AgAAAA=='),  # token for 2
                tokens.TokenizedStringEntry(2, '$AwAAAA=='),  # token for 3
                tokens.TokenizedStringEntry(3, '$AgAAAA=='),  # token for 2
            ]))
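
    # Note the cycle: token 1 maps to '$AgAAAA==' (token 2), token 2 maps to
    # '$AwAAAA==' (token 3), and token 3 maps back to token 2, so repeated
    # detokenization alternates between tokens 2 and 3 indefinitely.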

    def test_detokenize_self_recursion(self):
        for depth in range(5):
            self.assertEqual(
                detokenize.detokenize_base64(self.detok,
                                             b'This one is deep: $AAAAAA==',
                                             recursion=depth),
                b'This one is deep: $AAAAAA==')

    def test_detokenize_self_recursion_default(self):
        self.assertEqual(
            detokenize.detokenize_base64(self.detok,
                                         b'This one is deep: $AAAAAA=='),
            b'This one is deep: $AAAAAA==')

    def test_detokenize_cyclic_recursion_even(self):
        self.assertEqual(
            detokenize.detokenize_base64(self.detok,
                                         b'I said "$AQAAAA=="',
                                         recursion=2), b'I said "$AgAAAA=="')

    def test_detokenize_cyclic_recursion_odd(self):
        self.assertEqual(
            detokenize.detokenize_base64(self.detok,
                                         b'I said "$AQAAAA=="',
                                         recursion=3), b'I said "$AwAAAA=="')


if __name__ == '__main__':
    unittest.main()