"""
return self._filter_single_value('filter_binary_path', binary_path)
+ def filter_module_desc(self, module_desc):
+ """
+ Consult available plugins to determine the module
+ description suitable for symbolication.
+
+ Returns `None` if symbolication should not be attempted for this module.
+ """
+ assert isinstance(module_desc, ModuleDesc)
+ return self._filter_single_value('filter_module_desc', module_desc)
+
class AsanSymbolizerPlugIn(object):
"""
This is the interface the `asan_symbolize.py` code uses to talk
"""
return binary_path
+ def filter_module_desc(self, module_desc):
+ """
+ Given a ModuleDesc object (`module_desc`) return
+ a ModuleDesc suitable for symbolication.
+
+ Implementations should return `None` if symbolication of this binary
+ should be skipped.
+ """
+ return module_desc
+
+class ModuleDesc(object):
+ def __init__(self, name, arch, start_addr, end_addr, module_path, uuid):
+ self.name = name
+ self.arch = arch
+ self.start_addr = start_addr
+ self.end_addr = end_addr
+ # Module path from an ASan report.
+ self.module_path = module_path
+ # Module for performing symbolization, by default same as above.
+ self.module_path_for_symbolization = module_path
+ self.uuid = uuid
+ assert self.is_valid()
+
+ def __str__(self):
+ assert self.is_valid()
+ return "{name} {arch} {start_addr:#016x}-{end_addr:#016x} {module_path} {uuid}".format(
+ name=self.name,
+ arch=self.arch,
+ start_addr=self.start_addr,
+ end_addr=self.end_addr,
+ module_path=self.module_path if self.module_path == self.module_path_for_symbolization else '{} ({})'.format(self.module_path_for_symbolization, self.module_path),
+ uuid=self.uuid
+ )
+
+ def is_valid(self):
+ if not isinstance(self.name, str):
+ return False
+ if not isinstance(self.arch, str):
+ return False
+ if not isinstance(self.start_addr, int):
+ return False
+ if self.start_addr < 0:
+ return False
+ if not isinstance(self.end_addr, int):
+ return False
+ if self.end_addr <= self.start_addr:
+ return False
+ if not isinstance(self.module_path, str):
+ return False
+ if not os.path.isabs(self.module_path):
+ return False
+ if not isinstance(self.module_path_for_symbolization, str):
+ return False
+ if not os.path.isabs(self.module_path_for_symbolization):
+ return False
+ if not isinstance(self.uuid, str):
+ return False
+ return True
+
+class GetUUIDFromBinaryException(Exception):
+ def __init__(self, msg):
+ super(GetUUIDFromBinaryException, self).__init__(msg)
+
+_get_uuid_from_binary_cache = dict()
+
+def get_uuid_from_binary(path_to_binary, arch=None):
+ cache_key = (path_to_binary, arch)
+ cached_value = _get_uuid_from_binary_cache.get(cache_key)
+ if cached_value:
+ return cached_value
+ if not os.path.exists(path_to_binary):
+ raise GetUUIDFromBinaryException('Binary "{}" does not exist'.format(path_to_binary))
+ cmd = [ '/usr/bin/otool', '-l']
+ if arch:
+ cmd.extend(['-arch', arch])
+ cmd.append(path_to_binary)
+ output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
+ # Look for this output:
+ # cmd LC_UUID
+ # cmdsize 24
+ # uuid 4CA778FE-5BF9-3C45-AE59-7DF01B2BE83F
+ if isinstance(output, str):
+ output_str = output
+ else:
+ assert isinstance(output, bytes)
+ output_str = output.decode()
+ assert isinstance(output_str, str)
+ lines = output_str.split('\n')
+ uuid = None
+ for index, line in enumerate(lines):
+ stripped_line = line.strip()
+ if not stripped_line.startswith('cmd LC_UUID'):
+ continue
+ uuid_line = lines[index+2].strip()
+ if not uuid_line.startswith('uuid'):
+ raise GetUUIDFromBinaryException('Malformed output: "{}"'.format(uuid_line))
+ split_uuid_line = uuid_line.split()
+ uuid = split_uuid_line[1]
+ break
+ if uuid is None:
+ raise GetUUIDFromBinaryException('Failed to retrieve UUID')
+ else:
+ # Update cache
+ _get_uuid_from_binary_cache[cache_key] = uuid
+ return uuid
+
+class ModuleMap(object):
+ def __init__(self):
+ self._module_name_to_description_map = dict()
+
+ def add_module(self, desc):
+ assert isinstance(desc, ModuleDesc)
+ assert desc.name not in self._module_name_to_description_map
+ self._module_name_to_description_map[desc.name] = desc
+
+ def find_module_by_name(self, name):
+ return self._module_name_to_description_map.get(name, None)
+
+ def __str__(self):
+ s = '{} modules:\n'.format(self.num_modules)
+ for module_desc in sorted(self._module_name_to_description_map.values(), key=lambda v: v.start_addr):
+ s += str(module_desc) + '\n'
+ return s
+
+ @property
+ def num_modules(self):
+ return len(self._module_name_to_description_map)
+
+ @property
+ def modules(self):
+ return set(self._module_name_to_description_map.values())
+
+ def get_module_path_for_symbolication(self, module_name, proxy):
+ module_desc = self.find_module_by_name(module_name)
+ if module_desc is None:
+ return None
+ # Allow a plug-in to change the module description to make it
+ # suitable for symbolication or avoid symbolication altogether.
+ module_desc = proxy.filter_module_desc(module_desc)
+ if module_desc is None:
+ return None
+ try:
+ uuid = get_uuid_from_binary(module_desc.module_path_for_symbolization, arch = module_desc.arch)
+ if uuid != module_desc.uuid:
+ logging.warning("Detected UUID mismatch {} != {}".format(uuid, module_desc.uuid))
+ # UUIDs don't match. Tell client to not symbolize this.
+ return None
+ except GetUUIDFromBinaryException as e:
+ logging.error('Failed to binary from UUID: %s', str(e))
+ return None
+ return module_desc.module_path_for_symbolization
+
+ @staticmethod
+ def parse_from_file(module_map_path):
+ if not os.path.exists(module_map_path):
+ raise Exception('module map "{}" does not exist'.format(module_map_path))
+ with open(module_map_path, 'r') as f:
+ mm = None
+ # E.g.
+ # 0x2db4000-0x102ddc000 /path/to (arm64) <0D6BBDE0-FF90-3680-899D-8E6F9528E04C>
+ hex_regex = lambda name: r'0x(?P<' + name + r'>[0-9a-f]+)'
+ module_path_regex = r'(?P<path>.+)'
+ arch_regex = r'\((?P<arch>.+)\)'
+ uuid_regex = r'<(?P<uuid>[0-9A-Z-]+)>'
+ line_regex = r'^{}-{}\s+{}\s+{}\s+{}'.format(
+ hex_regex('start_addr'),
+ hex_regex('end_addr'),
+ module_path_regex,
+ arch_regex,
+ uuid_regex
+ )
+ matcher = re.compile(line_regex)
+ line_num = 0
+ line = 'dummy'
+ while line != '':
+ line = f.readline()
+ line_num += 1
+ if mm is None:
+ if line.startswith('Process module map:'):
+ mm = ModuleMap()
+ continue
+ if line.startswith('End of module map'):
+ break
+ m_obj = matcher.match(line)
+ if not m_obj:
+ raise Exception('Failed to parse line {} "{}"'.format(line_num, line))
+ arch = m_obj.group('arch')
+ start_addr = int(m_obj.group('start_addr'), base=16)
+ end_addr = int(m_obj.group('end_addr'), base=16)
+ module_path = m_obj.group('path')
+ uuid = m_obj.group('uuid')
+ module_desc = ModuleDesc(
+ name=os.path.basename(module_path),
+ arch=arch,
+ start_addr=start_addr,
+ end_addr=end_addr,
+ module_path=module_path,
+ uuid=uuid
+ )
+ mm.add_module(module_desc)
+ if mm is not None:
+ logging.debug('Loaded Module map from "{}":\n{}'.format(
+ f.name,
+ str(mm))
+ )
+ return mm
+
class SysRootFilterPlugIn(AsanSymbolizerPlugIn):
"""
Simple plug-in to add sys root prefix to all binary paths
def filter_binary_path(self, path):
return self.sysroot_path + path
+class ModuleMapPlugIn(AsanSymbolizerPlugIn):
+ def __init__(self):
+ self._module_map = None
+ def register_cmdline_args(self, parser):
+ parser.add_argument('--module-map',
+ help='Path to text file containing module map'
+ 'output. See print_module_map ASan option.')
+ def process_cmdline_args(self, pargs):
+ if not pargs.module_map:
+ return False
+ self._module_map = ModuleMap.parse_from_file(args.module_map)
+ if self._module_map is None:
+ msg = 'Failed to find module map'
+ logging.error(msg)
+ raise Exception(msg)
+ return True
+ def filter_binary_path(self, binary_path):
+ if os.path.isabs(binary_path):
+ # This is a binary path so transform into
+ # a module name
+ module_name = os.path.basename(binary_path)
+ else:
+ module_name = binary_path
+ return self._module_map.get_module_path_for_symbolication(module_name, self.proxy)
+
def add_logging_args(parser):
parser.add_argument('--log-dest',
default=None,
for plugin_path in pargs.plugins:
plugin_proxy.load_plugin_from_file(plugin_path)
# Add built-in plugins.
+ plugin_proxy.add_plugin(ModuleMapPlugIn())
plugin_proxy.add_plugin(SysRootFilterPlugIn())
return unparsed_args