Fix for x86_64 build fail
[platform/upstream/connectedhomeip.git] / third_party / pigweed / repo / pw_tokenizer / py / pw_tokenizer / database.py
1 #!/usr/bin/env python3
2 # Copyright 2020 The Pigweed Authors
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License"); you may not
5 # use this file except in compliance with the License. You may obtain a copy of
6 # the License at
7 #
8 #     https://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # License for the specific language governing permissions and limitations under
14 # the License.
15 """Creates and manages token databases.
16
17 This module manages reading tokenized strings from ELF files and building and
18 maintaining token databases.
19 """
20
21 import argparse
22 from datetime import datetime
23 import glob
24 import json
25 import logging
26 import os
27 from pathlib import Path
28 import re
29 import struct
30 import sys
31 from typing import (Any, Callable, Dict, Iterable, Iterator, List, Pattern,
32                     Set, TextIO, Tuple, Union)
33
34 try:
35     from pw_tokenizer import elf_reader, tokens
36 except ImportError:
37     # Append this path to the module search path to allow running this module
38     # without installing the pw_tokenizer package.
39     sys.path.append(os.path.dirname(os.path.dirname(
40         os.path.abspath(__file__))))
41     from pw_tokenizer import elf_reader, tokens
42
43 _LOG = logging.getLogger('pw_tokenizer')
44
45
46 def _elf_reader(elf) -> elf_reader.Elf:
47     return elf if isinstance(elf, elf_reader.Elf) else elf_reader.Elf(elf)
48
49
50 # Magic number used to indicate the beginning of a tokenized string entry. This
51 # value MUST match the value of _PW_TOKENIZER_ENTRY_MAGIC in
52 # pw_tokenizer/public/pw_tokenizer/internal/tokenize_string.h.
53 _TOKENIZED_ENTRY_MAGIC = 0xBAA98DEE
54 _ENTRY = struct.Struct('<4I')
55 _TOKENIZED_ENTRY_SECTIONS = re.compile(
56     r'^\.pw_tokenizer.entries(?:\.[_\d]+)?$')
57
58 _LEGACY_STRING_SECTIONS = re.compile(
59     r'^\.pw_tokenized\.(?P<domain>[^.]+)(?:\.\d+)?$')
60
61 _ERROR_HANDLER = 'surrogateescape'  # How to deal with UTF-8 decoding errors
62
63
64 class Error(Exception):
65     """Failed to extract token entries from an ELF file."""
66
67
68 def _read_tokenized_entries(
69         data: bytes,
70         domain: Pattern[str]) -> Iterator[tokens.TokenizedStringEntry]:
71     index = 0
72
73     while index + _ENTRY.size <= len(data):
74         magic, token, domain_len, string_len = _ENTRY.unpack_from(data, index)
75
76         if magic != _TOKENIZED_ENTRY_MAGIC:
77             raise Error(
78                 f'Expected magic number 0x{_TOKENIZED_ENTRY_MAGIC:08x}, '
79                 f'found 0x{magic:08x}')
80
81         start = index + _ENTRY.size
82         index = start + domain_len + string_len
83
84         # Create the entries, trimming null terminators.
85         entry = tokens.TokenizedStringEntry(
86             token,
87             data[start + domain_len:index - 1].decode(errors=_ERROR_HANDLER),
88             data[start:start + domain_len - 1].decode(errors=_ERROR_HANDLER),
89         )
90
91         if data[start + domain_len - 1] != 0:
92             raise Error(
93                 f'Domain {entry.domain} for {entry.string} not null terminated'
94             )
95
96         if data[index - 1] != 0:
97             raise Error(f'String {entry.string} is not null terminated')
98
99         if domain.fullmatch(entry.domain):
100             yield entry
101
102
103 def _read_tokenized_strings(sections: Dict[str, bytes],
104                             domain: Pattern[str]) -> Iterator[tokens.Database]:
105     # Legacy ELF files used "default" as the default domain instead of "". Remap
106     # the default if necessary.
107     if domain.pattern == tokens.DEFAULT_DOMAIN:
108         domain = re.compile('default')
109
110     for section, data in sections.items():
111         match = _LEGACY_STRING_SECTIONS.match(section)
112         if match and domain.match(match.group('domain')):
113             yield tokens.Database.from_strings(
114                 (s.decode(errors=_ERROR_HANDLER) for s in data.split(b'\0')),
115                 match.group('domain'))
116
117
118 def _database_from_elf(elf, domain: Pattern[str]) -> tokens.Database:
119     """Reads the tokenized strings from an elf_reader.Elf or ELF file object."""
120     _LOG.debug('Reading tokenized strings in domain "%s" from %s', domain, elf)
121
122     reader = _elf_reader(elf)
123
124     # Read tokenized string entries.
125     section_data = reader.dump_section_contents(_TOKENIZED_ENTRY_SECTIONS)
126     if section_data is not None:
127         return tokens.Database(_read_tokenized_entries(section_data, domain))
128
129     # Read legacy null-terminated string entries.
130     sections = reader.dump_sections(_LEGACY_STRING_SECTIONS)
131     if sections:
132         return tokens.Database.merged(
133             *_read_tokenized_strings(sections, domain))
134
135     return tokens.Database([])
136
137
138 def tokenization_domains(elf) -> Iterator[str]:
139     """Lists all tokenization domains in an ELF file."""
140     reader = _elf_reader(elf)
141     section_data = reader.dump_section_contents(_TOKENIZED_ENTRY_SECTIONS)
142     if section_data is not None:
143         yield from frozenset(
144             e.domain
145             for e in _read_tokenized_entries(section_data, re.compile('.*')))
146     else:  # Check for the legacy domain sections
147         for section in reader.sections:
148             match = _LEGACY_STRING_SECTIONS.match(section.name)
149             if match:
150                 yield match.group('domain')
151
152
153 def read_tokenizer_metadata(elf) -> Dict[str, int]:
154     """Reads the metadata entries from an ELF."""
155     sections = _elf_reader(elf).dump_section_contents(r'\.pw_tokenizer\.info')
156
157     metadata: Dict[str, int] = {}
158     if sections is not None:
159         for key, value in struct.iter_unpack('12sI', sections):
160             try:
161                 metadata[key.rstrip(b'\0').decode()] = value
162             except UnicodeDecodeError as err:
163                 _LOG.error('Failed to decode metadata key %r: %s',
164                            key.rstrip(b'\0'), err)
165
166     return metadata
167
168
169 def _load_token_database(db, domain: Pattern[str]) -> tokens.Database:
170     """Loads a Database from a database object, ELF, CSV, or binary database."""
171     if db is None:
172         return tokens.Database()
173
174     if isinstance(db, tokens.Database):
175         return db
176
177     if isinstance(db, elf_reader.Elf):
178         return _database_from_elf(db, domain)
179
180     # If it's a str, it might be a path. Check if it's an ELF or CSV.
181     if isinstance(db, (str, Path)):
182         if not os.path.exists(db):
183             raise FileNotFoundError(
184                 f'"{db}" is not a path to a token database')
185
186         # Read the path as an ELF file.
187         with open(db, 'rb') as fd:
188             if elf_reader.compatible_file(fd):
189                 return _database_from_elf(fd, domain)
190
191         # Read the path as a packed binary or CSV file.
192         return tokens.DatabaseFile(db)
193
194     # Assume that it's a file object and check if it's an ELF.
195     if elf_reader.compatible_file(db):
196         return _database_from_elf(db, domain)
197
198     # Read the database as CSV or packed binary from a file object's path.
199     if hasattr(db, 'name') and os.path.exists(db.name):
200         return tokens.DatabaseFile(db.name)
201
202     # Read CSV directly from the file object.
203     return tokens.Database(tokens.parse_csv(db))
204
205
206 def load_token_database(
207     *databases,
208     domain: Union[str,
209                   Pattern[str]] = tokens.DEFAULT_DOMAIN) -> tokens.Database:
210     """Loads a Database from database objects, ELFs, CSVs, or binary files."""
211     domain = re.compile(domain)
212     return tokens.Database.merged(*(_load_token_database(db, domain)
213                                     for db in databases))
214
215
216 def database_summary(db: tokens.Database) -> Dict[str, Any]:
217     """Returns a simple report of properties of the database."""
218     present = [entry for entry in db.entries() if not entry.date_removed]
219     collisions = {
220         token: list(e.string for e in entries)
221         for token, entries in db.collisions()
222     }
223
224     # Add 1 to each string's size to account for the null terminator.
225     return dict(
226         present_entries=len(present),
227         present_size_bytes=sum(len(entry.string) + 1 for entry in present),
228         total_entries=len(db.entries()),
229         total_size_bytes=sum(len(entry.string) + 1 for entry in db.entries()),
230         collisions=collisions,
231     )
232
233
234 _DatabaseReport = Dict[str, Dict[str, Dict[str, Any]]]
235
236
237 def generate_reports(paths: Iterable[Path]) -> _DatabaseReport:
238     """Returns a dictionary with information about the provided databases."""
239     reports: _DatabaseReport = {}
240
241     for path in paths:
242         with path.open('rb') as file:
243             if elf_reader.compatible_file(file):
244                 domains = list(tokenization_domains(file))
245             else:
246                 domains = ['']
247
248         domain_reports = {}
249
250         for domain in domains:
251             domain_reports[domain] = database_summary(
252                 load_token_database(path, domain=domain))
253
254         reports[str(path)] = domain_reports
255
256     return reports
257
258
259 def _handle_create(databases, database, force, output_type, include, exclude,
260                    replace):
261     """Creates a token database file from one or more ELF files."""
262
263     if database == '-':
264         # Must write bytes to stdout; use sys.stdout.buffer.
265         fd = sys.stdout.buffer
266     elif not force and os.path.exists(database):
267         raise FileExistsError(
268             f'The file {database} already exists! Use --force to overwrite.')
269     else:
270         fd = open(database, 'wb')
271
272     database = tokens.Database.merged(*databases)
273     database.filter(include, exclude, replace)
274
275     with fd:
276         if output_type == 'csv':
277             tokens.write_csv(database, fd)
278         elif output_type == 'binary':
279             tokens.write_binary(database, fd)
280         else:
281             raise ValueError(f'Unknown database type "{output_type}"')
282
283     _LOG.info('Wrote database with %d entries to %s as %s', len(database),
284               fd.name, output_type)
285
286
287 def _handle_add(token_database, databases):
288     initial = len(token_database)
289
290     for source in databases:
291         token_database.add(source.entries())
292
293     token_database.write_to_file()
294
295     _LOG.info('Added %d entries to %s',
296               len(token_database) - initial, token_database.path)
297
298
299 def _handle_mark_removals(token_database, databases, date):
300     marked_removed = token_database.mark_removals(
301         (entry for entry in tokens.Database.merged(*databases).entries()
302          if not entry.date_removed), date)
303
304     token_database.write_to_file()
305
306     _LOG.info('Marked %d of %d entries as removed in %s', len(marked_removed),
307               len(token_database), token_database.path)
308
309
310 def _handle_purge(token_database, before):
311     purged = token_database.purge(before)
312     token_database.write_to_file()
313
314     _LOG.info('Removed %d entries from %s', len(purged), token_database.path)
315
316
317 def _handle_report(token_database_or_elf: List[Path], output: TextIO) -> None:
318     json.dump(generate_reports(token_database_or_elf), output, indent=2)
319     output.write('\n')
320
321
322 def expand_paths_or_globs(*paths_or_globs: str) -> Iterable[Path]:
323     """Expands any globs in a list of paths; raises FileNotFoundError."""
324     for path_or_glob in paths_or_globs:
325         if os.path.exists(path_or_glob):
326             # This is a valid path; yield it without evaluating it as a glob.
327             yield Path(path_or_glob)
328         else:
329             paths = glob.glob(path_or_glob, recursive=True)
330
331             # If no paths were found and the path is not a glob, raise an Error.
332             if not paths and not any(c in path_or_glob for c in '*?[]!'):
333                 raise FileNotFoundError(f'{path_or_glob} is not a valid path')
334
335             for path in paths:
336                 # Resolve globs to CSV or compatible binary files.
337                 if elf_reader.compatible_file(path) or path.endswith('.csv'):
338                     yield Path(path)
339
340
341 class ExpandGlobs(argparse.Action):
342     """Argparse action that expands and appends paths."""
343     def __call__(self, parser, namespace, values, unused_option_string=None):
344         setattr(namespace, self.dest, list(expand_paths_or_globs(*values)))
345
346
347 def _read_elf_with_domain(elf: str,
348                           domain: Pattern[str]) -> Iterable[tokens.Database]:
349     for path in expand_paths_or_globs(elf):
350         with path.open('rb') as file:
351             if not elf_reader.compatible_file(file):
352                 raise ValueError(f'{elf} is not an ELF file, '
353                                  f'but the "{domain}" domain was specified')
354
355             yield _database_from_elf(file, domain)
356
357
358 class LoadTokenDatabases(argparse.Action):
359     """Argparse action that reads tokenize databases from paths or globs.
360
361     ELF files may have #domain appended to them to specify a tokenization domain
362     other than the default.
363     """
364     def __call__(self, parser, namespace, values, option_string=None):
365         databases: List[tokens.Database] = []
366         paths: Set[Path] = set()
367
368         try:
369             for value in values:
370                 if value.count('#') == 1:
371                     path, domain = value.split('#')
372                     domain = re.compile(domain)
373                     databases.extend(_read_elf_with_domain(path, domain))
374                 else:
375                     paths.update(expand_paths_or_globs(value))
376
377             for path in paths:
378                 databases.append(load_token_database(path))
379         except tokens.DatabaseFormatError as err:
380             parser.error(
381                 f'argument elf_or_token_database: {path} is not a supported '
382                 'token database file. Only ELF files or token databases (CSV '
383                 f'or binary format) are supported. {err}. ')
384         except FileNotFoundError as err:
385             parser.error(f'argument elf_or_token_database: {err}')
386         except:  # pylint: disable=bare-except
387             _LOG.exception('Failed to load token database %s', path)
388             parser.error('argument elf_or_token_database: '
389                          f'Error occurred while loading token database {path}')
390
391         setattr(namespace, self.dest, databases)
392
393
394 def token_databases_parser(nargs: str = '+') -> argparse.ArgumentParser:
395     """Returns an argument parser for reading token databases.
396
397     These arguments can be added to another parser using the parents arg.
398     """
399     parser = argparse.ArgumentParser(add_help=False)
400     parser.add_argument(
401         'databases',
402         metavar='elf_or_token_database',
403         nargs=nargs,
404         action=LoadTokenDatabases,
405         help=('ELF or token database files from which to read strings and '
406               'tokens. For ELF files, the tokenization domain to read from '
407               'may specified after the path as #domain_name (e.g. '
408               'foo.elf#TEST_DOMAIN). Unless specified, only the default '
409               'domain ("") is read from ELF files; .* reads all domains. '
410               'Globs are expanded to compatible database files.'))
411     return parser
412
413
414 def _parse_args():
415     """Parse and return command line arguments."""
416     def year_month_day(value) -> datetime:
417         if value == 'today':
418             return datetime.now()
419
420         return datetime.strptime(value, tokens.DATE_FORMAT)
421
422     year_month_day.__name__ = 'year-month-day (YYYY-MM-DD)'
423
424     # Shared command line options.
425     option_db = argparse.ArgumentParser(add_help=False)
426     option_db.add_argument('-d',
427                            '--database',
428                            dest='token_database',
429                            type=tokens.DatabaseFile,
430                            required=True,
431                            help='The database file to update.')
432
433     option_tokens = token_databases_parser('*')
434
435     # Top-level argument parser.
436     parser = argparse.ArgumentParser(
437         description=__doc__,
438         formatter_class=argparse.RawDescriptionHelpFormatter)
439     parser.set_defaults(handler=lambda **_: parser.print_help())
440
441     subparsers = parser.add_subparsers(
442         help='Tokenized string database management actions:')
443
444     # The 'create' command creates a database file.
445     subparser = subparsers.add_parser(
446         'create',
447         parents=[option_tokens],
448         help=
449         'Creates a database with tokenized strings from one or more sources.')
450     subparser.set_defaults(handler=_handle_create)
451     subparser.add_argument(
452         '-d',
453         '--database',
454         required=True,
455         help='Path to the database file to create; use - for stdout.')
456     subparser.add_argument(
457         '-t',
458         '--type',
459         dest='output_type',
460         choices=('csv', 'binary'),
461         default='csv',
462         help='Which type of database to create. (default: csv)')
463     subparser.add_argument('-f',
464                            '--force',
465                            action='store_true',
466                            help='Overwrite the database if it exists.')
467     subparser.add_argument(
468         '-i',
469         '--include',
470         type=re.compile,
471         default=[],
472         action='append',
473         help=('If provided, at least one of these regular expressions must '
474               'match for a string to be included in the database.'))
475     subparser.add_argument(
476         '-e',
477         '--exclude',
478         type=re.compile,
479         default=[],
480         action='append',
481         help=('If provided, none of these regular expressions may match for a '
482               'string to be included in the database.'))
483
484     unescaped_slash = re.compile(r'(?<!\\)/')
485
486     def replacement(value: str) -> Tuple[Pattern, 'str']:
487         try:
488             find, sub = unescaped_slash.split(value, 1)
489         except ValueError as err:
490             raise argparse.ArgumentTypeError(
491                 'replacements must be specified as "search_regex/replacement"')
492
493         try:
494             return re.compile(find.replace(r'\/', '/')), sub
495         except re.error as err:
496             raise argparse.ArgumentTypeError(
497                 f'"{value}" is not a valid regular expression: {err}')
498
499     subparser.add_argument(
500         '--replace',
501         type=replacement,
502         default=[],
503         action='append',
504         help=('If provided, replaces text that matches a regular expression. '
505               'This can be used to replace sensitive terms in a token '
506               'database that will be distributed publicly. The expression and '
507               'replacement are specified as "search_regex/replacement". '
508               'Plain slash characters in the regex must be escaped with a '
509               r'backslash (\/). The replacement text may include '
510               'backreferences for captured groups in the regex.'))
511
512     # The 'add' command adds strings to a database from a set of ELFs.
513     subparser = subparsers.add_parser(
514         'add',
515         parents=[option_db, option_tokens],
516         help=(
517             'Adds new strings to a database with tokenized strings from a set '
518             'of ELF files or other token databases. Missing entries are NOT '
519             'marked as removed.'))
520     subparser.set_defaults(handler=_handle_add)
521
522     # The 'mark_removals' command marks removed entries to match a set of ELFs.
523     subparser = subparsers.add_parser(
524         'mark_removals',
525         parents=[option_db, option_tokens],
526         help=(
527             'Updates a database with tokenized strings from a set of strings. '
528             'Strings not present in the set remain in the database but are '
529             'marked as removed. New strings are NOT added.'))
530     subparser.set_defaults(handler=_handle_mark_removals)
531     subparser.add_argument(
532         '--date',
533         type=year_month_day,
534         help=('The removal date to use for all strings. '
535               'May be YYYY-MM-DD or "today". (default: today)'))
536
537     # The 'purge' command removes old entries.
538     subparser = subparsers.add_parser(
539         'purge',
540         parents=[option_db],
541         help='Purges removed strings from a database.')
542     subparser.set_defaults(handler=_handle_purge)
543     subparser.add_argument(
544         '-b',
545         '--before',
546         type=year_month_day,
547         help=('Delete all entries removed on or before this date. '
548               'May be YYYY-MM-DD or "today".'))
549
550     # The 'report' command prints a report about a database.
551     subparser = subparsers.add_parser('report',
552                                       help='Prints a report about a database.')
553     subparser.set_defaults(handler=_handle_report)
554     subparser.add_argument(
555         'token_database_or_elf',
556         nargs='+',
557         action=ExpandGlobs,
558         help='The ELF files or token databases about which to generate reports.'
559     )
560     subparser.add_argument(
561         '-o',
562         '--output',
563         type=argparse.FileType('w'),
564         default=sys.stdout,
565         help='The file to which to write the output; use - for stdout.')
566
567     args = parser.parse_args()
568
569     handler = args.handler
570     del args.handler
571
572     return handler, args
573
574
575 def _init_logging(level: int) -> None:
576     _LOG.setLevel(logging.DEBUG)
577     log_to_stderr = logging.StreamHandler()
578     log_to_stderr.setLevel(level)
579     log_to_stderr.setFormatter(
580         logging.Formatter(
581             fmt='%(asctime)s.%(msecs)03d-%(levelname)s: %(message)s',
582             datefmt='%H:%M:%S'))
583
584     _LOG.addHandler(log_to_stderr)
585
586
587 def _main(handler: Callable, args: argparse.Namespace) -> int:
588     _init_logging(logging.INFO)
589     handler(**vars(args))
590     return 0
591
592
593 if __name__ == '__main__':
594     sys.exit(_main(*_parse_args()))