# Copyright 2020 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
15 r"""Decodes and detokenizes strings from binary or Base64 input.
17 The main class provided by this module is the Detokenize class. To use it,
18 construct it with the path to an ELF or CSV database, a tokens.Database,
19 or a file object for an ELF file or CSV. Then, call the detokenize method with
20 encoded messages, one at a time. The detokenize method returns a
21 DetokenizedString object with the result.
25 from pw_tokenizer import detokenize
27 detok = detokenize.Detokenizer('path/to/my/image.elf')
28 print(detok.detokenize(b'\x12\x34\x56\x78\x03hi!'))
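
The detokenize_base64 function decodes and replaces prefixed Base64 messages
embedded in plain text. The payload below is simply the Base64 form of the
binary message above, shown for illustration:

  print(detokenize.detokenize_base64(detok, b'Log: $EjRWeANoaSE='))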

This module also provides a command line interface for decoding and
detokenizing messages from a file or stdin.
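
For example, an invocation along these lines (the flags and paths shown here
are illustrative) detokenizes Base64-prefixed messages from a log file using
a CSV token database:

  python detokenize.py base64 my_tokens.csv -i device_log.txt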
"""

import argparse
import base64
import binascii
from datetime import datetime
import io
import logging
import os
from pathlib import Path
import re
import string
import struct
import sys
import time
from typing import (BinaryIO, Callable, Dict, List, Iterable, Iterator, Match,
                    NamedTuple, Optional, Pattern, Tuple, Union)

try:
    from pw_tokenizer import database, decode, tokens
except ImportError:
    # Append this path to the module search path to allow running this module
    # without installing the pw_tokenizer package.
    sys.path.append(os.path.dirname(os.path.dirname(
        os.path.abspath(__file__))))
    from pw_tokenizer import database, decode, tokens

ENCODED_TOKEN = struct.Struct('<I')
_LOG = logging.getLogger('pw_tokenizer')


class DetokenizedString:
    """A detokenized string, with all results if there are collisions."""
    def __init__(self,
                 token: Optional[int],
                 format_string_entries: Iterable[tuple],
                 encoded_message: bytes,
                 show_errors: bool = False):
        self.token = token
        self.encoded_message = encoded_message
        self._show_errors = show_errors

        self.successes: List[decode.FormattedString] = []
        self.failures: List[decode.FormattedString] = []

        decode_attempts: List[Tuple[Tuple, decode.FormattedString]] = []

        for entry, fmt in format_string_entries:
            result = fmt.format(encoded_message[ENCODED_TOKEN.size:],
                                show_errors)

            # Sort competing entries so the most likely matches appear first.
            # Decoded strings are prioritized by whether they
            #
            #   1. decoded all bytes for all arguments without errors,
            #   2. decoded all data,
            #   3. have the fewest decoding errors,
            #   4. decoded the most arguments successfully, or
            #   5. have the most recent removal date, if they were removed.
            #
            # This must match the collision resolution logic in detokenize.cc.
            score: Tuple = (
                all(arg.ok() for arg in result.args) and not result.remaining,
                not result.remaining,  # decoded all data
                -sum(not arg.ok() for arg in result.args),  # fewest errors
                len(result.args),  # decoded the most arguments
                entry.date_removed or datetime.max)  # most recently present

            decode_attempts.append((score, result))

        # Sort the attempts by the score so the most likely results are first.
        decode_attempts.sort(key=lambda value: value[0], reverse=True)

        # Split out the successful decodes from the failures.
        for score, result in decode_attempts:
            if score[0]:
                self.successes.append(result)
            else:
                self.failures.append(result)

    def ok(self) -> bool:
        """True if exactly one string decoded the arguments successfully."""
        return len(self.successes) == 1

    def matches(self) -> List[decode.FormattedString]:
        """Returns the strings that matched the token, best matches first."""
        return self.successes + self.failures

    def best_result(self) -> Optional[decode.FormattedString]:
        """Returns the string and args for the most likely decoded string."""
        for string_and_args in self.matches():
            return string_and_args

        return None

    def error_message(self) -> str:
        """If detokenization failed, returns a descriptive message."""
        if self.ok():
            return ''

        if not self.matches():
            if self.token is None:
                return 'missing token'

            return 'unknown token {:08x}'.format(self.token)

        if len(self.matches()) == 1:
            return 'decoding failed for {!r}'.format(self.matches()[0].value)

        return '{} matches'.format(len(self.matches()))

    def __str__(self) -> str:
        """Returns the string for the most likely result."""
        result = self.best_result()
        if result:
            return str(result)

        if self._show_errors:
            return '<[ERROR: {}|{!r}]>'.format(self.error_message(),
                                               self.encoded_message)
        return ''

    def __repr__(self) -> str:
        if self.ok():
            message = repr(str(self))
        else:
            message = 'ERROR: {}|{!r}'.format(self.error_message(),
                                              self.encoded_message)

        return '{}({})'.format(type(self).__name__, message)
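

# A brief sketch of how a DetokenizedString is typically inspected; detok and
# the message are assumed to come from the module-level example above.
#
#   result = detok.detokenize(b'\x12\x34\x56\x78\x03hi!')
#   if result.ok():
#       print(str(result))  # the single successful decoding
#   else:
#       print(result.error_message(), result.matches())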


class _TokenizedFormatString(NamedTuple):
    entry: tokens.TokenizedStringEntry
    format: decode.FormatString
170 """Main detokenization class; detokenizes strings and caches results."""
171 def __init__(self, *token_database_or_elf, show_errors: bool = False):
172 """Decodes and detokenizes binary messages.
175 *token_database_or_elf: a path or file object for an ELF or CSV
176 database, a tokens.Database, or an elf_reader.Elf
177 show_errors: if True, an error message is used in place of the %
178 conversion specifier when an argument fails to decode
180 self.database = database.load_token_database(*token_database_or_elf)
181 self.show_errors = show_errors
183 # Cache FormatStrings for faster lookup & formatting.
184 self._cache: Dict[int, List[_TokenizedFormatString]] = {}

    def lookup(self, token: int) -> List[_TokenizedFormatString]:
        """Returns (TokenizedStringEntry, FormatString) list for matches."""
        try:
            return self._cache[token]
        except KeyError:
            format_strings = [
                _TokenizedFormatString(entry, decode.FormatString(str(entry)))
                for entry in self.database.token_to_entries[token]
            ]
            self._cache[token] = format_strings
            return format_strings

    def detokenize(self, encoded_message: bytes) -> DetokenizedString:
        """Decodes and detokenizes a message as a DetokenizedString."""
        if len(encoded_message) < ENCODED_TOKEN.size:
            return DetokenizedString(None, (), encoded_message,
                                     self.show_errors)

        token, = ENCODED_TOKEN.unpack_from(encoded_message)
        return DetokenizedString(token, self.lookup(token), encoded_message,
                                 self.show_errors)


class AutoUpdatingDetokenizer:
    """Loads and updates a detokenizer from database paths."""
    class _DatabasePath:
        """Tracks the modified time of a path or file object."""
        def __init__(self, path):
            self.path = path if isinstance(path, (str, Path)) else path.name
            self._modified_time: Optional[float] = self._last_modified_time()

        def updated(self) -> bool:
            """True if the path has been updated since the last call."""
            modified_time = self._last_modified_time()
            if modified_time is None or modified_time == self._modified_time:
                return False

            self._modified_time = modified_time
            return True

        def _last_modified_time(self) -> Optional[float]:
            try:
                return os.path.getmtime(self.path)
            except FileNotFoundError:
                return None

        def load(self) -> tokens.Database:
            try:
                return database.load_token_database(self.path)
            except FileNotFoundError:
                return database.load_token_database()

    def __init__(self,
                 *paths_or_files,
                 min_poll_period_s: float = 1.0) -> None:
        self.paths = tuple(self._DatabasePath(path) for path in paths_or_files)
        self.min_poll_period_s = min_poll_period_s
        self._last_checked_time: float = time.time()
        self._detokenizer = Detokenizer(*(path.load() for path in self.paths))

    def detokenize(self, data: bytes) -> DetokenizedString:
        """Updates the token database if it has changed, then detokenizes."""
        if time.time() - self._last_checked_time >= self.min_poll_period_s:
            self._last_checked_time = time.time()

            if any(path.updated() for path in self.paths):
                _LOG.info('Changes detected; reloading token database')
                self._detokenizer = Detokenizer(*(path.load()
                                                  for path in self.paths))

        return self._detokenizer.detokenize(data)
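

# A minimal usage sketch (the database path is illustrative). An
# AutoUpdatingDetokenizer rereads its databases when the files change on
# disk, so long-running tools pick up new tokens without restarting.
#
#   detok = AutoUpdatingDetokenizer('out/tokens.csv', min_poll_period_s=5.0)
#   detok.detokenize(b'\x12\x34\x56\x78')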


_Detokenizer = Union[Detokenizer, AutoUpdatingDetokenizer]


class PrefixedMessageDecoder:
    """Parses messages that start with a prefix character from a byte stream."""
    def __init__(self, prefix: Union[str, bytes], chars: Union[str, bytes]):
        """Parses prefixed messages.

        Args:
          prefix: one character that signifies the start of a message
          chars: characters allowed in a message
        """
        self._prefix = prefix.encode() if isinstance(prefix, str) else prefix

        if isinstance(chars, str):
            chars = chars.encode()

        # Store the valid message bytes as a set of binary strings.
        self._message_bytes = frozenset(chars[i:i + 1]
                                        for i in range(len(chars)))

        if len(self._prefix) != 1 or self._prefix in self._message_bytes:
            raise ValueError(
                'Invalid prefix {!r}: the prefix must be a single '
                'character that is not a valid message character.'.format(
                    prefix))

        self.data = bytearray()

    def _read_next(self, fd: BinaryIO) -> Tuple[bytes, int]:
        """Returns the next character and its index."""
        char = fd.read(1)
        index = len(self.data)
        self.data += char
        return char, index

    def read_messages(self,
                      binary_fd: BinaryIO) -> Iterator[Tuple[bool, bytes]]:
        """Parses prefixed messages; yields (is_message, contents) chunks."""
        message_start = None

        while True:
            # This reads the file character-by-character. Non-message characters
            # are yielded right away; message characters are grouped.
            char, index = self._read_next(binary_fd)

            # If in a message, keep reading until the message completes.
            if message_start is not None:
                if char in self._message_bytes:
                    continue

                yield True, self.data[message_start:index]
                message_start = None

            # Handle a non-message character.
            if not char:
                return

            if char == self._prefix:
                message_start = index
            else:
                yield False, char

    def transform(self, binary_fd: BinaryIO,
                  transform: Callable[[bytes], bytes]) -> Iterator[bytes]:
        """Yields the file with a transformation applied to the messages."""
        for is_message, chunk in self.read_messages(binary_fd):
            yield transform(chunk) if is_message else chunk
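

# An illustrative sketch of how the decoder splits a stream into chunks (the
# sample log line is hypothetical): characters following the prefix are
# grouped into a message; everything else is passed through byte by byte.
#
#   chars = string.ascii_letters + string.digits + '+/-_='
#   decoder = PrefixedMessageDecoder('$', chars)
#   for is_message, chunk in decoder.read_messages(
#           io.BytesIO(b'Hi $EjRWeANoaSE=\n')):
#       ...  # chunk is b'$EjRWeANoaSE=' when is_message is True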


def _detokenize_prefixed_base64(
        detokenizer: _Detokenizer, prefix: bytes,
        recursion: int) -> Callable[[Match[bytes]], bytes]:
    """Returns a function that decodes prefixed Base64 with the detokenizer."""
    def decode_and_detokenize(match: Match[bytes]) -> bytes:
        """Decodes prefixed base64 with the provided detokenizer."""
        original = match.group(0)

        try:
            detokenized_string = detokenizer.detokenize(
                base64.b64decode(original[1:], validate=True))
            if detokenized_string.matches():
                result = str(detokenized_string).encode()

                if recursion > 0 and original != result:
                    result = detokenize_base64(detokenizer, result, prefix,
                                               recursion - 1)

                return result
        except binascii.Error:
            pass

        return original

    return decode_and_detokenize


BASE64_PREFIX = b'$'
DEFAULT_RECURSION = 9


def _base64_message_regex(prefix: bytes) -> Pattern[bytes]:
    """Returns a regular expression for prefixed base64 tokenized strings."""
    return re.compile(
        # Base64 tokenized strings start with the prefix character ($).
        re.escape(prefix) + (
            # Tokenized strings contain 0 or more blocks of four Base64 chars.
            br'(?:[A-Za-z0-9+/\-_]{4})*'
            # The last block of 4 chars may have one or two padding chars (=).
            br'(?:[A-Za-z0-9+/\-_]{3}=|[A-Za-z0-9+/\-_]{2}==)?'))
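
# As an illustration, the pattern above matches strings such as
# b'$EjRWeANoaSE=' and also a bare prefix b'$', since zero Base64 blocks are
# permitted.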


def detokenize_base64_live(detokenizer: _Detokenizer,
                           input_file: BinaryIO,
                           output: BinaryIO,
                           prefix: Union[str, bytes] = BASE64_PREFIX,
                           recursion: int = DEFAULT_RECURSION) -> None:
    """Reads chars one-at-a-time and decodes messages; SLOW for big files."""
    prefix_bytes = prefix.encode() if isinstance(prefix, str) else prefix

    base64_message = _base64_message_regex(prefix_bytes)

    def transform(data: bytes) -> bytes:
        return base64_message.sub(
            _detokenize_prefixed_base64(detokenizer, prefix_bytes, recursion),
            data)

    for message in PrefixedMessageDecoder(
            prefix, string.ascii_letters + string.digits + '+/-_=').transform(
                input_file, transform):
        output.write(message)

        # Flush each line to prevent delays when piping between processes.
        if b'\n' in message:
            output.flush()


def detokenize_base64_to_file(detokenizer: _Detokenizer,
                              data: bytes,
                              output: BinaryIO,
                              prefix: Union[str, bytes] = BASE64_PREFIX,
                              recursion: int = DEFAULT_RECURSION) -> None:
    """Decodes prefixed Base64 messages in data; decodes to an output file."""
    prefix = prefix.encode() if isinstance(prefix, str) else prefix
    output.write(
        _base64_message_regex(prefix).sub(
            _detokenize_prefixed_base64(detokenizer, prefix, recursion), data))


def detokenize_base64(detokenizer: _Detokenizer,
                      data: bytes,
                      prefix: Union[str, bytes] = BASE64_PREFIX,
                      recursion: int = DEFAULT_RECURSION) -> bytes:
    """Decodes and replaces prefixed Base64 messages in the provided data.

    Args:
      detokenizer: the detokenizer with which to decode messages
      data: the binary data to decode
      prefix: one-character byte string that signals the start of a message
      recursion: how many levels to recursively decode

    Returns:
      copy of the data with all recognized tokens decoded
    """
    output = io.BytesIO()
    detokenize_base64_to_file(detokenizer, data, output, prefix, recursion)
    return output.getvalue()


def _follow_and_detokenize_file(detokenizer: _Detokenizer,
                                file: BinaryIO,
                                output: BinaryIO,
                                prefix: Union[str, bytes],
                                poll_period_s: float = 0.01) -> None:
    """Polls a file to detokenize it and any appended data."""
    try:
        while True:
            data = file.read()
            if data:
                detokenize_base64_to_file(detokenizer, data, output, prefix)
                output.flush()
            else:
                time.sleep(poll_period_s)
    except KeyboardInterrupt:
        pass


def _handle_base64(databases, input_file: BinaryIO, output: BinaryIO,
                   prefix: str, show_errors: bool, follow: bool) -> None:
    """Handles the base64 command line option."""
    # argparse.FileType doesn't correctly handle - for binary files.
    if input_file is sys.stdin:
        input_file = sys.stdin.buffer

    if output is sys.stdout:
        output = sys.stdout.buffer

    detokenizer = Detokenizer(tokens.Database.merged(*databases),
                              show_errors=show_errors)

    if follow:
        _follow_and_detokenize_file(detokenizer, input_file, output, prefix)
    elif input_file.seekable():
        # Process seekable files all at once, which is MUCH faster.
        detokenize_base64_to_file(detokenizer, input_file.read(), output,
                                  prefix)
    else:
        # For non-seekable inputs (e.g. pipes), read one character at a time.
        detokenize_base64_live(detokenizer, input_file, output, prefix)


def _parse_args() -> argparse.Namespace:
    """Parses and returns command line arguments."""

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.set_defaults(handler=lambda **_: parser.print_help())

    subparsers = parser.add_subparsers(help='Encoding of the input.')

    base64_help = 'Detokenize Base64-encoded data from a file or stdin.'
    subparser = subparsers.add_parser(
        'base64',
        description=base64_help,
        parents=[database.token_databases_parser()],
        help=base64_help)
    subparser.set_defaults(handler=_handle_base64)
    subparser.add_argument(
        '-i',
        '--input',
        dest='input_file',
        type=argparse.FileType('rb'),
        default=sys.stdin.buffer,
        help='The file from which to read; provide - or omit for stdin.')
    subparser.add_argument(
        '-f',
        '--follow',
        action='store_true',
        help=('Detokenize data appended to input_file as it grows; similar to '
              'tail -f.'))
    subparser.add_argument('-o',
                           '--output',
                           type=argparse.FileType('wb'),
                           default=sys.stdout.buffer,
                           help=('The file to which to write the output; '
                                 'provide - or omit for stdout.'))
    subparser.add_argument(
        '-p',
        '--prefix',
        default=BASE64_PREFIX,
        help=('The one-character prefix that signals the start of a '
              'Base64-encoded message. (default: $)'))
    subparser.add_argument(
        '-s',
        '--show-errors',
        action='store_true',
        help=('Show error messages instead of conversion specifiers when '
              'arguments cannot be decoded.'))

    return parser.parse_args()


def main() -> int:
    args = _parse_args()

    handler = args.handler
    del args.handler

    handler(**vars(args))
    return 0


if __name__ == '__main__':
    if sys.version_info[0] < 3:
        sys.exit('ERROR: The detokenizer command line tools require Python 3.')

    sys.exit(main())