From 8108b72da8a17621669267f5dfe2c3277fd30f12 Mon Sep 17 00:00:00 2001 From: DongHun Kwak Date: Mon, 12 Apr 2021 15:19:55 +0900 Subject: [PATCH 1/1] Imported Upstream version python3-rospkg 1.2.8 --- PKG-INFO | 14 + README.md | 6 + setup.cfg | 4 + setup.py | 55 +++ src/rospkg.egg-info/PKG-INFO | 14 + src/rospkg.egg-info/SOURCES.txt | 26 + src/rospkg.egg-info/dependency_links.txt | 1 + src/rospkg.egg-info/entry_points.txt | 3 + src/rospkg.egg-info/requires.txt | 3 + src/rospkg.egg-info/top_level.txt | 1 + src/rospkg/__init__.py | 58 +++ src/rospkg/common.py | 57 +++ src/rospkg/distro.py | 655 ++++++++++++++++++++++++++ src/rospkg/environment.py | 227 +++++++++ src/rospkg/manifest.py | 516 ++++++++++++++++++++ src/rospkg/os_detect.py | 783 +++++++++++++++++++++++++++++++ src/rospkg/rospack.py | 531 +++++++++++++++++++++ src/rospkg/rosversion.py | 124 +++++ src/rospkg/stack.py | 258 ++++++++++ test/test_rospkg_catkin_packages.py | 69 +++ test/test_rospkg_common.py | 52 ++ test/test_rospkg_distro.py | 419 +++++++++++++++++ test/test_rospkg_distro_vcs_config.py | 383 +++++++++++++++ test/test_rospkg_environment.py | 168 +++++++ test/test_rospkg_manifest.py | 272 +++++++++++ test/test_rospkg_os_detect.py | 700 +++++++++++++++++++++++++++ test/test_rospkg_packages.py | 358 ++++++++++++++ test/test_rospkg_stacks.py | 292 ++++++++++++ 28 files changed, 6049 insertions(+) create mode 100644 PKG-INFO create mode 100644 README.md create mode 100644 setup.cfg create mode 100755 setup.py create mode 100644 src/rospkg.egg-info/PKG-INFO create mode 100644 src/rospkg.egg-info/SOURCES.txt create mode 100644 src/rospkg.egg-info/dependency_links.txt create mode 100644 src/rospkg.egg-info/entry_points.txt create mode 100644 src/rospkg.egg-info/requires.txt create mode 100644 src/rospkg.egg-info/top_level.txt create mode 100644 src/rospkg/__init__.py create mode 100644 src/rospkg/common.py create mode 100644 src/rospkg/distro.py create mode 100644 src/rospkg/environment.py create mode 100644 src/rospkg/manifest.py create mode 100644 src/rospkg/os_detect.py create mode 100644 src/rospkg/rospack.py create mode 100644 src/rospkg/rosversion.py create mode 100644 src/rospkg/stack.py create mode 100644 test/test_rospkg_catkin_packages.py create mode 100644 test/test_rospkg_common.py create mode 100644 test/test_rospkg_distro.py create mode 100644 test/test_rospkg_distro_vcs_config.py create mode 100644 test/test_rospkg_environment.py create mode 100644 test/test_rospkg_manifest.py create mode 100644 test/test_rospkg_os_detect.py create mode 100644 test/test_rospkg_packages.py create mode 100644 test/test_rospkg_stacks.py diff --git a/PKG-INFO b/PKG-INFO new file mode 100644 index 0000000..8476440 --- /dev/null +++ b/PKG-INFO @@ -0,0 +1,14 @@ +Metadata-Version: 1.1 +Name: rospkg +Version: 1.2.8 +Summary: ROS package library +Home-page: http://wiki.ros.org/rospkg +Author: Ken Conley +Author-email: kwc@willowgarage.com +License: BSD +Description: Library for retrieving information about ROS packages and stacks. + +Keywords: ROS +Platform: UNKNOWN +Classifier: Programming Language :: Python +Classifier: License :: OSI Approved :: BSD License diff --git a/README.md b/README.md new file mode 100644 index 0000000..6433e2d --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +rospkg +----- + +Standalone Python library for the ROS package system. + +[ROS Packages Users/Developers Guide](http://docs.ros.org/independent/api/rospkg/html/) diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..8bfd5a1 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,4 @@ +[egg_info] +tag_build = +tag_date = 0 + diff --git a/setup.py b/setup.py new file mode 100755 index 0000000..d64959a --- /dev/null +++ b/setup.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python + +import os +import sys + +from setuptools import setup + +install_requires = ['catkin_pkg', 'PyYAML'] + +if ( + 'SKIP_PYTHON_MODULES' not in os.environ and + 'SKIP_PYTHON_SCRIPTS' not in os.environ +): + install_requires.append('distro') + +kwargs = { + 'name': 'rospkg', + # same version as in: + # - src/rospkg/__init__.py + # - stdeb.cfg + 'version': '1.2.8', + 'packages': ['rospkg'], + 'package_dir': {'': 'src'}, + 'entry_points': { + 'console_scripts': ['rosversion=rospkg.rosversion:main'], + }, + 'install_requires': install_requires, + 'author': 'Ken Conley', + 'author_email': 'kwc@willowgarage.com', + 'url': 'http://wiki.ros.org/rospkg', + 'keywords': ['ROS'], + 'classifiers': [ + 'Programming Language :: Python', + 'License :: OSI Approved :: BSD License'], + 'description': 'ROS package library', + 'long_description': """\ + Library for retrieving information about ROS packages and stacks. + """, + 'license': 'BSD' +} + +if sys.version_info[0] == 2 and sys.version_info[1] < 7: + kwargs['install_requires'].append('argparse') + +if 'SKIP_PYTHON_MODULES' in os.environ: + kwargs['packages'] = [] + kwargs['package_dir'] = {} + kwargs['install_requires'].remove('catkin_pkg') +if 'SKIP_PYTHON_SCRIPTS' in os.environ: + kwargs['name'] += '_modules' + kwargs['install_requires'].remove('catkin_pkg') + kwargs['scripts'] = [] + kwargs['entry_points']['console_scripts'] = [] + +setup(**kwargs) diff --git a/src/rospkg.egg-info/PKG-INFO b/src/rospkg.egg-info/PKG-INFO new file mode 100644 index 0000000..8476440 --- /dev/null +++ b/src/rospkg.egg-info/PKG-INFO @@ -0,0 +1,14 @@ +Metadata-Version: 1.1 +Name: rospkg +Version: 1.2.8 +Summary: ROS package library +Home-page: http://wiki.ros.org/rospkg +Author: Ken Conley +Author-email: kwc@willowgarage.com +License: BSD +Description: Library for retrieving information about ROS packages and stacks. + +Keywords: ROS +Platform: UNKNOWN +Classifier: Programming Language :: Python +Classifier: License :: OSI Approved :: BSD License diff --git a/src/rospkg.egg-info/SOURCES.txt b/src/rospkg.egg-info/SOURCES.txt new file mode 100644 index 0000000..76272e5 --- /dev/null +++ b/src/rospkg.egg-info/SOURCES.txt @@ -0,0 +1,26 @@ +README.md +setup.py +src/rospkg/__init__.py +src/rospkg/common.py +src/rospkg/distro.py +src/rospkg/environment.py +src/rospkg/manifest.py +src/rospkg/os_detect.py +src/rospkg/rospack.py +src/rospkg/rosversion.py +src/rospkg/stack.py +src/rospkg.egg-info/PKG-INFO +src/rospkg.egg-info/SOURCES.txt +src/rospkg.egg-info/dependency_links.txt +src/rospkg.egg-info/entry_points.txt +src/rospkg.egg-info/requires.txt +src/rospkg.egg-info/top_level.txt +test/test_rospkg_catkin_packages.py +test/test_rospkg_common.py +test/test_rospkg_distro.py +test/test_rospkg_distro_vcs_config.py +test/test_rospkg_environment.py +test/test_rospkg_manifest.py +test/test_rospkg_os_detect.py +test/test_rospkg_packages.py +test/test_rospkg_stacks.py \ No newline at end of file diff --git a/src/rospkg.egg-info/dependency_links.txt b/src/rospkg.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/rospkg.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/src/rospkg.egg-info/entry_points.txt b/src/rospkg.egg-info/entry_points.txt new file mode 100644 index 0000000..a37dd15 --- /dev/null +++ b/src/rospkg.egg-info/entry_points.txt @@ -0,0 +1,3 @@ +[console_scripts] +rosversion = rospkg.rosversion:main + diff --git a/src/rospkg.egg-info/requires.txt b/src/rospkg.egg-info/requires.txt new file mode 100644 index 0000000..2177d60 --- /dev/null +++ b/src/rospkg.egg-info/requires.txt @@ -0,0 +1,3 @@ +catkin_pkg +PyYAML +distro diff --git a/src/rospkg.egg-info/top_level.txt b/src/rospkg.egg-info/top_level.txt new file mode 100644 index 0000000..eeae947 --- /dev/null +++ b/src/rospkg.egg-info/top_level.txt @@ -0,0 +1 @@ +rospkg diff --git a/src/rospkg/__init__.py b/src/rospkg/__init__.py new file mode 100644 index 0000000..39f08ee --- /dev/null +++ b/src/rospkg/__init__.py @@ -0,0 +1,58 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2011, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +""" +Base ROS python library for manipulating ROS packages and stacks. +""" + +from .common import MANIFEST_FILE, ResourceNotFound, STACK_FILE +from .environment import get_etc_ros_dir, get_log_dir, get_ros_home, \ + get_ros_package_path, get_ros_paths, get_ros_root, \ + get_test_results_dir, on_ros_path +from .manifest import InvalidManifest, Manifest, parse_manifest_file +from .rospack import expand_to_packages, get_package_name, \ + get_stack_version_by_dir, list_by_path, RosPack, RosStack + +# same version as in: +# - setup.py +# - stdeb.cfg +__version__ = '1.2.8' + +__all__ = ( + 'MANIFEST_FILE', 'ResourceNotFound', 'STACK_FILE', + 'get_etc_ros_dir', 'get_log_dir', 'get_ros_home', + 'get_ros_package_path', 'get_ros_paths', 'get_ros_root', + 'get_test_results_dir', 'on_ros_path', + 'InvalidManifest', 'Manifest', 'parse_manifest_file', + 'get_package_name', 'RosPack', 'RosStack', + 'list_by_path', 'expand_to_packages', 'get_stack_version_by_dir', +) diff --git a/src/rospkg/common.py b/src/rospkg/common.py new file mode 100644 index 0000000..9adb2f0 --- /dev/null +++ b/src/rospkg/common.py @@ -0,0 +1,57 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2011, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +""" +Common definitions for rospkg modules. +""" + +MANIFEST_FILE = 'manifest.xml' +PACKAGE_FILE = 'package.xml' +STACK_FILE = 'stack.xml' +ROS_STACK = 'ros' + + +class ResourceNotFound(Exception): + """ + A ROS filesystem resource was not found. + """ + + def __init__(self, msg, ros_paths=None): + super(ResourceNotFound, self).__init__(msg) + self.ros_paths = ros_paths + + def __str__(self): + s = self.args[0] # python 2.6 + if self.ros_paths: + for i, p in enumerate(self.ros_paths): + s = s + '\nROS path [%s]=%s' % (i, p) + return s diff --git a/src/rospkg/distro.py b/src/rospkg/distro.py new file mode 100644 index 0000000..fa2d166 --- /dev/null +++ b/src/rospkg/distro.py @@ -0,0 +1,655 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2010, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +""" +Representation/model of rosdistro format. +""" + +import os +import re +import string +try: + from urllib.request import urlopen +except ImportError: + from urllib2 import urlopen +import yaml + +from .common import ResourceNotFound +from .environment import get_etc_ros_dir + +TARBALL_URI_EVAL = 'http://svn.code.sf.net/p/ros-dry-releases/code/download/stacks/$STACK_NAME/$STACK_NAME-$STACK_VERSION/$STACK_NAME-$STACK_VERSION.tar.bz2' +TARBALL_VERSION_EVAL = '$STACK_NAME-$STACK_VERSION' + + +class InvalidDistro(Exception): + """ + Distro file data does not match specification. + """ + pass + + +def distro_uri(distro_name): + """ + Get distro URI of main ROS distribution files. + + :param distro_name: name of distro, e.g. 'diamondback' + :returns: the SVN/HTTP URL of the specified distro. This function should only be used + with the main distros. + """ + return "http://svn.code.sf.net/p/ros-dry-releases/code/trunk/distros/%s.rosdistro" % (distro_name) + +def expand_rule(rule, stack_name, stack_ver, release_name): + s = rule.replace('$STACK_NAME', stack_name) + if stack_ver: + s = s.replace('$STACK_VERSION', stack_ver) + s = s.replace('$RELEASE_NAME', release_name) + return s + + +class DistroStack(object): + """Stores information about a stack release""" + + def __init__(self, stack_name, stack_version, release_name, rules): + """ + :param stack_name: Name of stack + :param stack_version: Version number of stack. + :param release_name: name of distribution release. Necessary for rule expansion. + :param rules: raw '_rules' data. Will be converted into appropriate vcs config instance. + """ + self.name = stack_name + self.version = stack_version + self.release_name = release_name + self._rules = rules + self.repo = rules.get('repo', None) + self.vcs_config = load_vcs_config(self._rules, self._expand_rule) + + def _expand_rule(self, rule): + """ + Perform variable substitution on stack rule. + """ + return expand_rule(rule, self.name, self.version, self.release_name) + + def __eq__(self, other): + try: + return self.name == other.name and \ + self.version == other.version and \ + self.vcs_config == other.vcs_config + except AttributeError: + return False + + +class Variant(object): + """ + A variant defines a specific set of stacks ("metapackage", in Debian + parlance). For example, "base", "pr2". These variants can extend + another variant. + """ + + def __init__(self, variant_name, extends, stack_names, stack_names_implicit): + """ + :param variant_name: name of variant to load from distro file, ``str`` + :param stack_names_implicit: full list of stacks implicitly included in this variant, ``[str]`` + :param raw_data: raw rosdistro data for this variant + """ + self.name = variant_name + self.extends = extends + self._stack_names = stack_names + self._stack_names_implicit = stack_names_implicit + + def get_stack_names(self, implicit=True): + if implicit: + return self._stack_names_implicit + else: + return self._stack_names + + # stack_names includes implicit stack names. Use get_stack_names() + # to get explicit only + stack_names = property(get_stack_names) + + +class Distro(object): + """ + Store information in a rosdistro file. + """ + + def __init__(self, stacks, variants, release_name, version, raw_data): + """ + :param stacks: dictionary mapping stack names to :class:`DistroStack` instances + :param variants: dictionary mapping variant names to :class:`Variant` instances + :param release_name: name of release, e.g. 'diamondback' + :param version: version number of release + :param raw_data: raw dictionary representation of a distro + """ + self._stacks = stacks + self.variants = variants + self.release_name = release_name + self.version = version + self.raw_data = raw_data + + def get_stacks(self, released=False): + """ + :param released: only included released stacks + :returns: dictionary of stack names to :class:`DistroStack` instances in + this distro. + """ + if released: + return self._get_released_stacks() + else: + return self._stacks.copy() + + def _get_released_stacks(self): + retval = {} + for s, obj in self._stacks.items(): + if obj.version: + retval[s] = obj + return retval + + # gets map of all stacks + stacks = property(get_stacks) + # gets maps of released stacks + released_stacks = property(_get_released_stacks) + + +def load_distro(source_uri): + """ + :param source_uri: source URI of distro file, or path to distro + file. Filename has precedence in resolution. + + :raises: :exc:`InvalidDistro` If distro file is invalid + :raises: :exc:`ResourceNotFound` If file at *source_uri* is not found + """ + try: + # parse rosdistro yaml + if os.path.isfile(source_uri): + # load rosdistro file + with open(source_uri) as f: + raw_data = yaml.load(f.read()) + else: + try: + request = urlopen(source_uri) + except Exception as e: + raise ResourceNotFound('%s (%s)' % (str(e), source_uri)) + try: + raw_data = yaml.load(request) + except ValueError: + raise ResourceNotFound(source_uri) + if not type(raw_data) == dict: + raise InvalidDistro("Distro must be a dictionary: %s" % (source_uri)) + except yaml.YAMLError as e: + raise InvalidDistro(str(e)) + + try: + version = _distro_version(raw_data.get('version', '0')) + release_name = raw_data['release'] + stacks = _load_distro_stacks(raw_data, release_name) + variants = _load_variants(raw_data.get('variants', {}), stacks) + return Distro(stacks, variants, release_name, version, raw_data) + except KeyError as e: + raise InvalidDistro("distro is missing required '%s' key" % (str(e))) + + +def _load_variants(raw_data, stacks): + if not raw_data: + return {} + all_variants_raw_data = {} + for v in raw_data: + if type(v) != dict or len(v.keys()) != 1: + raise InvalidDistro("invalid variant spec: %s" % v) + variant_name = list(v.keys())[0] + all_variants_raw_data[variant_name] = v[variant_name] + variants = {} + for variant_name in all_variants_raw_data.keys(): + variants[variant_name] = _load_variant(variant_name, all_variants_raw_data) + + # Disabling validation to support variants which include wet packages. + # validate + # for stack_name in variants[variant_name].get_stack_names(implicit=False): + # if stack_name not in stacks: + # raise InvalidDistro("variant [%s] refers to non-existent stack [%s]"%(variant_name, stack_name)) + return variants + + +def _load_variant(variant_name, all_variants_raw_data): + variant_raw_data = all_variants_raw_data[variant_name] + stack_names_implicit = list(variant_raw_data.get('stacks', [])) + extends = variant_raw_data.get('extends', []) + if isinstance(extends, str): + extends = [extends] + for e in extends: + parent_variant = _load_variant(e, all_variants_raw_data) + stack_names_implicit = parent_variant.get_stack_names(implicit=True) + stack_names_implicit + return Variant(variant_name, extends, variant_raw_data.get('stacks', []), stack_names_implicit) + + +def _load_distro_stacks(distro_doc, release_name): + """ + :param distro_doc: dictionary form of rosdistro file, `dict` + :returns: dictionary of stack names to :class:`DistroStack` instances, `{str : DistroStack}` + :raises: :exc:`InvalidDistro` if distro_doc format is invalid + """ + + # load stacks and expand out uri rules + stacks = {} + try: + stack_props = distro_doc['stacks'] + stack_props = stack_props or {} + stack_names = [x for x in stack_props.keys() if not x[0] == '_'] + except KeyError: + raise InvalidDistro("distro is missing required 'stacks' key") + for stack_name in stack_names: + stack_version = stack_props[stack_name].get('version', None) + rules = _get_rules(distro_doc, stack_name) + if not rules: + raise InvalidDistro("no VCS rules for stack [%s]" % (stack_name)) + stacks[stack_name] = DistroStack(stack_name, stack_version, release_name, rules) + return stacks + + +def _distro_version(version_val): + """ + Parse distro version value, converting SVN revision to version value if necessary + """ + version_val = str(version_val) + # check for no keyword sub + if version_val == '$Revision$': + return 0 + m = re.search(r'\$Revision:\s*([0-9]*)\s*\$', version_val) + if m is not None: + version_val = 'r' + m.group(1) + + # Check that is a valid version string + valid = string.ascii_letters + string.digits + '.+~' + if False in (c in valid for c in version_val): + raise InvalidDistro("Version string %s not valid" % version_val) + return version_val + + +def distro_to_rosinstall(distro, branch, variant_name=None, implicit=True, released_only=True, anonymous=True): + """ + :param branch: branch to convert for + :param variant_name: if not None, only include stacks in the specified variant. + :param implicit: if variant_name is provided, include full (recursive) dependencies of variant, default True + :param released_only: only included released stacks, default True. + :param anonymous: create for anonymous access rules + :returns: rosinstall data in Python list format, ``[dict]`` + + :raises: :exc:`KeyError` If branch is invalid or if distro is mis-configured + """ + variant = distro.variants.get(variant_name, None) + if variant_name: + stack_names = set(variant.get_stack_names(implicit=implicit)) + else: + stack_names = distro.released_stacks.keys() + rosinstall_data = [] + for s in stack_names: + if released_only and s not in distro.released_stacks: + continue + rosinstall_data.extend(distro.stacks[s].vcs_config.to_rosinstall(s, branch, anonymous)) + return rosinstall_data + +################################################################################ + + +def _get_rules(distro_doc, stack_name): + """ + Retrieve rules from distro_doc for specified stack. This operates on + the raw distro dictionary document. + + :param distro_doc: rosdistro document, ``dict`` + :param stack_name: name of stack to get rules for, ``str`` + """ + # top-level named section + named_rules_d = distro_doc.get('_rules', {}) + + # other rules to search + rules_d = [distro_doc.get('stacks', {}), + distro_doc.get('stacks', {}).get(stack_name, {})] + rules_d = [d for d in rules_d if '_rules' in d] + + # last rules wins + if not rules_d: + return None + rules_d = rules_d[-1] + + update_r = rules_d.get('_rules', {}) + if type(update_r) == str: + try: + update_r = named_rules_d[update_r] + except KeyError: + raise InvalidDistro("no _rules named [%s]" % (update_r)) + if not type(update_r) == dict: + raise InvalidDistro("invalid rules: %s %s" % (update_r, type(update_r))) + return update_r + +################################################################################ + + +class VcsConfig(object): + """ + Base representation of a rosdistro VCS rules configuration. + """ + + def __init__(self, type_): + self.type = type_ + self.tarball_url = self.tarball_version = None + + def to_rosinstall(self, local_name, branch, anonymous): + uri, version_tag = self.get_branch(branch, anonymous) + if branch == 'release-tar': + type_ = 'tar' + else: + type_ = self.type + if version_tag: + return [{type_: {"uri": uri, 'local-name': local_name, 'version': version_tag}}] + else: + return [({type_: {"uri": uri, 'local-name': local_name}})] + + def load(self, rules, rule_eval): + """ + Initialize fields of this class based on the raw rosdistro + *rules* data after applying *rule_eval* function (e.g. to + replace variables in rules). + + :param rules: raw rosdistro rules entry, ``dict`` + :param rule_eval: function to evaluate rule values, ``fn(str) -> str`` + """ + self.tarball_url = rule_eval(TARBALL_URI_EVAL) + self.tarball_version = rule_eval(TARBALL_VERSION_EVAL) + + def get_branch(self, branch, anonymous): + """ + :raises: :exc:`ValueError` If branch is invalid + """ + if branch == 'release-tar': + return self.tarball_url, self.tarball_version + else: + raise ValueError(branch) + + def __eq__(self, other): + return self.type == other.type and \ + self.tarball_url == other.tarball_url + + +class DvcsConfig(VcsConfig): + """ + Configuration information for a distributed VCS-style repository. + + Configuration fields: + + * ``repo_uri``: base URI of repo + * ``dev_branch``: git branch the code is developed + * ``distro_tag``: a tag of the latest released code for a specific ROS distribution + * ``release_tag``: a tag of the code for a specific release + """ + + def __init__(self, type_): + super(DvcsConfig, self).__init__(type_) + self.repo_uri = self.anon_repo_uri = None + self.dev_branch = self.distro_tag = self.release_tag = None + + def load(self, rules, rule_eval): + super(DvcsConfig, self).load(rules, rule_eval) + + self.repo_uri = rule_eval(rules['uri']) + if 'anon-uri' in rules: + self.anon_repo_uri = rule_eval(rules['anon-uri']) + else: + self.anon_repo_uri = self.repo_uri + self.dev_branch = rule_eval(rules['dev-branch']) + self.distro_tag = rule_eval(rules['distro-tag']) + self.release_tag = rule_eval(rules['release-tag']) + + def get_branch(self, branch, anonymous): + """ + :raises: :exc:`KeyError` Invalid branch parameter + """ + if branch == 'release-tar': + return super(DvcsConfig, self).get_branch(branch, anonymous) + elif branch == 'devel': + version_tag = self.dev_branch + elif branch == 'distro': + version_tag = self.distro_tag + elif branch == 'release': + version_tag = self.release_tag + else: + raise ValueError("invalid branch spec [%s]" % (branch)) + # occurs, for example, with unreleased stacks. Only devel is valid + if version_tag is None: + raise ValueError("branch [%s] is not available for this config" % (branch)) + if anonymous: + return self.anon_repo_uri, version_tag + else: + return self.repo_uri, version_tag + + def __eq__(self, other): + return super(DvcsConfig, self).__eq__(other) and \ + self.repo_uri == other.repo_uri and \ + self.anon_repo_uri == other.anon_repo_uri and \ + self.dev_branch == other.dev_branch and \ + self.release_tag == other.release_tag and \ + self.distro_tag == other.distro_tag + + +class GitConfig(DvcsConfig): + """ + Configuration information about an GIT repository. See parent class :class:`DvcsConfig` for more API information. + """ + + def __init__(self): + super(GitConfig, self).__init__('git') + + +class HgConfig(DvcsConfig): + """ + Configuration information about a Mercurial repository. See parent class :class:`DvcsConfig` for more API information. + """ + + def __init__(self): + super(HgConfig, self).__init__('hg') + + +class BzrConfig(DvcsConfig): + """ + Configuration information about an BZR repository. See parent class :class:`DvcsConfig` for more API information. + """ + + def __init__(self): + super(BzrConfig, self).__init__('bzr') + + +class SvnConfig(VcsConfig): + """ + Configuration information about an SVN repository. + + Configuration fields: + + * ``dev``: where the code is developed + * ``distro_tag``: a tag of the code for a specific ROS distribution + * ``release_tag``: a tag of the code for a specific release + """ + + def __init__(self): + super(SvnConfig, self).__init__('svn') + self.dev = None + self.distro_tag = None + self.release_tag = None + + # anonymously readable version of URLs above. Some repos have + # separate URLs for read-only vs. writable versions of repo + # and many tools need to be able to read repos without + # providing credentials. + self.anon_dev = None + self.anon_distro_tag = None + self.anon_release_tag = None + + def load(self, rules, rule_eval): + super(SvnConfig, self).load(rules, rule_eval) + for k in ['dev', 'distro-tag', 'release-tag']: + if k not in rules: + raise KeyError("svn rules missing required %s key: %s" % (k, rules)) + self.dev = rule_eval(rules['dev']) + self.distro_tag = rule_eval(rules['distro-tag']) + self.release_tag = rule_eval(rules['release-tag']) + + # specify urls that are safe to anonymously read + # from. Users must supply a complete set. + if 'anon-dev' in rules: + self.anon_dev = rule_eval(rules['anon-dev']) + self.anon_distro_tag = rule_eval(rules['anon-distro-tag']) + self.anon_release_tag = rule_eval(rules['anon-release-tag']) + else: + # if no login credentials, assume that anonymous is + # same as normal keys. + self.anon_dev = self.dev + self.anon_distro_tag = self.distro_tag + self.anon_release_tag = self.release_tag + + def get_branch(self, branch, anonymous): + """ + :raises: :exc:`ValueError` If branch is invalid + """ + if branch == 'release-tar': + return super(SvnConfig, self).get_branch(branch, anonymous) + else: + key_map = dict(devel='dev', distro='distro_tag', release='release_tag') + if branch not in key_map: + raise KeyError("invalid branch spec [%s]" % (branch)) + attr_name = key_map[branch] + if anonymous: + attr_name = 'anon_' + attr_name + uri = getattr(self, attr_name) + # occurs, for example, with unreleased stacks. Only devel is valid + if uri is None: + raise ValueError("branch [%s] is not available for this config" % (branch)) + return uri, None + + def __eq__(self, other): + return super(SvnConfig, self).__eq__(other) and \ + self.dev == other.dev and \ + self.distro_tag == other.distro_tag and \ + self.release_tag == other.release_tag and \ + self.anon_dev == other.anon_dev and \ + self.anon_distro_tag == other.anon_distro_tag and \ + self.anon_release_tag == other.anon_release_tag + + +_vcs_configs = { + 'svn': SvnConfig, + 'git': GitConfig, + 'hg': HgConfig, + 'bzr': BzrConfig, +} + + +def get_vcs_configs(): + """ + :returns: Dictionary of supported :class:`VcsConfig` instances. + Key is the VCS type name, e.g. 'svn'. ``{str: VcsConfig}`` + """ + return _vcs_configs.copy() + + +def load_vcs_config(rules, rule_eval): + """ + Factory for creating :class:`VcsConfig` subclass based on + rosdistro _rules data. + + :param rules: rosdistro rules data + :param rules_eval: Function to apply to rule values, e.g. to + convert variables. ``fn(str)->str`` + :returns: :class:`VcsConfig` subclass instance with interpreted rules data. + """ + vcs_config = None + for k, clazz in _vcs_configs.items(): + if k in rules: + vcs_config = clazz() + vcs_config.load(rules[k], rule_eval) + break + return vcs_config + + +def _current_distro_electric_parse_roscore(roscore_file): + if not os.path.exists(roscore_file): + return None + import xml.dom.minidom + try: + dom = xml.dom.minidom.parse(roscore_file) + tags = dom.getElementsByTagName("param") + for t in tags: + if t.hasAttribute('name') and t.getAttribute('name') == 'rosdistro': + return t.getAttribute('value') + except: + return None + + +# for < fuerte, retrieve from roscore file +def _current_distro_electric(env=None): + if env is None: + env = os.environ + from . import RosPack, get_ros_paths + rospack = RosPack(get_ros_paths(env)) + # there's some chance that the location of this file changes in the future + try: + roscore_file = os.path.join(rospack.get_path('roslaunch'), 'roscore.xml') + return _current_distro_electric_parse_roscore(roscore_file) + except: + return None + + +def current_distro_codename(env=None): + """ + Get the currently active ROS distribution codename, e.g. 'fuerte' + + :param env: override os.environ, ``dict`` + """ + if env is None: + env = os.environ + + # ROS_DISTRO is only used in ros catkin buildspace. It is not + # meant to be well publicized and thus is not declared in + # rospkg.environment. + if 'ROS_DISTRO' in env: + return env['ROS_DISTRO'] + + # check for /etc/ros/distro file + distro_name = None + etc_ros = get_etc_ros_dir(env=env) + distro_file = os.path.join(etc_ros, 'distro') + if os.path.isfile(distro_file): + with open(distro_file, 'r') as f: + distro_name = f.read().strip() + + # fallback logic for pre-Fuerte + if distro_name is None: + distro_name = _current_distro_electric(env=env) + + return distro_name diff --git a/src/rospkg/environment.py b/src/rospkg/environment.py new file mode 100644 index 0000000..e807978 --- /dev/null +++ b/src/rospkg/environment.py @@ -0,0 +1,227 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2011, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +""" +Access information about ROS package system environment variables. +""" + +import os + +# Enviroment Variables + +# Global, usually set in setup +ROS_ROOT = "ROS_ROOT" +ROS_PACKAGE_PATH = "ROS_PACKAGE_PATH" +ROS_HOME = "ROS_HOME" + +# override directory path to /etc/ros +ROS_ETC_DIR = "ROS_ETC_DIR" + +# directory in which log files are written +ROS_LOG_DIR = "ROS_LOG_DIR" +# directory in which test result files are written +ROS_TEST_RESULTS_DIR = "ROS_TEST_RESULTS_DIR" + + +# Utilities +def _resolve_path(p): + """ + @param path: path string + @type path: str + Catch-all utility routine for fixing ROS environment variables that + are a single path (e.g. ROS_ROOT). Currently this just expands + tildes to home directories, but in the future it may encode other + behaviors. + """ + if p and p[0] == '~': + return os.path.expanduser(p) + return p + + +def _resolve_paths(paths): + """ + Catch-all utility routine for fixing ROS environment variables + that are paths (e.g. ROS_PACKAGE_PATH). Currently this just + expands tildes to home directories, but in the future it may + encode other behaviors. + + :param paths: path string with OS-defined separator (i.e. ':' for Linux), ``str`` + """ + splits = [p for p in paths.split(os.pathsep) if p] + return os.pathsep.join([_resolve_path(p) for p in splits]) + + +def get_ros_paths(env=None): + """ + Get an ordered list of ROS paths to search for ROS packages, + stacks, and other resources. This is generally computed from + :envvar:`ROS_ROOT` and :envvar:`ROS_PACKAGE_PATH`. + + :param env: override environment dictionary + """ + if env is None: + env = os.environ + return _compute_package_paths(get_ros_root(env), get_ros_package_path(env)) + + +def get_ros_root(env=None): + """ + Get the current :envvar:`ROS_ROOT`. + + :param env: override environment dictionary, ``dict`` + """ + if env is None: + env = os.environ + ros_root = env.get(ROS_ROOT, None) + if ros_root: + ros_root = os.path.normpath(ros_root) + return ros_root + + +def get_ros_package_path(env=None): + """ + Get the current ROS_PACKAGE_PATH. + :param env: (optional) environment override, ``dict`` + """ + if env is None: + env = os.environ + return env.get(ROS_PACKAGE_PATH, None) + + +def get_ros_home(env=None): + """ + Get directory location of '.ros' directory (aka ROS home). + possible locations for this. The :envvar:`ROS_HOME` environment + variable has priority. If :envvar:`ROS_HOME` is not set, + ``$HOME/.ros`` is used. + + :param env: override ``os.environ`` dictionary, ``dict`` + :returns: path to use use for log file directory, ``str`` + """ + if env is None: + env = os.environ + if ROS_HOME in env: + return env[ROS_HOME] + else: + # slightly more robust than $HOME + return os.path.join(os.path.expanduser('~'), '.ros') + + +def get_log_dir(env=None): + """ + Get directory to use for writing log files. There are multiple + possible locations for this. The ROS_LOG_DIR environment variable + has priority. If that is not set, then ROS_HOME/log is used. If + ROS_HOME is not set, $HOME/.ros/log is used. + + :param env: override os.environ dictionary, ``dict`` + :returns: path to use use for log file directory, ``str`` + """ + if env is None: + env = os.environ + if ROS_LOG_DIR in env: + return env[ROS_LOG_DIR] + else: + return os.path.join(get_ros_home(env), 'log') + + +def get_test_results_dir(env=None): + """ + Get directory to use for writing test result files. There are + multiple possible locations for this. The + :envvar:`ROS_TEST_RESULTS_DIR` environment variable has + priority. If that is set, :envvar:`ROS_TEST_RESULTS_DIR` is + returned. If :envvar:`ROS_TEST_RESULTS_DIR` is not set, then + :envvar:`ROS_HOME`/test_results is used. If :envvar:`ROS_HOME` is + not set, ``$HOME/.ros/test_results`` is used. + + :param env: environment dictionary (defaults to ``os.environ``), ``dict`` + :returns: path to use use for log file directory, ``str`` + """ + if env is None: + env = os.environ + + if ROS_TEST_RESULTS_DIR in env: + return env[ROS_TEST_RESULTS_DIR] + else: + return os.path.join(get_ros_home(env), 'test_results') + + +def _compute_package_paths(ros_root, ros_package_path): + """ + Get the paths to search for packages in normal precedence order (i.e. first path wins). + + :param ros_root: value of ROS_ROOT parameter, ``str`` + :param ros_package_path: value of ROS_PACKAGE_PATH parameter, ``str`` + :returns: paths to search in reverse order of precedence, ``[str]`` + """ + paths = [] + if ros_root: + paths.append(ros_root) + if ros_package_path: + paths.extend([x for x in ros_package_path.split(os.pathsep) if x.strip()]) + return paths + + +def on_ros_path(p, env=None): + """ + Check to see if filesystem path is on paths specified in ROS + environment (:envvar:`ROS_ROOT`, :envvar:`ROS_PACKAGE_PATH`). + + :param p: path, ``str`` + :return: ``True`` if p is on the ROS path, ``bool`` + """ + if env is None: + env = os.environ + + package = os.path.realpath(_resolve_path(p)) + # filter out non-paths (e.g. if no ROS environment is configured) + paths = get_ros_paths(env) + paths = [os.path.realpath(_resolve_path(x)) for x in paths] + return bool([x for x in paths if package == x or package.startswith(x + os.sep)]) + + +def get_etc_ros_dir(env=None): + """ + Get directory location of ``/etc/ros`` directory. The + :envvar:`ROS_ETC_DIR` environment variable has priority. If + :envvar:`ROS_ETC_DIR` is not set, ``/etc/ros`` is used. + + :param env: override environment dictionary + :return: path to use use for log file directory + """ + if env is None: + env = os.environ + if ROS_ETC_DIR in env: + return env[ROS_ETC_DIR] + else: + return '/etc/ros' diff --git a/src/rospkg/manifest.py b/src/rospkg/manifest.py new file mode 100644 index 0000000..20cecfe --- /dev/null +++ b/src/rospkg/manifest.py @@ -0,0 +1,516 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2008, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +""" +Library for processing 'manifest' files, i.e. manifest.xml and +stack.xml. +""" + +import os +import sys +import xml.dom.minidom as dom + +from .common import MANIFEST_FILE, PACKAGE_FILE, STACK_FILE + +# stack.xml and manifest.xml have the same internal tags right now +REQUIRED = ['license'] +ALLOWXHTML = ['description'] +OPTIONAL = ['author', 'logo', 'url', 'brief', 'description', 'status', + 'notes', 'depend', 'rosdep', 'export', 'review', + 'versioncontrol', 'platform', 'version', 'rosbuild2', + 'catkin'] +VALID = REQUIRED + OPTIONAL + + +class InvalidManifest(Exception): + pass + + +def _get_nodes_by_name(n, name): + return [t for t in n.childNodes if t.nodeType == t.ELEMENT_NODE and t.tagName == name] + + +def _check_optional(name, allowXHTML=False, merge_multiple=False): + """ + Validator for optional elements. + + :raise: :exc:`InvalidManifest` If validation fails + """ + def check(n, filename): + n = _get_nodes_by_name(n, name) + if len(n) > 1 and not merge_multiple: + raise InvalidManifest("Invalid manifest file [%s]: must have a single '%s' element" % (filename, name)) + if n: + values = [] + for child in n: + if allowXHTML: + values.append(''.join([x.toxml() for x in child.childNodes])) + else: + values.append(_get_text(child.childNodes).strip()) + return ', '.join(values) + return check + + +def _check_required(name, allowXHTML=False, merge_multiple=False): + """ + Validator for required elements. + + :raise: :exc:`InvalidManifest` If validation fails + """ + def check(n, filename): + n = _get_nodes_by_name(n, name) + if not n: + return '' + if len(n) != 1 and not merge_multiple: + raise InvalidManifest("Invalid manifest file: must have only one '%s' element" % name) + values = [] + for child in n: + if allowXHTML: + values.append(''.join([x.toxml() for x in child.childNodes])) + else: + values.append(_get_text(child.childNodes).strip()) + return ', '.join(values) + return check + + +def _check_platform(n, filename): + """ + Validator for manifest platform. + :raise: :exc:`InvalidManifest` If validation fails + """ + platforms = _get_nodes_by_name(n, 'platform') + try: + vals = [(p.attributes['os'].value, p.attributes['version'].value, p.getAttribute('notes')) for p in platforms] + except KeyError as e: + raise InvalidManifest(" tag is missing required '%s' attribute" % str(e)) + return [Platform(*v) for v in vals] + + +def _check_depends(type_, n, filename): + """ + Validator for manifest depends. + :raise: :exc:`InvalidManifest` If validation fails + """ + nodes = _get_nodes_by_name(n, 'depend') + # TDS 20110419: this is a hack. + # rosbuild2 has a tag, + # which is confusing this subroutine with + # KeyError: 'package' + # for now, explicitly don't consider thirdparty depends + depends = [e.attributes for e in nodes if 'thirdparty' not in e.attributes.keys()] + try: + depend_names = [d[type_].value for d in depends] + except KeyError: + raise InvalidManifest("Invalid manifest file [%s]: depends is missing '%s' attribute" % (filename, type_)) + + return [Depend(name, type_) for name in depend_names] + + +def _check_rosdeps(n, filename): + """ + Validator for stack rosdeps. + + :raises: :exc:`InvalidManifest` If validation fails + """ + try: + nodes = _get_nodes_by_name(n, 'rosdep') + rosdeps = [e.attributes for e in nodes] + names = [d['name'].value for d in rosdeps] + return [RosDep(n) for n in names] + except KeyError: + raise InvalidManifest("invalid rosdep tag in [%s]" % (filename)) + + +def _attrs(node): + attrs = {} + for k in node.attributes.keys(): + attrs[k] = node.attributes.get(k).value + return attrs + + +def _check_exports(n, filename): + ret_val = [] + for e in _get_nodes_by_name(n, 'export'): + elements = [c for c in e.childNodes if c.nodeType == c.ELEMENT_NODE] + ret_val.extend([Export(t.tagName, _attrs(t), _get_text(t.childNodes)) for t in elements]) + return ret_val + + +def _check(name, merge_multiple=False): + """ + Generic validator for text-based tags. + """ + if name in REQUIRED: + if name in ALLOWXHTML: + return _check_required(name, True, merge_multiple) + return _check_required(name, merge_multiple=merge_multiple) + elif name in OPTIONAL: + if name in ALLOWXHTML: + return _check_optional(name, True, merge_multiple) + return _check_optional(name, merge_multiple=merge_multiple) + + +class Export(object): + """ + Manifest 'export' tag + """ + + def __init__(self, tag, attrs, str): + """ + Create new export instance. + :param tag: name of the XML tag + @type tag: str + :param attrs: dictionary of XML attributes for this export tag + @type attrs: dict + :param str: string value contained by tag, if any + @type str: str + """ + self.tag = tag + self.attrs = attrs + self.str = str + + def get(self, attr): + """ + :returns: value of attribute or ``None`` if attribute not set, ``str`` + """ + return self.attrs.get(attr, None) + + +class Platform(object): + """ + Manifest 'platform' tag + """ + __slots__ = ['os', 'version', 'notes'] + + def __init__(self, os_, version, notes=None): + """ + Create new depend instance. + :param os_: OS name. must be non-empty, ``str`` + :param version: OS version. must be non-empty, ``str`` + :param notes: (optional) notes about platform support, ``str`` + """ + if not os_: + raise ValueError("bad 'os' attribute") + if not version: + raise ValueError("bad 'version' attribute") + self.os = os_ + self.version = version + self.notes = notes + + def __str__(self): + return "%s %s" % (self.os, self.version) + + def __repr__(self): + return "%s %s" % (self.os, self.version) + + def __eq__(self, obj): + """ + Override equality test. notes *are* considered in the equality test. + """ + if not isinstance(obj, Platform): + return False + return self.os == obj.os and self.version == obj.version and self.notes == obj.notes + + def __hash__(self): + """ + :returns: an integer, which must be the same for two equal instances. + + Since __eq__ is defined, Python 3 requires that this class also provide a __hash__ method. + """ + return hash(self.os) ^ hash(self.version) ^ hash(self.notes) + + +class Depend(object): + """ + Manifest 'depend' tag + """ + __slots__ = ['name', 'type'] + + def __init__(self, name, type_): + """ + Create new depend instance. + :param name: dependency name (e.g. package/stack). Must be non-empty + @type name: str + :param type_: dependency type, e.g. 'package', 'stack'. Must be non-empty. + @type type_: str + + @raise ValueError: if parameters are invalid + """ + if not name: + raise ValueError("bad '%s' attribute" % (type_)) + if not type_: + raise ValueError("type_ must be specified") + self.name = name + self.type = type_ + + def __str__(self): + return self.name + + def __repr__(self): + return self.name + + def __eq__(self, obj): + if not isinstance(obj, Depend): + return False + return self.name == obj.name and self.type == obj.type + + def __hash__(self): + """ + :returns: an integer, which must be the same for two equal instances. + + Since __eq__ is defined, Python 3 requires that this class also provide a __hash__ method. + """ + return hash(self.name) ^ hash(self.type) + + +class RosDep(object): + """ + Manifest 'rosdep' tag + """ + __slots__ = ['name'] + + def __init__(self, name): + """ + Create new rosdep instance. + + :param name: dependency name. Must be non-empty. ``str`` + """ + if not name: + raise ValueError("bad 'name' attribute") + self.name = name + + +class Manifest(object): + """ + Object representation of a ROS manifest file (``manifest.xml`` and ``stack.xml``) + """ + __slots__ = [ + 'description', 'brief', + 'author', 'license', 'licenses', 'license_url', 'url', + 'depends', 'rosdeps', 'platforms', + 'exports', 'version', + 'status', 'notes', + 'unknown_tags', 'type', 'filename', + 'is_catkin'] + + def __init__(self, type_='package', filename=None, is_catkin=False): + """ + :param type: `'package'` or `'stack'` + :param filename: location of manifest file. Necessary if + converting ``${prefix}`` in ```` values, ``str``. + """ + self.description = self.brief = self.author = \ + self.license = self.license_url = \ + self.url = self.status = \ + self.version = self.notes = '' + self.licenses = [] + self.depends = [] + self.rosdeps = [] + self.exports = [] + self.platforms = [] + self.is_catkin = is_catkin + + self.type = type_ + self.filename = filename + + # store unrecognized tags during parsing + self.unknown_tags = [] + + def get_export(self, tag, attr, convert=True): + """ + :param tag: Name of XML tag to retrieve, ``str`` + :param attr: Name of XML attribute to retrieve from tag, ``str`` + :param convert: If ``True``, interpret variables (e.g. ``${prefix}``) export values. + :returns: exports that match the specified tag and attribute, e.g. 'python', 'path'. ``[str]`` + """ + vals = [e.get(attr) for e in self.exports if e.tag == tag if e.get(attr) is not None] + if convert: + if not self.filename: + raise ValueError("cannot convert export values when filename for Manifest is not set") + prefix = os.path.dirname(self.filename) + vals = [v.replace('${prefix}', prefix) for v in vals] + return vals + + +def _get_text(nodes): + """ + DOM utility routine for getting contents of text nodes + """ + return "".join([n.data for n in nodes if n.nodeType == n.TEXT_NODE]) + + +_static_rosdep_view = None + + +def parse_manifest_file(dirpath, manifest_name, rospack=None): + """ + Parse manifest file (package, stack). Type will be inferred from manifest_name. + + :param dirpath: directory of manifest file, ``str`` + :param manifest_name: ``MANIFEST_FILE`` or ``STACK_FILE``, ``str`` + :param rospack: a RosPack instance to identify local packages as ROS packages + + :returns: return :class:`Manifest` instance, populated with parsed fields + :raises: :exc:`InvalidManifest` + :raises: :exc:`IOError` + """ + filename = os.path.join(dirpath, manifest_name) + if not os.path.isfile(filename): + # hack for backward compatibility + package_filename = os.path.join(dirpath, PACKAGE_FILE) + if not os.path.isfile(package_filename): + raise IOError("Invalid/non-existent manifest file: %s" % filename) + manifest = Manifest(filename=filename, is_catkin=True) + + # extract all information from package.xml + from catkin_pkg.package import parse_package + p = parse_package(package_filename) + # put these into manifest + manifest.description = p.description + manifest.author = ', '.join([('Maintainer: %s' % str(m)) for m in p.maintainers] + [str(a) for a in p.authors]) + manifest.license = ', '.join(p.licenses) + manifest.licenses = p.licenses + if p.urls: + manifest.url = str(p.urls[0]) + manifest.version = p.version + for export in p.exports: + manifest.exports.append(Export(export.tagname, export.attributes, export.content)) + + # split ros and system dependencies (using rosdep) + try: + from rosdep2.rospack import init_rospack_interface, is_ros_package, is_system_dependency, is_view_empty + global _static_rosdep_view + # initialize rosdep view once + if _static_rosdep_view is None: + _static_rosdep_view = init_rospack_interface() + if is_view_empty(_static_rosdep_view): + sys.stderr.write("the rosdep view is empty: call 'sudo rosdep init' and 'rosdep update'\n") + _static_rosdep_view = False + if _static_rosdep_view: + depends = set([]) + rosdeps = set([]) + for d in (p.buildtool_depends + p.build_depends + p.run_depends + p.test_depends): + if (rospack and d.name in rospack.list()) or is_ros_package(_static_rosdep_view, d.name): + depends.add(d.name) + if is_system_dependency(_static_rosdep_view, d.name): + rosdeps.add(d.name) + for name in depends: + manifest.depends.append(Depend(name, 'package')) + for name in rosdeps: + manifest.rosdeps.append(RosDep(name)) + except ImportError: + pass + + return manifest + + with open(filename, 'rb') as f: + data = f.read() + if sys.version_info[0] >= 3: + data = data.decode('utf-8') + return parse_manifest(manifest_name, data, filename) + + +def parse_manifest(manifest_name, string, filename='string'): + """ + Parse manifest string contents. + + :param manifest_name: ``MANIFEST_FILE`` or ``STACK_FILE``, ``str`` + :param string: manifest.xml contents, ``str`` + :param filename: full file path for debugging, ``str`` + :returns: return parsed :class:`Manifest` + """ + if manifest_name == MANIFEST_FILE: + type_ = 'package' + elif manifest_name == STACK_FILE: + type_ = 'stack' + + try: + d = dom.parseString(string) + except Exception as e: + raise InvalidManifest("[%s] invalid XML: %s" % (filename, e)) + + m = Manifest(type_, filename) + p = _get_nodes_by_name(d, type_) + if len(p) != 1: + raise InvalidManifest("manifest [%s] must have a single '%s' element" % (filename, type_)) + p = p[0] + m.description = _check('description')(p, filename) + m.brief = '' + try: + tag = _get_nodes_by_name(p, 'description')[0] + m.brief = tag.getAttribute('brief') or '' + except: + # means that 'description' tag is missing + pass + + m.depends = _check_depends(type_, p, filename) + m.rosdeps = _check_rosdeps(p, filename) + m.platforms = _check_platform(p, filename) + m.exports = _check_exports(p, filename) + m.license = _check('license')(p, filename) + m.license_url = '' + try: + tag = _get_nodes_by_name(p, 'license')[0] + m.license_url = tag.getAttribute('url') or '' + except: + pass # manifest is missing required 'license' tag + + m.status = 'unreviewed' + try: + tag = _get_nodes_by_name(p, 'review')[0] + m.status = tag.getAttribute('status') or '' + except: + pass # manifest is missing optional 'review status' tag + + m.notes = '' + try: + tag = _get_nodes_by_name(p, 'review')[0] + m.notes = tag.getAttribute('notes') or '' + except: + pass # manifest is missing optional 'review notes' tag + + m.author = _check('author', True)(p, filename) + m.url = _check('url')(p, filename) + m.version = _check('version')(p, filename) + + # do some validation on what we just parsed + if type_ == 'stack': + if m.exports: + raise InvalidManifest("stack manifests are not allowed to have exports") + if m.rosdeps: + raise InvalidManifest("stack manifests are not allowed to have rosdeps") + + m.is_catkin = bool(_get_nodes_by_name(p, 'catkin')) or bool(_get_nodes_by_name(p, 'name')) + + # store unrecognized tags + m.unknown_tags = [e for e in p.childNodes if e.nodeType == e.ELEMENT_NODE and e.tagName not in VALID] + return m diff --git a/src/rospkg/os_detect.py b/src/rospkg/os_detect.py new file mode 100644 index 0000000..034d94a --- /dev/null +++ b/src/rospkg/os_detect.py @@ -0,0 +1,783 @@ +# Copyright (c) 2009, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of the Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +# Author Tully Foote/tfoote@willowgarage.com, Ken Conley/kwc@willowgarage.com + +""" +Library for detecting the current OS, including detecting specific +Linux distributions. +""" + +from __future__ import absolute_import +from __future__ import print_function + +import codecs +# to be removed after Ubuntu Xenial is out of support +import sys +if sys.version_info >= (3, 8): + import distro +else: + import platform as distro +import locale +import os +import platform +import subprocess + + +def _read_stdout(cmd): + try: + pop = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + (std_out, std_err) = pop.communicate() + # Python 2.6 compatibility + if isinstance(std_out, str): + return std_out.strip() + return std_out.decode(encoding='UTF-8').strip() + except: + return None + + +def uname_get_machine(): + """ + Linux: wrapper around uname to determine if OS is 64-bit + """ + return _read_stdout(['uname', '-m']) + + +def read_issue(filename="/etc/issue"): + """ + :returns: list of strings in issue file, or None if issue file cannot be read/split + """ + if os.path.exists(filename): + with codecs.open(filename, 'r', encoding=locale.getpreferredencoding()) as f: + return f.read().split() + return None + + +def read_os_release(filename=None): + """ + :returns: Dictionary of key value pairs from /etc/os-release or fallback to + /usr/lib/os-release, with quotes stripped from values + """ + if filename is None: + filename = '/etc/os-release' + if not os.path.exists(filename): + filename = '/usr/lib/os-release' + + if not os.path.exists(filename): + return None + + release_info = {} + with codecs.open(filename, 'r', encoding=locale.getpreferredencoding()) as f: + for line in f: + key, val = line.rstrip('\n').partition('=')[::2] + release_info[key] = val.strip('"') + return release_info + + +class OsNotDetected(Exception): + """ + Exception to indicate failure to detect operating system. + """ + pass + + +class OsDetector(object): + """ + Generic API for detecting a specific OS. + """ + def is_os(self): + """ + :returns: if the specific OS which this class is designed to + detect is present. Only one version of this class should + return for any version. + """ + raise NotImplementedError("is_os unimplemented") + + def get_version(self): + """ + :returns: standardized version for this OS. (aka Ubuntu Hardy Heron = "8.04") + :raises: :exc:`OsNotDetected` if called on incorrect OS. + """ + raise NotImplementedError("get_version unimplemented") + + def get_codename(self): + """ + :returns: codename for this OS. (aka Ubuntu Hardy Heron = "hardy"). If codenames are not available for this OS, return empty string. + :raises: :exc:`OsNotDetected` if called on incorrect OS. + """ + raise NotImplementedError("get_codename unimplemented") + + +class LsbDetect(OsDetector): + """ + Generic detector for Debian, Ubuntu, Mint, and Pop! OS + """ + def __init__(self, lsb_name, get_version_fn=None): + self.lsb_name = lsb_name + if hasattr(distro, "linux_distribution"): + self.lsb_info = distro.linux_distribution(full_distribution_name=0) + elif hasattr(distro, "dist"): + self.lsb_info = distro.dist() + else: + self.lsb_info = None + + def is_os(self): + if self.lsb_info is None: + return False + # Work around platform returning 'Ubuntu' and distro returning 'ubuntu' + return self.lsb_info[0].lower() == self.lsb_name.lower() + + def get_version(self): + if self.is_os(): + return self.lsb_info[1] + raise OsNotDetected('called in incorrect OS') + + def get_codename(self): + if self.is_os(): + return self.lsb_info[2] + raise OsNotDetected('called in incorrect OS') + + +class Debian(LsbDetect): + + def __init__(self, get_version_fn=None): + super(Debian, self).__init__('debian', get_version_fn) + + def get_codename(self): + if self.is_os(): + v = self.get_version() + if v.startswith('7.'): + return 'wheezy' + if v.startswith('8.'): + return 'jessie' + if v.startswith('9.'): + return 'stretch' + if v.startswith('10.'): + return 'buster' + return '' + + +class FdoDetect(OsDetector): + """ + Generic detector for operating systems implementing /etc/os-release, as defined by the os-release spec hosted at Freedesktop.org (Fdo): + http://www.freedesktop.org/software/systemd/man/os-release.html + Requires that the "ID", and "VERSION_ID" keys are set in the os-release file. + + Codename is parsed from the VERSION key if available: either using the format "foo, CODENAME" or "foo (CODENAME)." + If the VERSION key is not present, the VERSION_ID is value is used as the codename. + """ + def __init__(self, fdo_id): + release_info = read_os_release() + if release_info is not None and "ID" in release_info and release_info["ID"] == fdo_id: + self.release_info = release_info + else: + self.release_info = None + + def is_os(self): + return self.release_info is not None and "VERSION_ID" in self.release_info + + def get_version(self): + if self.is_os(): + return self.release_info["VERSION_ID"] + raise OsNotDetected("called in incorrect OS") + + def get_codename(self): + if self.is_os(): + if "VERSION" in self.release_info: + version = self.release_info["VERSION"] + # FDO style: works with Fedora, Debian, Suse. + if '(' in version: + codename = version[version.find("(") + 1:version.find(")")] + # Ubuntu style + elif '"' in version: + codename = version[version.find(",") + 1:].lstrip(' ').split()[0] + # Indeterminate style + else: + codename = version + return codename.lower() + else: + return self.get_version() + raise OsNotDetected("called in incorrect OS") + + +class OpenEmbedded(OsDetector): + """ + Detect OpenEmbedded. + """ + def is_os(self): + return "ROS_OS_OVERRIDE" in os.environ and os.environ["ROS_OS_OVERRIDE"] == "openembedded" + + def get_version(self): + if self.is_os(): + return "" + raise OsNotDetected('called in incorrect OS') + + def get_codename(self): + if self.is_os(): + return "" + raise OsNotDetected('called in incorrect OS') + + +class OpenSuse(OsDetector): + """ + Detect OpenSuse OS. + """ + def __init__(self, brand_file="/etc/SuSE-brand", release_file="/etc/SuSE-release"): + self._brand_file = brand_file + self._release_file = release_file + + def is_os(self): + os_list = read_issue(self._brand_file) + return os_list and os_list[0] == "openSUSE" + + def get_version(self): + if self.is_os() and os.path.exists(self._brand_file): + with open(self._brand_file, 'r') as fh: + os_list = fh.read().strip().split('\n') + if len(os_list) == 2: + os_list = os_list[1].split(' = ') + if os_list[0] == "VERSION": + return os_list[1] + raise OsNotDetected('cannot get version on this OS') + + def get_codename(self): + # /etc/SuSE-release is deprecated since 13.1 + if self._release_file is None: + return "" + if self.is_os() and os.path.exists(self._release_file): + with open(self._release_file, 'r') as fh: + os_list = fh.read().strip().split('\n') + for line in os_list: + kv = line.split(' = ') + if kv[0] == "CODENAME": + return kv[1] + raise OsNotDetected('called in incorrect OS') + + +class Fedora(OsDetector): + """ + Detect Fedora OS. + """ + def __init__(self, release_file="/etc/redhat-release", issue_file="/etc/issue"): + self._release_file = release_file + self._issue_file = issue_file + + def is_os(self): + os_list = read_issue(self._release_file) + return os_list and os_list[0] == "Fedora" + + def get_version(self): + if self.is_os(): + os_list = read_issue(self._issue_file) + idx = os_list.index('release') + if idx > 0: + return os_list[idx + 1] + raise OsNotDetected('cannot get version on this OS') + + def get_codename(self): + if self.is_os(): + os_list = read_issue(self._release_file) + idx = os_list.index('release') + matches = [x for x in os_list if x[0] == '('] + codename = matches[0][1:] + if codename[-1] == ')': + codename = codename[:-1] + return codename.lower() + raise OsNotDetected('called in incorrect OS') + + +class Rhel(Fedora): + """ + Detect Redhat OS. + """ + def __init__(self, release_file="/etc/redhat-release"): + self._release_file = release_file + + def is_os(self): + os_list = read_issue(self._release_file) + return os_list and os_list[:3] == ['Red', 'Hat', 'Enterprise'] + + def get_version(self): + if self.is_os(): + os_list = read_issue(self._release_file) + idx = os_list.index('release') + return os_list[idx + 1] + raise OsNotDetected('called in incorrect OS') + + def get_codename(self): + # taroon, nahant, tikanga, santiago, pensacola + if self.is_os(): + os_list = read_issue(self._release_file) + idx = os_list.index('release') + matches = [x for x in os_list if x[0] == '('] + codename = matches[0][1:] + if codename[-1] == ')': + codename = codename[:-1] + return codename.lower() + raise OsNotDetected('called in incorrect OS') + + +# Source: https://en.wikipedia.org/wiki/MacOS#Versions +_osx_codename_map = { + 4: 'tiger', + 5: 'leopard', + 6: 'snow', + 7: 'lion', + 8: 'mountain lion', + 9: 'mavericks', + 10: 'yosemite', + 11: 'el capitan', + 12: 'sierra', + 13: 'high sierra', + 14: 'mojave', + 15: 'catalina', +} + + +def _osx_codename(major, minor): + if major != 10 or minor not in _osx_codename_map: + raise OsNotDetected("unrecognized version: %s.%s" % (major, minor)) + return _osx_codename_map[minor] + + +class OSX(OsDetector): + """ + Detect OS X + """ + def __init__(self, sw_vers_file="/usr/bin/sw_vers"): + self._sw_vers_file = sw_vers_file + + def is_os(self): + return os.path.exists(self._sw_vers_file) + + def get_codename(self): + if self.is_os(): + version = self.get_version() + import distutils.version # To parse version numbers + try: + ver = distutils.version.StrictVersion(version).version + except ValueError: + raise OsNotDetected("invalid version string: %s" % (version)) + return _osx_codename(*ver[0:2]) + raise OsNotDetected('called in incorrect OS') + + def get_version(self): + if self.is_os(): + return _read_stdout([self._sw_vers_file, '-productVersion']) + raise OsNotDetected('called in incorrect OS') + + +class QNX(OsDetector): + ''' + Detect QNX realtime OS. + @author: Isaac Saito + ''' + def __init__(self, uname_file='/bin/uname'): + ''' + @param uname_file: An executable that can be used for detecting + OS name and version. + ''' + self._os_name_qnx = 'QNX' + self._uname_file = uname_file + + def is_os(self): + if os.path.exists(self._uname_file): + std_out = _read_stdout([self._uname_file]) + return std_out.strip() == self._os_name_qnx + else: + return False + + def get_codename(self): + if self.is_os(): + return '' + raise OsNotDetected('called in incorrect OS') + + def get_version(self): + if self.is_os() and os.path.exists(self._uname_file): + return _read_stdout([self._uname_file, "-r"]) + raise OsNotDetected('called in incorrect OS') + + +class Arch(OsDetector): + """ + Detect Arch Linux. + """ + def __init__(self, release_file='/etc/arch-release'): + self._release_file = release_file + + def is_os(self): + return os.path.exists(self._release_file) + + def get_version(self): + if self.is_os(): + return "" + raise OsNotDetected('called in incorrect OS') + + def get_codename(self): + if self.is_os(): + return "" + raise OsNotDetected('called in incorrect OS') + + +class Manjaro(Arch): + """ + Detect Manjaro. + """ + def __init__(self, release_file='/etc/manjaro-release'): + super(Manjaro, self).__init__(release_file) + + +class Centos(OsDetector): + """ + Detect CentOS. + """ + def __init__(self, release_file='/etc/redhat-release'): + self._release_file = release_file + + def is_os(self): + os_list = read_issue(self._release_file) + return os_list and os_list[0] == 'CentOS' + + def get_version(self): + if self.is_os(): + os_list = read_issue(self._release_file) + idx = os_list.index('release') + return os_list[idx + 1] + raise OsNotDetected('called in incorrect OS') + + def get_codename(self): + if self.is_os(): + os_list = read_issue(self._release_file) + idx = os_list.index('release') + matches = [x for x in os_list if x[0] == '('] + codename = matches[0][1:] + if codename[-1] == ')': + codename = codename[:-1] + return codename.lower() + raise OsNotDetected('called in incorrect OS') + + +class Cygwin(OsDetector): + """ + Detect Cygwin presence on Windows OS. + """ + def is_os(self): + return os.path.exists("/usr/bin/cygwin1.dll") + + def get_version(self): + if self.is_os(): + return _read_stdout(['uname', '-r']) + raise OsNotDetected('called in incorrect OS') + + def get_codename(self): + if self.is_os(): + return '' + raise OsNotDetected('called in incorrect OS') + + +class Gentoo(OsDetector): + """ + Detect Gentoo OS. + """ + def __init__(self, release_file="/etc/gentoo-release"): + self._release_file = release_file + + def is_os(self): + os_list = read_issue(self._release_file) + return os_list and os_list[0] == "Gentoo" and os_list[1] == "Base" + + def get_version(self): + if self.is_os(): + os_list = read_issue(self._release_file) + return os_list[4] + raise OsNotDetected('called in incorrect OS') + + def get_codename(self): + if self.is_os(): + return '' + raise OsNotDetected('called in incorrect OS') + + +class Funtoo(Gentoo): + """ + Detect Funtoo OS, a Gentoo Variant. + """ + def __init__(self, release_file="/etc/gentoo-release"): + Gentoo.__init__(self, release_file) + + def is_os(self): + os_list = read_issue(self._release_file) + return os_list and os_list[0] == "Funtoo" and os_list[1] == "Linux" + + +class FreeBSD(OsDetector): + """ + Detect FreeBSD OS. + """ + def __init__(self, uname_file="/usr/bin/uname"): + self._uname_file = uname_file + + def is_os(self): + if os.path.exists(self._uname_file): + std_out = _read_stdout([self._uname_file]) + return std_out.strip() == "FreeBSD" + else: + return False + + def get_version(self): + if self.is_os() and os.path.exists(self._uname_file): + return _read_stdout([self._uname_file, "-r"]) + raise OsNotDetected('called in incorrect OS') + + def get_codename(self): + if self.is_os(): + return '' + raise OsNotDetected('called in incorrect OS') + + +class Slackware(OsDetector): + """ + Detect SlackWare Linux. + """ + def __init__(self, release_file='/etc/slackware-version'): + self._release_file = release_file + + def is_os(self): + return os.path.exists(self._release_file) + + def get_version(self): + if self.is_os(): + os_list = read_issue(self._release_file) + return os_list[1] + raise OsNotDetected('called in incorrect OS') + + def get_codename(self): + if self.is_os(): + return '' + raise OsNotDetected('called in incorrect OS') + + +class Windows(OsDetector): + """ + Detect Windows OS. + """ + def is_os(self): + return platform.system() == "Windows" + + def get_version(self): + if self.is_os(): + return platform.version() + raise OsNotDetected('called in incorrect OS') + + def get_codename(self): + if self.is_os(): + return platform.release() + raise OsNotDetected('called in incorrect OS') + + +class OsDetect: + """ + This class will iterate over registered classes to lookup the + active OS and version + """ + + default_os_list = [] + + def __init__(self, os_list=None): + if os_list is None: + os_list = OsDetect.default_os_list + self._os_list = os_list + self._os_name = None + self._os_version = None + self._os_codename = None + self._os_detector = None + self._override = False + + @staticmethod + def register_default(os_name, os_detector): + """ + Register detector to be used with all future instances of + :class:`OsDetect`. The new detector will have precedence over + any previously registered detectors associated with *os_name*. + + :param os_name: OS key associated with OS detector + :param os_detector: :class:`OsDetector` instance + """ + OsDetect.default_os_list.insert(0, (os_name, os_detector)) + + def detect_os(self, env=None): + """ + Detect operating system. Return value can be overridden by + the :env:`ROS_OS_OVERRIDE` environment variable. + + :param env: override ``os.environ`` + :returns: (os_name, os_version, os_codename), ``(str, str, str)`` + :raises: :exc:`OsNotDetected` if OS could not be detected + """ + if env is None: + env = os.environ + if 'ROS_OS_OVERRIDE' in env: + splits = env["ROS_OS_OVERRIDE"].split(':') + self._os_name = splits[0] + if len(splits) > 1: + self._os_version = splits[1] + if len(splits) > 2: + self._os_codename = splits[2] + else: + self._os_codename = '' + else: + self._os_version = self._os_codename = '' + self._override = True + else: + for os_name, os_detector in self._os_list: + if os_detector.is_os(): + self._os_name = os_name + self._os_version = os_detector.get_version() + self._os_codename = os_detector.get_codename() + self._os_detector = os_detector + break + + if self._os_name: + return self._os_name, self._os_version, self._os_codename + else: # No solution found + attempted = [x[0] for x in self._os_list] + raise OsNotDetected("Could not detect OS, tried %s" % attempted) + + def get_detector(self, name=None): + """ + Get detector used for specified OS name, or the detector for this OS if name is ``None``. + + :raises: :exc:`KeyError` + """ + if name is None: + if not self._os_detector: + self.detect_os() + return self._os_detector + else: + try: + return [d for d_name, d in self._os_list if d_name == name][0] + except IndexError: + raise KeyError(name) + + def add_detector(self, name, detector): + """ + Add detector to list of detectors used by this instance. *detector* will override any previous + detectors associated with *name*. + + :param name: OS name that detector matches + :param detector: :class:`OsDetector` instance + """ + self._os_list.insert(0, (name, detector)) + + def get_name(self): + if not self._os_name: + self.detect_os() + return self._os_name + + def get_version(self): + if not self._os_version: + self.detect_os() + return self._os_version + + def get_codename(self): + if not self._os_codename: + self.detect_os() + return self._os_codename + + +OS_ALPINE = 'alpine' +OS_ARCH = 'arch' +OS_MANJARO = 'manjaro' +OS_CENTOS = 'centos' +OS_CYGWIN = 'cygwin' +OS_DEBIAN = 'debian' +OS_ELEMENTARY = 'elementary' +OS_ELEMENTARY_OLD = 'elementary' +OS_FEDORA = 'fedora' +OS_FREEBSD = 'freebsd' +OS_FUNTOO = 'funtoo' +OS_GENTOO = 'gentoo' +OS_LINARO = 'linaro' +OS_MINT = 'mint' +OS_MX = 'mx' +OS_NEON = 'neon' +OS_OPENEMBEDDED = 'openembedded' +OS_OPENSUSE = 'opensuse' +OS_OPENSUSE13 = 'opensuse' +OS_TIZEN = 'tizen' +OS_SAILFISHOS = 'sailfishos' +OS_OSX = 'osx' +OS_POP = 'pop' +OS_QNX = 'qnx' +OS_RHEL = 'rhel' +OS_SLACKWARE = 'slackware' +OS_UBUNTU = 'ubuntu' +OS_CLEARLINUX = 'clearlinux' +OS_NIXOS = 'nixos' +OS_WINDOWS = 'windows' +OS_ZORIN = 'zorin' + +OsDetect.register_default(OS_ALPINE, FdoDetect("alpine")) +OsDetect.register_default(OS_ARCH, Arch()) +OsDetect.register_default(OS_MANJARO, Manjaro()) +OsDetect.register_default(OS_CENTOS, Centos()) +OsDetect.register_default(OS_CYGWIN, Cygwin()) +OsDetect.register_default(OS_DEBIAN, Debian()) +OsDetect.register_default(OS_ELEMENTARY, LsbDetect("elementary")) +OsDetect.register_default(OS_ELEMENTARY_OLD, LsbDetect("elementary OS")) +OsDetect.register_default(OS_FEDORA, FdoDetect("fedora")) +OsDetect.register_default(OS_FREEBSD, FreeBSD()) +OsDetect.register_default(OS_FUNTOO, Funtoo()) +OsDetect.register_default(OS_GENTOO, Gentoo()) +OsDetect.register_default(OS_LINARO, LsbDetect("Linaro")) +OsDetect.register_default(OS_MINT, LsbDetect("LinuxMint")) +OsDetect.register_default(OS_MX, LsbDetect("MX")) +OsDetect.register_default(OS_NEON, LsbDetect("neon")) +OsDetect.register_default(OS_OPENEMBEDDED, OpenEmbedded()) +OsDetect.register_default(OS_OPENSUSE, OpenSuse()) +OsDetect.register_default(OS_OPENSUSE13, OpenSuse(brand_file='/etc/SUSE-brand', release_file=None)) +OsDetect.register_default(OS_OPENSUSE, FdoDetect("opensuse-tumbleweed")) +OsDetect.register_default(OS_OPENSUSE, FdoDetect("opensuse")) +OsDetect.register_default(OS_TIZEN, FdoDetect("tizen")) +OsDetect.register_default(OS_SAILFISHOS, FdoDetect("sailfishos")) +OsDetect.register_default(OS_OSX, OSX()) +OsDetect.register_default(OS_POP, LsbDetect("Pop")) +OsDetect.register_default(OS_QNX, QNX()) +OsDetect.register_default(OS_RHEL, Rhel()) +OsDetect.register_default(OS_SLACKWARE, Slackware()) +OsDetect.register_default(OS_UBUNTU, LsbDetect("Ubuntu")) +OsDetect.register_default(OS_CLEARLINUX, FdoDetect("clear-linux-os")) +OsDetect.register_default(OS_NIXOS, FdoDetect("nixos")) +OsDetect.register_default(OS_WINDOWS, Windows()) +OsDetect.register_default(OS_ZORIN, LsbDetect("Zorin")) + + +if __name__ == '__main__': + detect = OsDetect() + print("OS Name: %s" % detect.get_name()) + print("OS Version: %s" % detect.get_version()) + print("OS Codename: %s" % detect.get_codename()) diff --git a/src/rospkg/rospack.py b/src/rospkg/rospack.py new file mode 100644 index 0000000..3520321 --- /dev/null +++ b/src/rospkg/rospack.py @@ -0,0 +1,531 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2011, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +from threading import Lock + +try: + from xml.etree.cElementTree import ElementTree +except ImportError: + from xml.etree.ElementTree import ElementTree + +from .common import MANIFEST_FILE, PACKAGE_FILE, ResourceNotFound, STACK_FILE +from .environment import get_ros_paths +from .manifest import InvalidManifest, parse_manifest_file +from .stack import InvalidStack, parse_stack_file + +_cache_lock = Lock() + + +def list_by_path(manifest_name, path, cache): + """ + List ROS stacks or packages within the specified path. + + The cache will be updated with the resource->path + mappings. list_by_path() does NOT returned cached results + -- it only updates the cache. + + :param manifest_name: MANIFEST_FILE or STACK_FILE, ``str`` + :param path: path to list resources in, ``str`` + :param cache: path cache to update. Maps resource name to directory path, ``{str: str}`` + :returns: complete list of resources in ROS environment, ``[str]`` + """ + resources = [] + path = os.path.abspath(path) + basename = os.path.basename + for d, dirs, files in os.walk(path, topdown=True, followlinks=True): + if 'CATKIN_IGNORE' in files: + del dirs[:] + continue # leaf + if PACKAGE_FILE in files: + # parse package.xml and decide if it matches the search criteria + root = ElementTree(None, os.path.join(d, PACKAGE_FILE)) + is_metapackage = root.find('./export/metapackage') is not None + if ( + (manifest_name == STACK_FILE and is_metapackage) or + (manifest_name == MANIFEST_FILE and not is_metapackage) or + manifest_name == PACKAGE_FILE + ): + resource_name = root.findtext('name').strip(' \n\r\t') + if resource_name not in resources: + resources.append(resource_name) + if cache is not None: + cache[resource_name] = d + del dirs[:] + continue # leaf + if manifest_name in files: + resource_name = basename(d) + if resource_name not in resources: + resources.append(resource_name) + if cache is not None: + cache[resource_name] = d + del dirs[:] + continue # leaf + elif MANIFEST_FILE in files or PACKAGE_FILE in files: + # noop if manifest_name==MANIFEST_FILE, but a good + # optimization for stacks. + del dirs[:] + continue # leaf + elif 'rospack_nosubdirs' in files: + del dirs[:] + continue # leaf + # remove hidden dirs (esp. .svn/.git) + [dirs.remove(di) for di in dirs if di[0] == '.'] + return resources + + +class ManifestManager(object): + """ + Base class implementation for :class:`RosPack` and + :class:`RosStack`. This class indexes resources on paths with + where manifests denote the precense of the resource. NOTE: for + performance reasons, instances cache information and will not + reflect changes made on disk or to environment configuration. + """ + + def __init__(self, manifest_name, ros_paths=None): + """ + ctor. subclasses are expected to use *manifest_name* + to customize behavior of ManifestManager. + + :param manifest_name: MANIFEST_FILE or STACK_FILE + :param ros_paths: Ordered list of paths to search for + resources. If `None` (default), use environment ROS path. + """ + self._manifest_name = manifest_name + + if ros_paths is None: + self._ros_paths = get_ros_paths() + else: + self._ros_paths = ros_paths + + self._manifests = {} + self._depends_cache = {} + self._rosdeps_cache = {} + self._location_cache = None + self._custom_cache = {} + + @classmethod + def get_instance(cls, ros_paths=None): + """ + Reuse an existing instance for the specified ros_paths instead of creating a new one. + Only works for subclasses, as the ManifestManager itself expects two args for the ctor. + + :param ros_paths: Ordered list of paths to search for + resources. If `None` (default), use environment ROS path. + """ + if not hasattr(cls, '_instances'): + # add class variable _instances to cls + cls._instances = {} + + # generate instance_key from ros_paths variable + if ros_paths is None: + ros_paths = get_ros_paths() + instance_key = str(tuple(ros_paths)) + + if instance_key not in cls._instances: + # create and cache new instance + cls._instances[instance_key] = cls(ros_paths) + return cls._instances[instance_key] + + def get_ros_paths(self): + return self._ros_paths[:] + ros_paths = property(get_ros_paths, doc="Get ROS paths of this instance") + + def get_manifest(self, name): + """ + :raises: :exc:`InvalidManifest` + """ + if name in self._manifests: + return self._manifests[name] + else: + return self._load_manifest(name) + + def _update_location_cache(self): + global _cache_lock + # ensure self._location_cache is not checked while it is being updated + # (i.e. while it is not None, but also not completely populated) + with _cache_lock: + if self._location_cache is not None: + return + # initialize cache + cache = self._location_cache = {} + # nothing to search, #3680 + if not self._ros_paths: + return + # crawl paths using our own logic, in reverse order to get + # correct precedence + for path in reversed(self._ros_paths): + list_by_path(self._manifest_name, path, cache) + + def list(self): + """ + List resources. + + :returns: complete list of package names in ROS environment, ``[str]`` + """ + self._update_location_cache() + return self._location_cache.keys() + + def get_path(self, name): + """ + :param name: package name, ``str`` + :returns: filesystem path of package + :raises: :exc:`ResourceNotFound` + """ + self._update_location_cache() + if name not in self._location_cache: + raise ResourceNotFound(name, ros_paths=self._ros_paths) + else: + return self._location_cache[name] + + def _load_manifest(self, name): + """ + :raises: :exc:`ResourceNotFound` + """ + retval = self._manifests[name] = parse_manifest_file(self.get_path(name), self._manifest_name, rospack=self) + return retval + + def get_depends(self, name, implicit=True): + """ + Get dependencies of a resource. If implicit is ``True``, this + includes implicit (recursive) dependencies. + + :param name: resource name, ``str`` + :param implicit: include implicit (recursive) dependencies, ``bool`` + + :returns: list of names of dependencies, ``[str]`` + :raises: :exc:`InvalidManifest` If resource or any of its + dependencies have an invalid manifest. + """ + if not implicit: + m = self.get_manifest(name) + return [d.name for d in m.depends] + else: + if name in self._depends_cache: + return self._depends_cache[name] + + # take the union of all dependencies + names = [p.name for p in self.get_manifest(name).depends] + + # assign key before recursive call to prevent infinite case + self._depends_cache[name] = s = set() + + for p in names: + s.update(self.get_depends(p, implicit)) + # add in our own deps + s.update(names) + # cache the return value as a list + s = list(s) + self._depends_cache[name] = s + return s + + def get_depends_on(self, name, implicit=True): + """ + Get resources that depend on a resource. If implicit is ``True``, this + includes implicit (recursive) dependency relationships. + + NOTE: this does *not* raise :exc:`rospkg.InvalidManifest` if + there are invalid manifests found. + + :param name: resource name, ``str`` + :param implicit: include implicit (recursive) dependencies, ``bool`` + + :returns: list of names of dependencies, ``[str]`` + """ + depends_on = [] + if not implicit: + # have to examine all dependencies + for r in self.list(): + if r == name: + continue + try: + m = self.get_manifest(r) + if any(d for d in m.depends if d.name == name): + depends_on.append(r) + except InvalidManifest: + # robust to bad packages + pass + except ResourceNotFound: + # robust to bad packages + pass + else: + # Computing implicit dependencies requires examining the + # dependencies of all packages. As we already implement + # this logic in get_depends(), we simply reuse it here for + # the reverse calculation. This enables us to use the + # same dependency cache that get_depends() uses. The + # efficiency is roughly the same due to the caching. + for r in self.list(): + if r == name: + continue + try: + depends = self.get_depends(r, implicit=True) + if name in depends: + depends_on.append(r) + except InvalidManifest: + # robust to bad packages + pass + except ResourceNotFound: + # robust to bad packages + pass + return depends_on + + def get_custom_cache(self, key, default=None): + return self._custom_cache.get(key, default) + + def set_custom_cache(self, key, value): + self._custom_cache[key] = value + + +class RosPack(ManifestManager): + """ + Utility class for querying properties about ROS packages. This + should be used when querying properties about multiple + packages. + + NOTE 1: for performance reasons, RosPack caches information about + packages. + + NOTE 2: RosPack is not thread-safe. + + Example:: + from rospkg import RosPack + rp = RosPack() + packages = rp.list() + path = rp.get_path('rospy') + depends = rp.get_depends('roscpp') + direct_depends = rp.get_depends('roscpp', implicit=False) + """ + + def __init__(self, ros_paths=None): + """ + :param ros_paths: Ordered list of paths to search for + resources. If `None` (default), use environment ROS path. + """ + super(RosPack, self).__init__(MANIFEST_FILE, + ros_paths) + self._rosdeps_cache = {} + + def get_rosdeps(self, package, implicit=True): + """ + Collect rosdeps of specified package into a dictionary. + + :param package: package name, ``str`` + :param implicit: include implicit (recursive) rosdeps, ``bool`` + + :returns: list of rosdep names, ``[str]`` + """ + if implicit: + return self._implicit_rosdeps(package) + else: + m = self.get_manifest(package) + return [d.name for d in m.rosdeps] + + def _implicit_rosdeps(self, package): + """ + Compute recursive rosdeps of a single package and cache the + result in self._rosdeps_cache. + + :param package: package name, ``str`` + :returns: list of rosdeps, ``[str]`` + """ + if package in self._rosdeps_cache: + return self._rosdeps_cache[package] + + # set the key before recursive call to prevent infinite case + self._rosdeps_cache[package] = s = set() + + # take the union of all dependencies + packages = self.get_depends(package, implicit=True) + for p in packages: + s.update(self.get_rosdeps(p, implicit=False)) + # add in our own deps + m = self.get_manifest(package) + s.update([d.name for d in m.rosdeps]) + # cache the return value as a list + s = list(s) + self._rosdeps_cache[package] = s + return s + + def stack_of(self, package): + """ + :param package: package name, ``str`` + :returns: name of stack that package is in, or None if package is not part of a stack, ``str`` + :raises: :exc:`ResourceNotFound` If package cannot be located + """ + d = self.get_path(package) + while d and os.path.dirname(d) != d: + stack_file = os.path.join(d, STACK_FILE) + if os.path.exists(stack_file): + return os.path.basename(d) + else: + d = os.path.dirname(d) + + +class RosStack(ManifestManager): + """ + Utility class for querying properties about ROS stacks. This + should be used when querying properties about multiple + stacks. + + NOTE 1: for performance reasons, RosStack caches information about + stacks. + + NOTE 2: RosStack is not thread-safe. + """ + + def __init__(self, ros_paths=None): + """ + :param ros_paths: Ordered list of paths to search for + resources. If `None` (default), use environment ROS path. + """ + super(RosStack, self).__init__(STACK_FILE, ros_paths) + + def packages_of(self, stack): + """ + :returns: name of packages that are part of stack, ``[str]`` + :raises: :exc:`ResourceNotFound` If stack cannot be located + """ + return list_by_path(MANIFEST_FILE, self.get_path(stack), {}) + + def get_stack_version(self, stack): + """ + :param env: override environment variables, ``{str: str}`` + :returns: version number of stack, or None if stack is unversioned, ``str`` + """ + return get_stack_version_by_dir(self.get_path(stack)) + + +# #2022 +def expand_to_packages(names, rospack, rosstack): + """ + Expand names into a list of packages. Names can either be of packages or stacks. + + :param names: names of stacks or packages, ``[str]`` + :returns: ([packages], [not_found]). Returns two lists. The first + is of packages names. The second is a list of names for which no + matching stack or package was found. Lists may have + duplicates. ``([str], [str])`` + """ + if type(names) not in (tuple, list): + raise ValueError("names must be a list of strings") + + # do full package list first. This forces an entire tree + # crawl. This is less efficient for a small list of names, but + # much more efficient for many names. + package_list = rospack.list() + valid = [] + invalid = [] + for n in names: + if n not in package_list: + try: + valid.extend(rosstack.packages_of(n)) + except ResourceNotFound: + invalid.append(n) + else: + valid.append(n) + return valid, invalid + + +def get_stack_version_by_dir(stack_dir): + """ + Get stack version where stack_dir points to root directory of stack. + + :param env: override environment variables, ``{str: str}`` + + :returns: version number of stack, or None if stack is unversioned, ``str`` + :raises: :exc:`IOError` + :raises: :exc:`InvalidStack` + """ + catkin_stack_filename = os.path.join(stack_dir, 'stack.xml') + if os.path.isfile(catkin_stack_filename): + try: + stack = parse_stack_file(catkin_stack_filename) + return stack.version + except InvalidStack: + pass + + cmake_filename = os.path.join(stack_dir, 'CMakeLists.txt') + if os.path.isfile(cmake_filename): + with open(cmake_filename) as f: + try: + return _get_cmake_version(f.read()) + except ValueError: + return None + else: + return None + + +def _get_cmake_version(text): + """ + :raises :exc:`ValueError` If version number in CMakeLists.txt cannot be parsed correctly + """ + import re + for l in text.split('\n'): + if l.strip().startswith('rosbuild_make_distribution'): + x_re = re.compile(r'[()]') + lsplit = x_re.split(l.strip()) + if len(lsplit) < 2: + raise ValueError("couldn't find version number in CMakeLists.txt:\n\n%s" % l) + version = lsplit[1] + if version: + return version + else: + raise ValueError("cannot parse version number in CMakeLists.txt:\n\n%s" % l) + + +def get_package_name(path): + """ + Get the name of the ROS package that contains *path*. This is + determined by finding the nearest parent ``manifest.xml`` file. + This routine may not traverse package setups that rely on internal + symlinks within the package itself. + + :param path: filesystem path + :return: Package name or ``None`` if package cannot be found, ``str`` + """ + # NOTE: the realpath is going to create issues with symlinks, most + # likely. + parent = os.path.dirname(os.path.realpath(path)) + # walk up until we hit ros root or ros/pkg + while not os.path.exists(os.path.join(path, MANIFEST_FILE)) and not os.path.exists(os.path.join(path, PACKAGE_FILE)) and parent != path: + path = parent + parent = os.path.dirname(path) + # check termination condition + if os.path.exists(os.path.join(path, MANIFEST_FILE)): + return os.path.basename(os.path.abspath(path)) + elif os.path.exists(os.path.join(path, PACKAGE_FILE)): + root = ElementTree(None, os.path.join(path, PACKAGE_FILE)) + return root.findtext('name') + else: + return None diff --git a/src/rospkg/rosversion.py b/src/rospkg/rosversion.py new file mode 100644 index 0000000..e8ef820 --- /dev/null +++ b/src/rospkg/rosversion.py @@ -0,0 +1,124 @@ +# Copyright (c) 2008, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of the Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +from __future__ import print_function + +import argparse +import os +import sys +import traceback + +from .common import PACKAGE_FILE +from .rospack import ManifestManager, RosPack, RosStack, ResourceNotFound + + +# for < fuerte, retrieve from roscore file +def get_distro_name_from_roscore(): + ''' + This function only works for ROS Electric and older. + For any newer ROS distro the information is provided + in the ROS_DISTRO environment variable. + ''' + rospack = RosPack() + # there's some chance that the location of this file changes in the future + try: + roslaunch_dir = rospack.get_path('roslaunch') + roscore_file = os.path.join(roslaunch_dir, 'roscore.xml') + if not os.path.exists(roscore_file): + return None + except: + return None + + import xml.dom.minidom + try: + dom = xml.dom.minidom.parse(roscore_file) + tags = dom.getElementsByTagName("param") + for t in tags: + if t.hasAttribute('name') and t.getAttribute('name') == 'rosdistro': + return t.getAttribute('value') + except: + traceback.print_exc() + + +def print_without_newline(argtext): + """Print with no new line.""" + print(argtext, end='') + + +def main(): + parser = argparse.ArgumentParser( + description='rosversion -d: Output the version of the given package\n' + 'rosversion package: Output the ROS distribution name', + formatter_class=argparse.RawTextHelpFormatter) + parser.add_argument( + '-s', '--skip-newline', action='store_true', + help='Skip trailing newline') + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument( + 'package', nargs='?', + help="The ROS package name (e.g. 'roscpp')") + group.add_argument( + '-d', '--distro', action='store_true', + help='Output the ROS distribution name') + + args = parser.parse_args() + + printer = print_without_newline if args.skip_newline else print + + if args.distro: + if 'ROS_DISTRO' in os.environ: + distro_name = os.environ['ROS_DISTRO'] + else: + distro_name = get_distro_name_from_roscore() + if not distro_name: + distro_name = '' + printer(distro_name) + sys.exit(0) + + rosstack = RosStack() + try: + version = rosstack.get_stack_version(args.package) + except ResourceNotFound as e: + try: + # hack to make it work with wet packages + mm = ManifestManager(PACKAGE_FILE) + path = mm.get_path(args.package) + package_manifest = os.path.join(path, 'package.xml') + if os.path.exists(package_manifest): + from xml.etree.ElementTree import ElementTree + try: + root = ElementTree(None, package_manifest) + version = root.findtext('version') + except Exception: + pass + except ResourceNotFound as e: + print('Cannot locate [%s]' % args.package) + sys.exit(1) + + if version is None: + version = '' + printer(version) diff --git a/src/rospkg/stack.py b/src/rospkg/stack.py new file mode 100644 index 0000000..32254fa --- /dev/null +++ b/src/rospkg/stack.py @@ -0,0 +1,258 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2012, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +""" +Library for processing stack.xml created post-catkin +""" + +import collections +import os +import xml.dom.minidom as dom + +# as defined on http://ros.org/doc/fuerte/api/catkin/html/stack_xml.html +REQUIRED = ['name', 'version', 'description', 'author', 'maintainer', 'license', 'copyright'] +ALLOWXHTML = ['description'] +OPTIONAL = ['description_brief', 'version_abi', 'url', 'review_notes', 'review_status', 'build_depends', 'depends', 'build_type', 'message_generator', 'review'] + +LISTED_ATTRIBUTES = {'Author': ['name', 'email'], 'Maintainer': ['name', 'email'], 'Depend': ['name', 'version']} + +VALID = REQUIRED + OPTIONAL + + +class InvalidStack(Exception): + pass + + +def _get_nodes_by_name(n, name): + return [t for t in n.childNodes if t.nodeType == t.ELEMENT_NODE and t.tagName == name] + + +def _check_optional(name, allowXHTML=False): + """ + Validator for optional elements. + + :raise: :exc:`InvalidStack` If validation fails + """ + def check(n, filename): + n = _get_nodes_by_name(n, name) + if len(n) > 1: + raise InvalidStack("Invalid stack.xml file [%s]: must have at most one '%s' element" % (filename, name)) + if n: + if allowXHTML: + return ''.join([x.toxml() for x in n[0].childNodes]) + return _get_text(n[0].childNodes).strip() + return check + + +def _check_required(name, allowXHTML=False): + """ + Validator for required elements. + + :raise: :exc:`InvalidStack` If validation fails + """ + def check(n, filename): + n = _get_nodes_by_name(n, name) + if len(n) != 1: + raise InvalidStack("Invalid stack.xml file [%s]: must have exactly one '%s' element" % (filename, name)) + if allowXHTML: + return ''.join([x.toxml() for x in n[0].childNodes]) + return _get_text(n[0].childNodes).strip() + return check + + +def _check_depends(n, key, filename): + """ + Validator for stack.xml depends. + :raise: :exc:`InvalidStack` If validation fails + """ + nodes = _get_nodes_by_name(n, key) + return set([_get_text(n.childNodes).strip() for n in nodes]) + + +def _build_listed_attributes(n, key, object_type): + """ + Validator for stack.xml depends. + :raise: :exc:`InvalidStack` If validation fails + """ + members = set() + for node in _get_nodes_by_name(n, key): + # The first field is always supposed to be the value + attribute_dict = {} + for field in object_type._fields: + try: + attribute_dict[field] = node.getAttribute(field) + except: + pass + attribute_dict[object_type._fields[0]] = _get_text(node.childNodes).strip() + members.add(object_type(**attribute_dict)) + return members + + +def _attrs(node): + attrs = {} + for k in node.attributes.keys(): + attrs[k] = node.attributes.get(k).value + return attrs + + +def _check(name): + """ + Generic validator for text-based tags. + """ + if name in REQUIRED: + return _check_required(name, name in ALLOWXHTML) + elif name in OPTIONAL: + return _check_optional(name, name in ALLOWXHTML) + + +class Stack(object): + """ + Object representation of a ROS ``stack.xml`` file + """ + __slots__ = [ + 'name', 'version', 'description', 'authors', 'maintainers', 'license', 'copyright', + 'description_brief', 'version_abi', 'url', 'review_notes', 'review_status', + 'build_depends', 'depends', 'build_type', 'build_type_file', 'message_generator', + 'unknown_tags'] + + def __init__(self, filename=None): + """ + :param filename: location of stack.xml. Necessary if + converting ``${prefix}`` in ```` values, ``str``. + """ + self.description = self.description_brief = self.name = \ + self.version = self.version_abi = \ + self.license = self.copyright = '' + self.url = '' + self.authors = [] + self.maintainers = [] + self.depends = [] + self.build_depends = [] + self.review_notes = self.review_status = '' + self.build_type = 'cmake' + self.build_type_file = '' + self.message_generator = '' + + # store unrecognized tags during parsing + self.unknown_tags = [] + + +def _get_text(nodes): + """ + DOM utility routine for getting contents of text nodes + """ + return "".join([n.data for n in nodes if n.nodeType == n.TEXT_NODE]) + + +def parse_stack_file(stack_path): + """ + Parse stack file. + + :param stack_path: The path of the stack.xml file + + :returns: return :class:`Stack` instance, populated with parsed fields + :raises: :exc:`InvalidStack` + :raises: :exc:`IOError` + """ + if not os.path.isfile(stack_path): + raise IOError("Invalid/non-existent stack.xml file: %s" % (stack_path)) + + with open(stack_path, 'r') as f: + return parse_stack(f.read(), stack_path) + + +def parse_stack(string, filename): + """ + Parse stack.xml string contents. + + :param string: stack.xml contents, ``str`` + :param filename: full file path for debugging, ``str`` + :returns: return parsed :class:`Stack` + """ + # Create some classes to hold some members + new_tuples = {} + for key, members in LISTED_ATTRIBUTES.items(): + new_tuples[key] = collections.namedtuple(key, members) + + try: + d = dom.parseString(string) + except Exception as e: + raise InvalidStack("[%s] invalid XML: %s" % (filename, e)) + + s = Stack() + p = _get_nodes_by_name(d, 'stack') + if len(p) != 1: + raise InvalidStack("stack.xml [%s] must have a single 'stack' element" % (filename)) + p = p[0] + for attr in [ + 'name', 'version', 'description', + 'license', 'copyright', 'url', 'build_type', 'message_generator' + ]: + val = _check(attr)(p, filename) + if val: + setattr(s, attr, val) + + try: + tag = _get_nodes_by_name(p, 'description')[0] + s.description_brief = tag.getAttribute('brief') or '' + except: + # means that 'description' tag is missing + pass + + s.authors = _build_listed_attributes(p, 'author', new_tuples['Author']) + s.maintainers = _build_listed_attributes(p, 'maintainer', new_tuples['Maintainer']) + s.depends = _build_listed_attributes(p, 'depends', new_tuples['Depend']) + s.build_depends = _build_listed_attributes(p, 'build_depends', new_tuples['Depend']) + + try: + tag = _get_nodes_by_name(p, 'review')[0] + s.review_status = tag.getAttribute('status') or '' + except: + pass # stack.xml is missing optional 'review status' tag + + try: + tag = _get_nodes_by_name(p, 'review')[0] + s.review_notes = tag.getAttribute('notes') or '' + except: + pass # stack.xml is missing optional 'review notes' tag + + try: + tag = _get_nodes_by_name(p, 'build_type')[0] + s.build_type_file = tag.getAttribute('file') or '' + except: + pass # stack.xml is missing optional 'build_type file' tag + + # store unrecognized tags + s.unknown_tags = [e.nodeName for e in p.childNodes if e.nodeType == e.ELEMENT_NODE and e.tagName not in VALID] + if s.unknown_tags: + raise InvalidStack("stack.xml [%s] must be cleaned up from %s" % (filename, str(s.unknown_tags))) + return s diff --git a/test/test_rospkg_catkin_packages.py b/test/test_rospkg_catkin_packages.py new file mode 100644 index 0000000..87ac279 --- /dev/null +++ b/test/test_rospkg_catkin_packages.py @@ -0,0 +1,69 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2011, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +from __future__ import print_function + +import os + +import rospkg + +search_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'catkin_package_tests')) + + +def test_find_packages(): + manager = rospkg.rospack.ManifestManager(rospkg.common.MANIFEST_FILE, ros_paths=[search_path]) + # for backward compatibility a wet package which is not a metapackage is found when searching for MANIFEST_FILE + assert(len(manager.list()) == 1) + manager = rospkg.rospack.ManifestManager(rospkg.common.STACK_FILE, ros_paths=[search_path]) + assert(len(manager.list()) == 0) + manager = rospkg.rospack.ManifestManager(rospkg.common.PACKAGE_FILE, ros_paths=[search_path]) + + for pkg_name in manager.list(): + assert(pkg_name == 'foo') + path = manager.get_path(pkg_name) + assert(path == os.path.join(search_path, 'p1', 'foo')) + + +def test_get_manifest(): + manager = rospkg.rospack.ManifestManager(rospkg.common.MANIFEST_FILE, ros_paths=[search_path]) + manif = manager.get_manifest("foo") + assert(manif.type == "package") + + +def test_licenses(): + rospack = rospkg.rospack.RosPack(ros_paths=[search_path]) + licenses_list = ["BSD", "LGPL"] + manif = rospack.get_manifest("foo") + assert(manif.license == ", ".join(licenses_list)) + assert(len(manif.licenses) == 2) + for l in manif.licenses: + assert(l in licenses_list) diff --git a/test/test_rospkg_common.py b/test/test_rospkg_common.py new file mode 100644 index 0000000..16be26e --- /dev/null +++ b/test/test_rospkg_common.py @@ -0,0 +1,52 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2011, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +from __future__ import print_function + + +def test_ResourceNotFound(): + from rospkg import ResourceNotFound + r = ResourceNotFound('blah') + assert 'blah' == str(r) + assert r.ros_paths is None + s = str(r) + assert 'None' not in s + assert 'blah' in s + + r = ResourceNotFound('blah', ['ros_root', 'ros_package_path']) + assert 'blah' == str(r.args[0]) + assert ['ros_root', 'ros_package_path'] == r.ros_paths + s = str(r) + print(s) + assert 'blah' in s + assert 'ros_root' in s + assert 'ros_package_path' in s diff --git a/test/test_rospkg_distro.py b/test/test_rospkg_distro.py new file mode 100644 index 0000000..7d326e5 --- /dev/null +++ b/test/test_rospkg_distro.py @@ -0,0 +1,419 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2009, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os + +import yaml + + +def get_test_path(): + return os.path.abspath(os.path.join(os.path.dirname(__file__), 'rosdistro')) + + +def get_etc_path(): + return os.path.abspath(os.path.join(os.path.dirname(__file__), 'fakeetcros')) + + +def test_distro_uri(): + from rospkg.distro import distro_uri + assert distro_uri('groovy') == "http://svn.code.sf.net/p/ros-dry-releases/code/trunk/distros/groovy.rosdistro" + + +def test_current_distro_codename(): + import rospkg.environment + from rospkg.distro import current_distro_codename + assert 'awesome' == current_distro_codename(env={'ROS_DISTRO': 'awesome'}) + env = {rospkg.environment.ROS_ETC_DIR: get_etc_path()} + val = current_distro_codename(env=env) + assert 'rosawesome' == current_distro_codename(env=env), val + + +def test__current_distro_electric(): + from rospkg.distro import _current_distro_electric + # tripwire, not allowed to throw + _current_distro_electric() + + +def test__current_distro_electric_parse_roscore(): + from rospkg.distro import _current_distro_electric_parse_roscore + roscore_file = os.path.join(get_test_path(), 'roscore-electric.xml') + assert os.path.exists(roscore_file), roscore_file + val = _current_distro_electric_parse_roscore(roscore_file) + assert 'electric' == val, val + + bad_roscore_file = os.path.join(get_test_path(), 'roscore-bad.xml') + assert _current_distro_electric_parse_roscore(bad_roscore_file) is None + + no_roscore_file = os.path.join(get_test_path(), 'non-existent.xml') + assert _current_distro_electric_parse_roscore(no_roscore_file) is None + + +def xtest_Distro_dback(self): + # TODO: better unit tests. For now this is mostly a tripwire + from rospkg.distro import DistroStack + distros = load_Distros_dback() + dback = distros['diamondback'] + r = 'diamondback' + v = 'r8596' + + self.assertEquals(r, dback.release_name) + self.assertEquals(v, dback.version) + + # make sure ros got assigned and is correct + ros = DistroStack('ros', dback_ros_rules, dback_versions['ros'], r, v) + self.assertEquals(ros, dback.ros) + self.assertEquals(ros, dback.stacks['ros']) + + # make sure the variants are configured + self.assert_('base' not in dback.variants) + + ros_base = dback.variants['ros-base'] + self.assertEquals([], ros_base.extends) + ros_base_stacks = ['ros', 'ros_comm'] + self.assertEquals(ros_base_stacks, ros_base.stack_names) + + robot = dback.variants['robot'] # extends ros-base + self.assertEquals(set(['ros-base']), set(robot.extends)) + robot_stacks = ['common_msgs', 'common', 'diagnostics', 'driver_common', 'geometry', 'robot_model', 'executive_smach'] + self.assertEquals(set(ros_base_stacks + robot_stacks), set(robot.stack_names)) + self.assertEquals(set(robot_stacks), set(robot.stack_names_explicit)) + + mobile = dback.variants['mobile'] # extends robot + mobile_stacks = ['navigation', 'slam_gmapping'] + self.assertEquals(set(ros_base_stacks + robot_stacks + mobile_stacks), set(mobile.stack_names)) + self.assertEquals(set(mobile_stacks), set(mobile.stack_names_explicit)) + + viz = dback.variants['viz'] + self.assertEquals([], viz.extends) + viz_stacks = ['visualization_common', 'visualization'] + self.assertEquals(set(viz_stacks), set(viz.stack_names)) + self.assertEquals(set(viz_stacks), set(viz.stack_names_explicit)) + + desktop = dback.variants['desktop'] # robot, rviz + self.assertEquals(set(['robot', 'viz']), set(desktop.extends)) + desktop_stacks = ['ros_tutorials', 'common_tutorials', 'geometry_tutorials', 'visualization_tutorials'] + self.assertEquals(set(ros_base_stacks + robot_stacks + viz_stacks + desktop_stacks), set(desktop.stack_names)) + self.assertEquals(set(desktop_stacks), set(desktop.stack_names_explicit)) + + simulator_stacks = ['simulator_stage', 'simulator_gazebo', 'physics_ode'] + perception_stacks = ['image_common', 'image_transport_plugins', 'image_pipeline', 'laser_pipeline', 'perception_pcl', 'vision_opencv'] + desktop_full = dback.variants['desktop-full'] + self.assertEquals(set(['desktop', 'mobile', 'perception', 'simulators']), set(desktop_full.extends)) + self.assertEquals(set(ros_base_stacks + robot_stacks + mobile_stacks + perception_stacks + simulator_stacks + desktop_stacks + viz_stacks), set(desktop_full.stack_names)) + self.assertEquals([], desktop_full.stack_names_explicit) + + # make sure we loaded the stacks + stack_names = ['common', 'common_msgs', 'navigation'] + for s in stack_names: + val = DistroStack(s, dback_rospkg_rules, dback_versions[s], r, v) + self.assertEquals(val, dback.stacks[s]) + + # test an hg rule + dback_geometry_rules = {'hg': + {'dev-branch': 'tf_rework', + 'distro-tag': '$RELEASE_NAME', + 'release-tag': '$STACK_NAME-$STACK_VERSION', + 'uri': 'https://ros-geometry.googlecode.com/hg/'}, + 'repo': 'ros-pkg', + } + s = 'geometry' + val = DistroStack(s, dback_geometry_rules, dback_versions[s], r, v) + self.assertEquals(val, dback.stacks[s]) + + +def test_expand_rule(): + from rospkg.distro import expand_rule + assert 'foo' == expand_rule('$STACK_NAME', 'foo', 'version', 'release') + assert 'version' == expand_rule('$STACK_VERSION', 'foo', 'version', 'release') + assert 'release' == expand_rule('$RELEASE_NAME', 'foo', 'version', 'release') + assert 'foo-version-release' == expand_rule('$STACK_NAME-$STACK_VERSION-$RELEASE_NAME', 'foo', 'version', 'release') + + +default_rules = {} +default_rules['git'] = {'git': {'anon-uri': 'https://github.com/ipa320/$STACK_NAME.git', + 'dev-branch': 'release_electric', + 'distro-tag': '$RELEASE_NAME', + 'release-tag': '$STACK_NAME-$STACK_VERSION', + 'uri': 'git@github.com:ipa320/$STACK_NAME.git'}} +rule = default_rules['git'] + + +def test_DistroStack(): + from rospkg.distro import DistroStack + s = DistroStack('stack', 'version', 'electric', rule) + assert 'stack' == s.name + assert 'version' == s.version + assert rule == s._rules + assert 'git' == s.vcs_config.type + assert s.vcs_config.get_branch('devel', False) == ('git@github.com:ipa320/stack.git', 'release_electric') + assert s.vcs_config.get_branch('devel', True) == ('https://github.com/ipa320/stack.git', 'release_electric') + assert s.vcs_config.get_branch('distro', False) == ('git@github.com:ipa320/stack.git', 'electric'), s.vcs_config.get_branch('release', False) + assert s.vcs_config.get_branch('distro', True) == ('https://github.com/ipa320/stack.git', 'electric') + assert s.vcs_config.get_branch('release', False) == ('git@github.com:ipa320/stack.git', 'stack-version'), s.vcs_config.get_branch('release', False) + assert s.vcs_config.get_branch('release', True) == ('https://github.com/ipa320/stack.git', 'stack-version') + + assert s == s + assert s == DistroStack('stack', 'version', 'electric', rule) + assert s != 'stack' + assert s != DistroStack('stack2', 'version', 'electric', rule) + assert s != DistroStack('stack', 'version2', 'electric', rule) + assert s != DistroStack('stack', 'version', 'dback', rule) + rule2 = rule.copy() + rule2['git']['uri'] == 'foo' + assert s != DistroStack('stack', 'version', 'dback', rule2) + + +def test_Variant(): + from rospkg.distro import Variant + v = Variant("foo", [], [], []) + assert 'foo' == v.name + assert [] == v.extends + assert [] == v.get_stack_names(True) + assert [] == v.get_stack_names(False) + + raw_data = {'extends': ['robot', 'viz'], + 'stacks': ['arm_navigation', 'octomap_mapping', 'physics_ode', 'perception_pcl', 'pr2_controllers', + 'control', 'pr2_mechanism', 'pr2_common']} + stack_names_implicit = raw_data['stacks'] + ['a', 'b', 'c', 'd'] + v = Variant('bar', raw_data['extends'], raw_data['stacks'], stack_names_implicit) + assert set(v.extends) == set(['robot', 'viz']), v.extends + assert set(v.get_stack_names(True)) == set(['arm_navigation', 'octomap_mapping', 'physics_ode', 'perception_pcl', 'pr2_controllers', + 'control', 'pr2_mechanism', 'pr2_common', 'a', 'b', 'c', 'd']) + assert set(v.get_stack_names(False)) == set(['arm_navigation', 'octomap_mapping', 'physics_ode', 'perception_pcl', 'pr2_controllers', + 'control', 'pr2_mechanism', 'pr2_common']) + + +def test_Distro(): + from rospkg.distro import Distro, Variant, DistroStack + + raw_data = {'extends': ['robot', 'viz'], + 'stacks': ['arm_navigation', 'octomap_mapping', 'physics_ode', 'perception_pcl', 'pr2_controllers', + 'control', 'pr2_mechanism', 'pr2_common']} + stack_names_implicit = raw_data['stacks'] + ['a', 'b', 'c', 'd'] + v = Variant('bar', raw_data['extends'], raw_data['stacks'], stack_names_implicit) + s = DistroStack('stack', 'version', 'electric', rule) + s_unreleased = DistroStack('unreleased', None, 'electric', rule) + + variants = {'bar': v} + stacks = {'stack': s, 'unreleased': s_unreleased} + d = Distro(stacks, variants, 'electric', '1', {}) + assert d._stacks == stacks + assert d.variants == variants + assert d.release_name == 'electric' + assert d.version == '1' + assert {} == d.raw_data + assert stacks == d.get_stacks(released=False) + assert {'stack': s} == d.get_stacks(released=True) + assert stacks == d.stacks + assert {'stack': s} == d.released_stacks + + +dback_ros_rules = {'svn': {'dev': 'https://code.ros.org/svn/ros/stacks/$STACK_NAME/trunk', + 'distro-tag': 'https://code.ros.org/svn/ros/stacks/$STACK_NAME/tags/$RELEASE_NAME', + 'release-tag': 'https://code.ros.org/svn/ros/stacks/$STACK_NAME/tags/$STACK_NAME-$STACK_VERSION'}, + 'repo': 'ros'} +dback_rospkg_rules = {'svn': {'dev': 'https://code.ros.org/svn/ros-pkg/stacks/$STACK_NAME/trunk', + 'distro-tag': 'https://code.ros.org/svn/ros-pkg/stacks/$STACK_NAME/tags/$RELEASE_NAME', + 'release-tag': 'https://code.ros.org/svn/ros-pkg/stacks/$STACK_NAME/tags/$STACK_NAME-$STACK_VERSION'}, + 'repo': 'ros-pkg'} + +dback_versions = { + 'common': '1.3.3', + 'common_msgs': '1.3.5', + 'geometry': '1.3.1', + 'navigation': '1.3.1', + 'ros': '1.4.0', +} + + +def test_load_distro_bad_data(): + from rospkg import ResourceNotFound + from rospkg.distro import load_distro, InvalidDistro + try: + load_distro('bad') + assert False + except ResourceNotFound: + pass + for i in range(1, 10): + if i == 4: + # currently non-existent stacks in variants are not determinable + continue + filename = 'bad%s.rosdistro' % (i) + try: + d = get_test_path() + p = os.path.join(d, filename) + load_distro(p) + assert False, "should have raised: %s" % (filename) + except InvalidDistro: + pass + + +def test_load_distro_variants(): + # test with no and empty variants (issue found in fuerte bringup) + from rospkg.distro import load_distro + d = get_test_path() + for name in ['no_variants.rosdistro', 'empty_variants.rosdistro']: + p = os.path.join(d, name) + distro = load_distro(p) + assert distro.release_name == 'simple', distro.release_name + assert set(distro.variants.keys()) == set([]) + + +def test_distro_to_rosinstall(): + from rospkg.distro import distro_to_rosinstall, load_distro + d = get_test_path() + distro = load_distro(os.path.join(d, 'simple.rosdistro')) + data = distro_to_rosinstall(distro, 'devel', variant_name='base', implicit=False, released_only=True, anonymous=True) + # should only have a single stack + assert len(data) == 1 + url = 'https://simple.com/svn/trunk/stack1' + assert data[0] == dict(svn={'uri': url, 'local-name': 'stack1'}) + data = distro_to_rosinstall(distro, 'devel', variant_name=None, implicit=False, released_only=True, anonymous=True) + assert len(data) == 1 + assert data[0] == dict(svn={'uri': url, 'local-name': 'stack1'}) + data = distro_to_rosinstall(distro, 'devel', variant_name=None, implicit=False, released_only=False, anonymous=True) + assert len(data) == 1 + assert data[0] == dict(svn={'uri': url, 'local-name': 'stack1'}) + # TODO: need more complete tests with more complicated files + + +def test_load_distro_simple(): + from rospkg.distro import load_distro, Distro + d = get_test_path() + p = os.path.join(d, 'simple.rosdistro') + distro = load_distro(p) + assert isinstance(distro, Distro) + + assert distro.release_name == 'simple', distro.release_name + assert distro.version == '1', distro.version + assert yaml.load(open(p)) == distro.raw_data, distro.raw_data + assert set(distro.variants.keys()) == set(['base']) + assert set(distro.stacks.keys()) == set(['stack1']) + + stack1 = distro.stacks['stack1'] + assert stack1.vcs_config.get_branch('devel', False) == ('https://simple.com/svn/trunk/stack1', None) + assert stack1.vcs_config.get_branch('distro', False) == ('https://simple.com/svn/tags/distros/simple/stacks/stack1', None) + assert stack1.vcs_config.get_branch('release', False) == ('https://simple.com/svn/tags/stacks/stack1/stack1-0.3.0', None) + + +def test_load_distro_diamondback(): + from rospkg.distro import load_distro, Distro + d = get_test_path() + p = os.path.join(d, 'diamondback.rosdistro') + distro = load_distro(p) + assert isinstance(distro, Distro) + + assert distro.release_name == 'diamondback', distro.release_name + assert distro.version == 'r8596', distro.version + assert yaml.load(open(p)) == distro.raw_data, distro.raw_data + assert set(distro.variants.keys()) == set(diamondback_variants) + assert set(distro.stacks.keys()) == set(diamondback_stacks), set(distro.stacks.keys()) ^ set(diamondback_stacks) + + assert distro.variants['ros-base'].extends == [] + retval = distro.variants['ros-base'].get_stack_names(True) + assert retval == ['ros', 'ros_comm'], retval + assert distro.variants['ros-base'].get_stack_names(False) == ['ros', 'ros_comm'] + assert set(distro.variants['ros-full'].get_stack_names(True)) == set(['ros', 'ros_comm', 'rx', 'documentation']) + + assert distro.stacks['common'].version == '1.3.3' + assert distro.stacks['common'].vcs_config.get_branch('devel', True) == ('https://code.ros.org/svn/ros-pkg/stacks/common/trunk', None) + + +def test__load_variants(): + from rospkg.distro import _load_variants + raw_data = yaml.load("""variants: +- ros-base: + stacks: [ros, ros_comm] +- ros-full: + extends: ros-base + stacks: [rx, documentation] +- viz: + stacks: [visualization_common, visualization] +- robot: + extends: [ros-base] + stacks: [common_msgs, common, diagnostics] +- desktop: + extends: [robot, viz, ros-full] + stacks: [ros_tutorials, common_tutorials] +""") + raw_data = raw_data['variants'] + # mock data so variants validate + stacks = dict(ros=1, ros_comm=2, rx=3, documentation=4, visualization_common=5, + visualization=6, common_msgs=7, common=8, ros_tutorials=9, common_tutorials=10, diagnostics=11) + variants = _load_variants(raw_data, stacks) + assert set(variants.keys()) == set(['ros-base', 'ros-full', 'viz', 'robot', 'desktop']), variants.keys() + assert variants['ros-base'].extends == [] + assert variants['ros-full'].extends == ['ros-base'] + assert variants['desktop'].extends == ['robot', 'viz', 'ros-full'] + + assert set(variants['ros-base'].get_stack_names(True)) == set(['ros', 'ros_comm']) + assert set(variants['ros-base'].get_stack_names(False)) == set(['ros', 'ros_comm']) + + assert set(variants['ros-full'].get_stack_names(True)) == set(['rx', 'documentation', 'ros', 'ros_comm']) + assert set(variants['ros-full'].get_stack_names(False)) == set(['rx', 'documentation']) + + assert set(variants['desktop'].get_stack_names(True)) == set(stacks.keys()) + assert set(variants['desktop'].get_stack_names(False)) == set(['ros_tutorials', 'common_tutorials']) + + +diamondback_stacks = [ + 'pr2_web_apps', 'octomap_mapping', 'motion_planning_environment', 'robot_calibration', + 'sound_drivers', 'joystick_drivers', 'ros', + 'pano', 'knowrob', 'perception_pcl', 'image_pipeline', 'kinect', + 'bosch_skin', 'pr2_common_actions', 'pr2_arm_navigation_apps', 'ocr', 'articulation', + 'nxt_robots', 'visualization_common', 'physics_ode', 'arm_navigation', 'collision_environment', + 'executive_smach', 'ethzasl_aseba', 'cart_pushing', 'velodyne', 'pr2_arm_navigation_tests', + 'art_vehicle', 'common', 'motion_planning_visualization', 'geometry_tutorials', 'people', + 'pr2_power_drivers', 'joystick_drivers_tutorials', 'cob_common', 'vslam', 'pr2_arm_navigation', + 'ias_common', 'pr2_navigation_apps', 'geometry_experimental', 'rx', 'motion_planners', + 'pr2_gui', 'simulator_stage', 'linux_networking', 'pr2_calibration', 'image_common', + 'visualization', 'mpi', 'cob_extern', 'camera_drivers', 'laser_drivers', + 'orocos_toolchain_ros', 'driver_common', 'common_msgs', 'pr2_controllers', 'robot_model', + 'motion_planning_common', 'simulator_gazebo', 'cram_pl', 'multimaster_experimental', 'navigation', + 'pr2_robot', 'geometry', 'freiburg_tools', 'nxt_apps', 'wifi_drivers', + 'slam_gmapping', 'web_interface', 'vision_opencv', 'kinematics', 'pr2_simulator', + 'roshpit', 'pr2_cockpit', 'pr2_kinematics', 'sql_database', + 'navigation_experimental', 'pr2_object_manipulation', 'erratic_robot', 'object_manipulation', 'tabletop_object_perception', + 'pr2_tabletop_manipulation_apps', 'bosch_drivers', 'image_transport_plugins', 'perception_pcl_addons', 'slam_karto', + 'wg_hardware_test', 'ros_release', 'pr2_navigation', 'exploration', 'continuous_ops', + 'control', 'ros_tutorials', 'pr2_ethercat_drivers', 'ethzasl_message_transport', 'client_rosjava', + 'ros_realtime', 'pr2_mechanism', 'point_cloud_perception', 'wg_pr2_apps', 'graph_mapping', + 'cob_driver', 'cob_simulation', 'pr2_common', 'wg_robots_gazebo', 'pr2_common_alpha', + 'trajectory_filters', 'topological_navigation', 'imu_drivers', 'ros_applications', 'pr2_exploration', + 'common_tutorials', 'ros_comm', 'mapping', 'pr2_plugs', 'roslisp_common', + 'wg_common', 'roslisp_support', 'cob_apps', 'nxt', 'pr2_apps', 'visualization_tutorials', + 'laser_pipeline', 'pr2_kinematics_with_constraints', 'documentation', 'pr2_self_test', 'diagnostics', 'pr2_doors'] + +diamondback_variants = [ + 'ros-base', 'ros-full', 'viz', 'robot', 'simulators', 'mobile', 'perception', 'desktop', + 'desktop-full', 'move-arm', 'pr2-base', 'pr2', 'pr2-desktop', 'pr2-applications', + 'wg-pr2', 'care-o-bot', 'bosch', 'nxtall', 'alufr', 'utexas-art', 'tum'] diff --git a/test/test_rospkg_distro_vcs_config.py b/test/test_rospkg_distro_vcs_config.py new file mode 100644 index 0000000..3b105b6 --- /dev/null +++ b/test/test_rospkg_distro_vcs_config.py @@ -0,0 +1,383 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2011, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +default_rules = {} +rosinstalls = {} +default_rules['git'] = {'git': {'anon-uri': 'https://github.com/ipa320/$STACK_NAME.git', + 'dev-branch': 'release_electric', + 'distro-tag': '$RELEASE_NAME', + 'release-tag': '$STACK_NAME-$STACK_VERSION', + 'uri': 'git@github.com:ipa320/$STACK_NAME.git'}} +rosinstalls['git'] = {} +rosinstalls['git']['release-tar'] = [{'tar': {'local-name': 'local_name', + 'version': '$STACK_NAME-$STACK_VERSIONevaled', + 'uri': 'http://svn.code.sf.net/p/ros-dry-releases/code/download/stacks/$STACK_NAME/$STACK_NAME-$STACK_VERSION/$STACK_NAME-$STACK_VERSION.tar.bz2evaled'}}] +rosinstalls['git']['devel'] = [{'git': {'local-name': 'local_name', + 'version': 'release_electricevaled', + 'uri': 'https://github.com/ipa320/$STACK_NAME.gitevaled'}}] +rosinstalls['git']['release'] = [{'git': {'local-name': 'local_name', + 'version': '$STACK_NAME-$STACK_VERSIONevaled', + 'uri': 'https://github.com/ipa320/$STACK_NAME.gitevaled'}}] +rosinstalls['git']['distro'] = [{'git': {'local-name': 'local_name', + 'version': '$RELEASE_NAMEevaled', + 'uri': 'https://github.com/ipa320/$STACK_NAME.gitevaled'}}] +default_rules['svn'] = {'svn': {'dev': 'https://alufr-ros-pkg.googlecode.com/svn/trunk/$STACK_NAME', + 'distro-tag': 'https://alufr-ros-pkg.googlecode.com/svn/tags/distros/$RELEASE_NAME/stacks/$STACK_NAME', + 'release-tag': 'https://alufr-ros-pkg.googlecode.com/svn/tags/stacks/$STACK_NAME/$STACK_NAME-$STACK_VERSION'}} +rosinstalls['svn'] = {} +rosinstalls['svn']['release-tar'] = rosinstalls['git']['release-tar'] +rosinstalls['svn']['devel'] = [{'svn': {'local-name': 'local_name', + 'uri': 'https://alufr-ros-pkg.googlecode.com/svn/trunk/$STACK_NAMEevaled', + }}] +rosinstalls['svn']['release'] = [{'svn': {'local-name': 'local_name', + 'uri': 'https://alufr-ros-pkg.googlecode.com/svn/tags/stacks/$STACK_NAME/$STACK_NAME-$STACK_VERSIONevaled', + }}] +rosinstalls['svn']['distro'] = [{'svn': {'local-name': 'local_name', + 'uri': 'https://alufr-ros-pkg.googlecode.com/svn/tags/distros/$RELEASE_NAME/stacks/$STACK_NAMEevaled', + }}] +default_rules['hg'] = {'hg': {'dev-branch': 'default', + 'distro-tag': '$RELEASE_NAME', + 'release-tag': '$STACK_NAME-$STACK_VERSION', + 'uri': 'https://kforge.ros.org/navigation/navigation'}} +default_rules['bzr'] = {'bzr': {'anon-uri': 'lp:sr-ros-interface', + 'dev-branch': 'stable', + 'distro-tag': '$RELEASE_NAME', + 'release-tag': '$STACK_NAME-$STACK_VERSION', + 'uri': 'bzr+ssh://bazaar.launchpad.net/~shadowrobot/sr-ros-interface'}} + + +def test_to_rosinstall(): + from rospkg.distro import load_vcs_config + + def rule_eval(x): + return x + 'evaled' + anonymous = True + # TODO: for branch in ['devel', 'release', 'distro']: + for vcs in ['git', 'svn']: + vcs_config = load_vcs_config(default_rules[vcs], rule_eval) + for branch in ['release', 'distro', 'release-tar', 'devel']: + retval = vcs_config.to_rosinstall('local_name', branch, anonymous) + assert retval == rosinstalls[vcs][branch], "%s %s:\n%s\nvs.\n%s" % (vcs, branch, retval, rosinstalls[vcs][branch]) + + +def test_VcsConfig(): + from rospkg.distro import VcsConfig + vcs_config = VcsConfig('fake') + vcs_config.tarball_url = 'http://foo' + assert 'fake' == vcs_config.type + for b in ['devel', 'release', 'distro']: + try: + vcs_config.get_branch(b, False) + assert False, "should have raised" + b + except ValueError: + pass + for anon in [True, False]: + assert ('http://foo', None) == vcs_config.get_branch('release-tar', anon) + + +def test_BZRConfig(): + from rospkg.distro import BzrConfig + anon_rules = default_rules['bzr']['bzr'] + rules = anon_rules.copy() + rules['uri'] = rules['anon-uri'] + del rules['anon-uri'] + + config = BzrConfig() + anon_config = BzrConfig() + + required = ['dev-branch', 'distro-tag', 'release-tag', 'uri'] + for r in required: + bad_copy = rules.copy() + del bad_copy[r] + try: + config.load(bad_copy, lambda x: x) + assert False, "should have raised" + except KeyError: + pass + + config.load(rules, lambda x: x + 'evaled') + anon_config.load(anon_rules, lambda x: x + 'evaled') + + repo_uri = anon_rules['uri'] + 'evaled' + anon_repo_uri = anon_rules['anon-uri'] + 'evaled' + assert config.repo_uri == anon_repo_uri, config.repo_uri + assert config.anon_repo_uri == anon_repo_uri, config.anon_repo_uri + assert anon_config.repo_uri == repo_uri, anon_config.repo_uri + for c in [config, anon_config]: + assert c.dev_branch == 'stableevaled' + assert c.distro_tag == '$RELEASE_NAMEevaled' + assert c.release_tag == '$STACK_NAME-$STACK_VERSIONevaled' + assert c.anon_repo_uri == anon_repo_uri + + c = anon_config + assert c.get_branch('devel', False) == (repo_uri, 'stableevaled') + assert c.get_branch('devel', True) == (anon_repo_uri, 'stableevaled') + assert c.get_branch('distro', False) == (repo_uri, '$RELEASE_NAMEevaled') + assert c.get_branch('distro', True) == (anon_repo_uri, '$RELEASE_NAMEevaled') + assert c.get_branch('release', False) == (repo_uri, '$STACK_NAME-$STACK_VERSIONevaled') + assert c.get_branch('release', True) == (anon_repo_uri, '$STACK_NAME-$STACK_VERSIONevaled') + try: + c.get_branch('foo', True) + assert False + except ValueError: + pass + # setup for coverage -- invalidate release branch + rel_tag = c.release_tag + c.release_tag = None + try: + assert c.get_branch('release', False) + assert False + except ValueError: + pass + c.release_tag = rel_tag + + # test equals + config2 = BzrConfig() + config2.load(rules, lambda x: x + 'evaled') + assert config == config2 + anon_config2 = BzrConfig() + anon_config2.load(anon_rules, lambda x: x + 'evaled') + assert anon_config == anon_config2 + + # test eq + config_check = BzrConfig() + config_check_eq = BzrConfig() + config_check_neq = BzrConfig() + config_check.load(rules, lambda x: x + 'evaled') + config_check_eq.load(rules, lambda x: x + 'evaled') + config_check_neq.load(anon_rules, lambda x: x + 'evaled') + assert config_check == config_check_eq + assert config_check != config_check_neq + + +def test_HgConfig(): + from rospkg.distro import HgConfig + anon_rules = { + 'dev-branch': 'default', + 'distro-tag': '$RELEASE_NAME', + 'release-tag': '$STACK_NAME-$STACK_VERSION', + 'anon-uri': 'https://kforge.ros.org/navigation/navigation', + 'uri': 'ssh://user@kforge.ros.org/navigation/navigation' + } + rules = default_rules['hg']['hg'] + + config = HgConfig() + anon_config = HgConfig() + + required = ['dev-branch', 'distro-tag', 'release-tag', 'uri'] + for r in required: + bad_copy = rules.copy() + del bad_copy[r] + try: + config.load(bad_copy, lambda x: x) + assert False, "should have raised" + except KeyError: + pass + + config.load(rules, lambda x: x + 'evaled') + anon_config.load(anon_rules, lambda x: x + 'evaled') + + repo_uri = 'ssh://user@kforge.ros.org/navigation/navigationevaled' + anon_repo_uri = 'https://kforge.ros.org/navigation/navigationevaled' + assert config.repo_uri == anon_repo_uri, config.repo_uri + assert config.anon_repo_uri == anon_repo_uri, config.anon_repo_uri + assert anon_config.repo_uri == repo_uri, anon_config.repo_uri + for c in [config, anon_config]: + assert c.dev_branch == 'defaultevaled' + assert c.distro_tag == '$RELEASE_NAMEevaled' + assert c.release_tag == '$STACK_NAME-$STACK_VERSIONevaled' + assert c.anon_repo_uri == anon_repo_uri + + c = anon_config + assert c.get_branch('devel', False) == (repo_uri, 'defaultevaled') + assert c.get_branch('devel', True) == (anon_repo_uri, 'defaultevaled') + assert c.get_branch('distro', False) == (repo_uri, '$RELEASE_NAMEevaled') + assert c.get_branch('distro', True) == (anon_repo_uri, '$RELEASE_NAMEevaled') + assert c.get_branch('release', False) == (repo_uri, '$STACK_NAME-$STACK_VERSIONevaled') + assert c.get_branch('release', True) == (anon_repo_uri, '$STACK_NAME-$STACK_VERSIONevaled') + + # test equals + config2 = HgConfig() + config2.load(rules, lambda x: x + 'evaled') + assert config == config2 + anon_config2 = HgConfig() + anon_config2.load(anon_rules, lambda x: x + 'evaled') + assert anon_config == anon_config2 + + # test eq + config_check = HgConfig() + config_check_eq = HgConfig() + config_check_neq = HgConfig() + config_check.load(rules, lambda x: x + 'evaled') + config_check_eq.load(rules, lambda x: x + 'evaled') + config_check_neq.load(anon_rules, lambda x: x + 'evaled') + assert config_check == config_check_eq + assert config_check != config_check_neq + + +def test_GitConfig(): + from rospkg.distro import GitConfig + anon_rules = default_rules['git']['git'] + rules = anon_rules.copy() + del rules['anon-uri'] + + config = GitConfig() + anon_config = GitConfig() + + required = ['dev-branch', 'distro-tag', 'release-tag', 'uri'] + for r in required: + bad_copy = rules.copy() + del bad_copy[r] + try: + config.load(bad_copy, lambda x: x) + assert False, "should have raised" + except KeyError: + pass + + config.load(rules, lambda x: x + 'evaled') + anon_config.load(anon_rules, lambda x: x + 'evaled') + repo_uri = 'git@github.com:ipa320/$STACK_NAME.gitevaled' + anon_repo_uri = 'https://github.com/ipa320/$STACK_NAME.gitevaled' + + assert config.repo_uri == repo_uri + assert anon_config.anon_repo_uri == anon_repo_uri + for c in [config, anon_config]: + dev_branch = 'release_electricevaled' + assert c.dev_branch == dev_branch + assert c.distro_tag == '$RELEASE_NAMEevaled' + assert c.release_tag == '$STACK_NAME-$STACK_VERSIONevaled' + assert c.repo_uri == repo_uri + + c = anon_config + assert c.get_branch('devel', False) == (repo_uri, dev_branch), c.get_branch('devel', False) + assert c.get_branch('devel', True) == (anon_repo_uri, dev_branch) + assert c.get_branch('distro', False) == (repo_uri, '$RELEASE_NAMEevaled') + assert c.get_branch('distro', True) == (anon_repo_uri, '$RELEASE_NAMEevaled') + assert c.get_branch('release', False) == (repo_uri, '$STACK_NAME-$STACK_VERSIONevaled') + assert c.get_branch('release', True) == (anon_repo_uri, '$STACK_NAME-$STACK_VERSIONevaled') + + # test equals + config2 = GitConfig() + config2.load(rules, lambda x: x + 'evaled') + assert config == config2 + anon_config2 = GitConfig() + anon_config2.load(anon_rules, lambda x: x + 'evaled') + assert anon_config == anon_config2 + + +def test_SvnConfig(): + from rospkg.distro import SvnConfig + config = SvnConfig() + required = ['dev', 'distro-tag', 'release-tag'] + rules = default_rules['svn']['svn'] + + anon_rules = { + 'anon-dev': 'http://svn.mech.kuleuven.be/repos/orocos/trunk/kul-ros-pkg/stacks/$STACK_NAME/trunk', + 'anon-distro-tag': 'http://svn.mech.kuleuven.be/repos/orocos/trunk/kul-ros-pkg/stacks/$STACK_NAME/tags/$RELEASE_NAME', + 'anon-release-tag': 'http://svn.mech.kuleuven.be/repos/orocos/trunk/kul-ros-pkg/stacks/$STACK_NAME/tags/$STACK_NAME-$STACK_VERSION', + 'dev': 'https://svn.mech.kuleuven.be/repos/orocos/trunk/kul-ros-pkg/stacks/$STACK_NAME/trunk', + 'distro-tag': 'https://svn.mech.kuleuven.be/repos/orocos/trunk/kul-ros-pkg/stacks/$STACK_NAME/tags/$RELEASE_NAME', + 'release-tag': 'https://svn.mech.kuleuven.be/repos/orocos/trunk/kul-ros-pkg/stacks/$STACK_NAME/tags/$STACK_NAME-$STACK_VERSION'} + + # make sure it validates + for k in required: + bad_copy = rules.copy() + del bad_copy[k] + try: + config.load(bad_copy, lambda x: x) + assert False, "should have raised" + except KeyError: + pass + + # load w/o anon rules + config.load(rules, lambda x: x + 'evaled') + assert config.dev == 'https://alufr-ros-pkg.googlecode.com/svn/trunk/$STACK_NAMEevaled' + assert config.distro_tag == 'https://alufr-ros-pkg.googlecode.com/svn/tags/distros/$RELEASE_NAME/stacks/$STACK_NAMEevaled' + assert config.release_tag == 'https://alufr-ros-pkg.googlecode.com/svn/tags/stacks/$STACK_NAME/$STACK_NAME-$STACK_VERSIONevaled' + + assert config.anon_dev == 'https://alufr-ros-pkg.googlecode.com/svn/trunk/$STACK_NAMEevaled' + assert config.anon_distro_tag == 'https://alufr-ros-pkg.googlecode.com/svn/tags/distros/$RELEASE_NAME/stacks/$STACK_NAMEevaled' + assert config.anon_release_tag == 'https://alufr-ros-pkg.googlecode.com/svn/tags/stacks/$STACK_NAME/$STACK_NAME-$STACK_VERSIONevaled' + + # test eq + config_check = SvnConfig() + config_check_eq = SvnConfig() + config_check_neq = SvnConfig() + config_check.load(rules, lambda x: x + 'evaled') + config_check_eq.load(rules, lambda x: x + 'evaled') + config_check_neq.load(anon_rules, lambda x: x + 'evaled') + assert config_check == config_check_eq + assert config_check != config_check_neq + + # load w anon rules + config2 = SvnConfig() + config.load(anon_rules, lambda x: x + 'evaled') + config2.load(anon_rules, lambda x: x + 'evaled') + for c in [config, config2]: + assert c.anon_dev == 'http://svn.mech.kuleuven.be/repos/orocos/trunk/kul-ros-pkg/stacks/$STACK_NAME/trunkevaled' + assert c.anon_distro_tag == 'http://svn.mech.kuleuven.be/repos/orocos/trunk/kul-ros-pkg/stacks/$STACK_NAME/tags/$RELEASE_NAMEevaled' + assert c.anon_release_tag == 'http://svn.mech.kuleuven.be/repos/orocos/trunk/kul-ros-pkg/stacks/$STACK_NAME/tags/$STACK_NAME-$STACK_VERSIONevaled' + assert c.dev == 'https://svn.mech.kuleuven.be/repos/orocos/trunk/kul-ros-pkg/stacks/$STACK_NAME/trunkevaled' + assert c.distro_tag == 'https://svn.mech.kuleuven.be/repos/orocos/trunk/kul-ros-pkg/stacks/$STACK_NAME/tags/$RELEASE_NAMEevaled' + assert c.release_tag == 'https://svn.mech.kuleuven.be/repos/orocos/trunk/kul-ros-pkg/stacks/$STACK_NAME/tags/$STACK_NAME-$STACK_VERSIONevaled' + + # test get_branch + assert c.get_branch('devel', True) == ('http://svn.mech.kuleuven.be/repos/orocos/trunk/kul-ros-pkg/stacks/$STACK_NAME/trunkevaled', None) + assert c.get_branch('distro', True) == ('http://svn.mech.kuleuven.be/repos/orocos/trunk/kul-ros-pkg/stacks/$STACK_NAME/tags/$RELEASE_NAMEevaled', None) + assert c.get_branch('release', True) == ('http://svn.mech.kuleuven.be/repos/orocos/trunk/kul-ros-pkg/stacks/$STACK_NAME/tags/$STACK_NAME-$STACK_VERSIONevaled', None) + assert c.get_branch('devel', False) == ('https://svn.mech.kuleuven.be/repos/orocos/trunk/kul-ros-pkg/stacks/$STACK_NAME/trunkevaled', None) + assert c.get_branch('distro', False) == ('https://svn.mech.kuleuven.be/repos/orocos/trunk/kul-ros-pkg/stacks/$STACK_NAME/tags/$RELEASE_NAMEevaled', None) + assert c.get_branch('release', False) == ('https://svn.mech.kuleuven.be/repos/orocos/trunk/kul-ros-pkg/stacks/$STACK_NAME/tags/$STACK_NAME-$STACK_VERSIONevaled', None) + + # get full coverage on get_branch() + try: + c.get_branch('fake', False) + assert False + except KeyError: + pass + # setup for coverage -- invalidate release branch + rel_tag = c.release_tag + c.release_tag = None + try: + assert c.get_branch('release', False) + assert False + except ValueError: + pass + c.release_tag = rel_tag + + +def test_load_vcs_config(): + from rospkg.distro import load_vcs_config, get_vcs_configs + for t in ['svn', 'git', 'hg', 'bzr']: + assert t in get_vcs_configs() + config = load_vcs_config(default_rules[t], lambda x: x + 'evaled') + assert config.type == t, t diff --git a/test/test_rospkg_environment.py b/test/test_rospkg_environment.py new file mode 100644 index 0000000..2e44ce0 --- /dev/null +++ b/test/test_rospkg_environment.py @@ -0,0 +1,168 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2011, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +import tempfile + + +def test_get_ros_root(): + from rospkg import get_ros_root + assert get_ros_root(env={}) is None + + env = {'ROS_ROOT': '/fake/path'} + assert '/fake/path' == get_ros_root(env=env) + + real_ros_root = get_ros_root() + + if real_ros_root is not None: + # make sure that ros root is a directory + p = os.path.join(real_ros_root, 'Makefile') + env = {'ROS_ROOT': p} + assert p == get_ros_root(env=env) + + +def test_get_ros_package_path(): + from rospkg import get_ros_package_path + assert get_ros_package_path(env={}) is None + env = {'ROS_PACKAGE_PATH': ':'} + assert ':' == get_ros_package_path(env=env) + + # trip-wire tests. Cannot guarantee that ROS_PACKAGE_PATH is set + # to valid value on test machine, just make sure logic doesn't crash + assert os.environ.get('ROS_PACKAGE_PATH', None) == get_ros_package_path() + + +def test_get_log_dir(): + from rospkg import get_log_dir, get_ros_root + base = tempfile.gettempdir() + ros_log_dir = os.path.join(base, 'ros_log_dir') + ros_home_dir = os.path.join(base, 'ros_home_dir') + home_dir = os.path.expanduser('~') + + # ROS_LOG_DIR has precedence + env = {'ROS_ROOT': get_ros_root(), 'ROS_LOG_DIR': ros_log_dir, 'ROS_HOME': ros_home_dir} + assert ros_log_dir == get_log_dir(env=env) + + env = {'ROS_ROOT': get_ros_root(), 'ROS_HOME': ros_home_dir} + assert os.path.join(ros_home_dir, 'log') == get_log_dir(env=env) + + env = {'ROS_ROOT': get_ros_root()} + assert os.path.join(home_dir, '.ros', 'log') == get_log_dir(env=env) + + # test default assignment of env. Don't both checking return value as we would duplicate get_log_dir + assert get_log_dir() is not None + + +def test_get_test_results_dir(): + from rospkg import get_ros_root, get_test_results_dir + base = tempfile.gettempdir() + ros_test_results_dir = os.path.join(base, 'ros_test_results_dir') + ros_home_dir = os.path.join(base, 'ros_home_dir') + home_dir = os.path.expanduser('~') + + # ROS_TEST_RESULTS_DIR has precedence + env = {'ROS_ROOT': get_ros_root(), 'ROS_TEST_RESULTS_DIR': ros_test_results_dir, 'ROS_HOME': ros_home_dir} + assert ros_test_results_dir == get_test_results_dir(env=env) + + env = {'ROS_ROOT': get_ros_root(), 'ROS_HOME': ros_home_dir} + assert os.path.join(ros_home_dir, 'test_results') == get_test_results_dir(env=env) + + env = {'ROS_ROOT': get_ros_root()} + assert os.path.join(home_dir, '.ros', 'test_results') == get_test_results_dir(env=env) + + # test default assignment of env. Don't both checking return value as we would duplicate get_test_results_dir + assert get_test_results_dir() is not None + + +def test_get_ros_home(): + from rospkg import get_ros_root, get_ros_home + base = tempfile.gettempdir() + ros_home_dir = os.path.join(base, 'ros_home_dir') + home_dir = os.path.expanduser('~') + + # ROS_HOME has precedence + env = {'ROS_ROOT': get_ros_root(), 'ROS_HOME': ros_home_dir} + assert ros_home_dir == get_ros_home(env=env) + + env = {'ROS_ROOT': get_ros_root()} + assert os.path.join(home_dir, '.ros') == get_ros_home(env=env) + + # test default assignment of env. Don't both checking return value + assert get_ros_home() is not None + + +def test_on_ros_path(): + from rospkg import on_ros_path, get_ros_root, get_ros_package_path + from rospkg.environment import _resolve_paths + + assert not on_ros_path(tempfile.gettempdir()) + + if get_ros_root() is not None: + assert on_ros_path(get_ros_root()) + + if get_ros_package_path() is not None: + paths = _resolve_paths(get_ros_package_path()).split(os.pathsep) + for p in paths: + assert on_ros_path(p), "failed: %s, [%s]" % (p, paths) + + +def test_compute_package_paths(): + from rospkg.environment import _compute_package_paths as compute_package_paths + assert compute_package_paths(None, None) == [] + assert compute_package_paths('foo', None) == ['foo'] + assert compute_package_paths(None, 'bar') == ['bar'], compute_package_paths(None, 'bar') + assert compute_package_paths('foo', '') == ['foo'] + assert compute_package_paths('foo', 'bar') == ['foo', 'bar'] + assert compute_package_paths('foo', 'bar:bz') == ['foo', 'bar', 'bz'] + assert compute_package_paths('foo', 'bar:bz::blah') == ['foo', 'bar', 'bz', 'blah'] + + +def test_resolve_path(): + # mainly for coverage + from rospkg.environment import _resolve_path + assert os.path.expanduser('~') == _resolve_path('~') + + +def test_get_etc_ros_dir(): + from rospkg import get_etc_ros_dir + from rospkg.environment import ROS_ETC_DIR + base = tempfile.gettempdir() + etc_ros_dir = os.path.join(base, 'etc_ros_dir') + + assert '/etc/ros' == get_etc_ros_dir(env={}) + + # ROS_ETC_DIR has precedence + env = {ROS_ETC_DIR: etc_ros_dir} + assert etc_ros_dir == get_etc_ros_dir(env=env), get_etc_ros_dir(env=env) + + # test default assignment of env. Don't both checking return value as we would duplicate get_etc_ros_dir + assert get_etc_ros_dir() is not None diff --git a/test/test_rospkg_manifest.py b/test/test_rospkg_manifest.py new file mode 100644 index 0000000..3b420e2 --- /dev/null +++ b/test/test_rospkg_manifest.py @@ -0,0 +1,272 @@ +# -*- coding: utf-8 -*- +# Software License Agreement (BSD License) +# +# Copyright (c) 2011, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +from __future__ import print_function + +import os + + +def test_InvalidManifest(): + from rospkg import InvalidManifest + assert isinstance(InvalidManifest(), Exception) + + +def test_Platform(): + from rospkg.manifest import Platform + for bad in [None, '']: + try: + Platform(bad, '1') + assert False, "should have failed on [%s]" % bad + except ValueError: + pass + try: + Platform('ubuntu', bad) + assert False, "should have failed on [%s]" % bad + except ValueError: + pass + + p = Platform('ubuntu', '8.04') + assert 'ubuntu 8.04' == str(p) + assert 'ubuntu 8.04' == repr(p) + + for v in [Platform('ubuntu', '8.04'), Platform('ubuntu', '8.04', notes=None)]: + assert p == p + for v in [Platform('ubuntu', '8.04', 'some notes'), 'foo', 1]: + assert p != v + + # note: probably actually "osx" + p = Platform('OS X', '10.6', 'macports') + assert 'OS X 10.6' == str(p) + assert 'OS X 10.6' == repr(p) + + for v in [p, Platform('OS X', '10.6', 'macports')]: + assert p == p + for v in [Platform('OS X', '10.6'), 'foo', 1]: + assert p != v + + +def test_Depend(): + from rospkg.manifest import Depend + for bad in [None, '']: + try: + Depend(bad, 'package') + assert False, "should have failed on [%s]" % bad + except ValueError: + pass + try: + Depend('foo', bad) + assert False, "should have failed on [%s]" % bad + except ValueError: + pass + + d = Depend('roslib', 'package') + assert 'roslib' == str(d) + assert 'roslib' == repr(d) + + assert d == Depend('roslib', 'package') + for v in [Depend('roslib', 'stack'), Depend('roslib2', 'package'), 1]: + assert d != v + + +def _subtest_parse_example1(m): + from rospkg.manifest import Manifest + assert isinstance(m, Manifest) + assert 'package' == m.type + assert "a brief description" == m.brief + assert "Line 1\nLine 2" == m.description.strip() + assert u"The authors go here.\nutf-8 test: ÄÖÜ" == m.author.strip() + assert "Public Domain\nwith other stuff" == m.license.strip() + assert "http://pr.willowgarage.com/package/" == m.url + for d in m.depends: + assert 'package' == d.type + dpkgs = [d.name for d in m.depends] + assert set(['pkgname', 'common']) == set(dpkgs) + rdpkgs = [d.name for d in m.rosdeps] + assert set(['python', 'bar', 'baz']) == set(rdpkgs) + for p in m.platforms: + if p.os == 'ubuntu': + assert "8.04" == p.version + assert '' == p.notes + elif p.os == 'OS X': + assert "10.6" == p.version + assert "macports" == p.notes + else: + assert False, "unknown platform " + str(p) + + +def _subtest_parse_stack_example1(m): + from rospkg.manifest import Manifest + assert isinstance(m, Manifest) + assert 'stack' == m.type + assert "a brief description" == m.brief + assert "Line 1\nLine 2" == m.description.strip() + assert "The authors\ngo here" == m.author.strip() + assert "Public Domain\nwith other stuff" == m.license.strip() + assert "http://ros.org/stack/" == m.url + for d in m.depends: + assert 'stack' == d.type + dpkgs = [d.name for d in m.depends] + assert set(['stackname', 'common']) == set(dpkgs) + assert [] == m.rosdeps + assert [] == m.exports + + +def _subtest_parse_stack_version(m): + assert "1.2.3" == m.version + + +def get_test_dir(): + return os.path.abspath(os.path.join(os.path.dirname(__file__), 'manifest')) + + +def test_is_catkin(): + from rospkg.manifest import MANIFEST_FILE, parse_manifest_file + d = get_test_dir() + m = parse_manifest_file(os.path.join(d, 'catkin'), MANIFEST_FILE) + assert m.is_catkin + m = parse_manifest_file(os.path.join(d, 'example1'), MANIFEST_FILE) + assert not m.is_catkin + + +def test_parse_manifest_file(): + from rospkg.manifest import parse_manifest_file, MANIFEST_FILE, STACK_FILE + + d = get_test_dir() + m = parse_manifest_file(os.path.join(d, 'example1'), MANIFEST_FILE) + _subtest_parse_example1(m) + + m = parse_manifest_file(os.path.join(d, 'stack_example1'), STACK_FILE) + _subtest_parse_stack_example1(m) + + m = parse_manifest_file(os.path.join(d, 'stack_version'), STACK_FILE) + _subtest_parse_stack_version(m) + + +def test_parse_manifest(): + # test_parse_manifest_file is more thorough; just want to make sure we have one call to lower-level API + from rospkg.manifest import MANIFEST_FILE, parse_manifest + d = get_test_dir() + p = os.path.join(d, 'example1', MANIFEST_FILE) + with open(p, 'r') as f: + contents = f.read() + _subtest_parse_example1(parse_manifest(MANIFEST_FILE, contents, p)) + + +def test__Manifest(): + from rospkg.manifest import Manifest + m = Manifest() + # check defaults + assert 'package' == m.type + + m = Manifest('package') + assert 'package' == m.type + m = Manifest('stack') + assert 'stack' == m.type + + # tripwire, no defined value + str(m) + repr(m) + + +# bad file examples should be more like the roslaunch tests where there is just 1 thing wrong +def test_parse_bad_file(): + from rospkg.manifest import parse_manifest, InvalidManifest, MANIFEST_FILE + base_p = get_test_dir() + for b in ['bad1.xml', 'bad2.xml', 'bad3.xml']: + p = os.path.join(base_p, b) + with open(p, 'r') as f: + contents = f.read() + try: + parse_manifest(MANIFEST_FILE, contents, filename=p) + assert False, "parse should have failed on bad manifest" + except InvalidManifest as e: + print(str(e)) + assert p in str(e), "file name [%s] should be in error message [%s]" % (p, str(e)) + + +EXAMPLE1 = u""" + Line 1 +Line 2 + + The authors go here. +utf-8 test: ÄÖÜ + Public Domain +with other stuff + http://pr.willowgarage.com/package/ + http://www.willowgarage.com/files/willowgarage/robot10.jpg + + + + + + + + + + + + + + +""" + +STACK_EXAMPLE1 = """ + Line 1 +Line 2 + + The authors +go here + Public Domain +with other stuff + http://ros.org/stack/ + http://www.willowgarage.com/files/willowgarage/robot10.jpg + + +""" + +STACK_INVALID1 = """ + Line 1 + The authors + Public Domain + +""" + +STACK_INVALID2 = """ + Line 1 + The authors + Public Domain + + + + +""" diff --git a/test/test_rospkg_os_detect.py b/test/test_rospkg_os_detect.py new file mode 100644 index 0000000..402d798 --- /dev/null +++ b/test/test_rospkg_os_detect.py @@ -0,0 +1,700 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2009, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +from __future__ import absolute_import + +import os + +import mock +from mock import patch + + +class TrueOs(): + def is_os(self): + return True + + def get_version(self): + return "os_version" + + def get_codename(self): + return "os_codename" + + +class TrueOs2(): + def is_os(self): + return True + + def get_version(self): + return "os_version" + + def get_codename(self): + return "os_codename" + + +class FalseOs(object): + def is_os(self): + return False + + def get_version(self): + return "os_version2" + + def get_codename(self): + return "os_codename" + + +def test__read_stdout(): + from rospkg.os_detect import _read_stdout + assert 'hello' == _read_stdout(['echo', 'hello']) + assert _read_stdout(['bad-command-input-for-rospkg-os-detect']) is None + + +def test_tripwire_ubuntu(): + from rospkg.os_detect import OsDetect + os_detect = OsDetect() + os_detect.get_detector('ubuntu') + + +def test_LsbDetect(): + from rospkg.os_detect import LsbDetect, OsNotDetected + + # test non-match + detect = LsbDetect('bad') + assert not detect.is_os() + try: + detect.get_version() + assert False + except OsNotDetected: + pass + try: + detect.get_codename() + assert False + except OsNotDetected: + pass + + # test match + # to be removed after Ubuntu Xenial is out of support + import sys + if sys.version_info >= (3, 8): + import distro + else: + import platform as distro + + distro.linux_distribution = mock.Mock() + distro.linux_distribution.return_value = ('Ubuntu', '10.04', 'lucid') + distro.dist = mock.Mock() + distro.dist.return_value = ('Ubuntu', '10.04', 'lucid') + + detect = LsbDetect('Ubuntu') + assert detect.is_os(), "should be Ubuntu" + assert detect.get_codename() == 'lucid', detect.get_codename() + + # test freely + if not detect.is_os(): + try: + detect.get_version() + assert False + except OsNotDetected: + pass + + +def test_ubuntu(): + from rospkg.os_detect import OsDetect, OsNotDetected + + os_detector = OsDetect() + detect = os_detector.get_detector('ubuntu') + detect.lsb_info = ('Ubuntu', '10.04', 'lucid') + + assert detect.get_version() == '10.04', detect.get_version() + assert detect.get_codename() == 'lucid', detect.get_codename() + + # test freely + if not detect.is_os(): + try: + detect.get_version() + assert False + except OsNotDetected: + pass + try: + detect.get_codename() + assert False + except OsNotDetected: + pass + + +def test_tripwire_debian(): + from rospkg.os_detect import OsDetect + os_detect = OsDetect() + os_detect.get_detector('debian') + + +def test_tripwire_osx(): + from rospkg.os_detect import OsDetect + os_detect = OsDetect() + os_detect.get_detector('osx') + + +def test_osx(): + if 'posix' != os.name: + from unittest.case import SkipTest + raise SkipTest('Test requires POSIX platform, not "{}"'.format(os.name)) + + from rospkg.os_detect import OSX, _osx_codename, OsNotDetected + test_dir = os.path.join(get_test_dir(), 'osx') + detect = OSX(os.path.join(test_dir, "sw_vers")) + assert detect.is_os() + assert detect.get_codename() == 'snow' + assert detect.get_version() == '10.6.5', detect.get_version() + + # trigger bad version number detect + detect = OSX(os.path.join(test_dir, "sw_vers_bad")) + assert detect.is_os() + try: + detect.get_codename() + assert False + except OsNotDetected: + pass + + # regression test codename mapping + assert 'lion' == _osx_codename(10, 7) + try: + _osx_codename(9, 7) + assert False + except OsNotDetected: + pass + + +def test_osx_patched(): + from rospkg.os_detect import OSX, OsNotDetected + + @patch.object(OSX, 'is_os') + def test(mock): + mock.return_value = False + detect = OSX() + try: + detect.get_codename() + assert False + except OsNotDetected: + pass + try: + detect.get_version() + assert False + except OsNotDetected: + pass + test() + + +def test_tripwire_arch(): + from rospkg.os_detect import OsDetect + os_detect = OsDetect() + os_detect.get_detector('arch') + + +def test_arch(): + from rospkg.os_detect import Arch, OsNotDetected + test_dir = os.path.join(get_test_dir(), 'arch') + detect = Arch(os.path.join(test_dir, "arch-release")) + assert detect.is_os() + assert detect.get_version() == '' + + detect = Arch() + if not detect.is_os(): + try: + detect.get_version() + assert False + except OsNotDetected: + pass + try: + detect.get_codename() + assert False + except OsNotDetected: + pass + + @patch.object(Arch, 'is_os') + def test(mock): + mock.is_os.return_value = True + detect = Arch() + assert detect.get_version() == '' + assert detect.get_codename() == '' + test() + + +def test_tripwire_manjaro(): + from rospkg.os_detect import OsDetect + os_detect = OsDetect() + os_detect.get_detector('manjaro') + + +def test_manjaro(): + from rospkg.os_detect import Manjaro, OsNotDetected + test_dir = os.path.join(get_test_dir(), 'manjaro') + detect = Manjaro(os.path.join(test_dir, "manjaro-release")) + assert detect.is_os() + assert detect.get_version() == '' + + detect = Manjaro() + if not detect.is_os(): + try: + detect.get_version() + assert False + except OsNotDetected: + pass + try: + detect.get_codename() + assert False + except OsNotDetected: + pass + + @patch.object(Manjaro, 'is_os') + def test(mock): + mock.is_os.return_value = True + detect = Manjaro() + assert detect.get_version() == '' + assert detect.get_codename() == '' + test() + + +def test_tripwire_opensuse(): + from rospkg.os_detect import OsDetect + os_detect = OsDetect() + os_detect.get_detector('opensuse') + + +def test_opensuse(): + from rospkg.os_detect import OpenSuse, OsNotDetected + test_dir = os.path.join(get_test_dir(), 'opensuse') + detect = OpenSuse(os.path.join(test_dir, "SuSE-brand")) + assert detect.is_os() + assert detect.get_version() == '11.2' + + detect = OpenSuse() + if not detect.is_os(): + try: + detect.get_version() + assert False + except OsNotDetected: + pass + + +def test_opensuse13(): + from rospkg.os_detect import OpenSuse, OsNotDetected + test_dir = os.path.join(get_test_dir(), 'opensuse13') + detect = OpenSuse(os.path.join(test_dir, "SUSE-brand")) + assert detect.is_os() + assert detect.get_version() == '13.1' + + detect = OpenSuse() + if not detect.is_os(): + try: + detect.get_version() + assert False + except OsNotDetected: + pass + + +def test_tripwire_gentoo(): + from rospkg.os_detect import OsDetect + os_detect = OsDetect() + os_detect.get_detector('gentoo') + + +def test_gentoo(): + from rospkg.os_detect import Gentoo, OsNotDetected + test_dir = os.path.join(get_test_dir(), 'gentoo') + detect = Gentoo(os.path.join(test_dir, "gentoo-release")) + assert detect.is_os() + assert detect.get_version() == '2.0.1' + assert detect.get_codename() == '' + + # test freely + detect = Gentoo() + if not detect.is_os(): + try: + detect.get_version() + assert False + except OsNotDetected: + pass + try: + detect.get_codename() + assert False + except OsNotDetected: + pass + + +def test_tripwire_fedora(): + from rospkg.os_detect import OsDetect + os_detect = OsDetect() + os_detect.get_detector('fedora') + + +def get_test_dir(): + return os.path.abspath(os.path.join(os.path.dirname(__file__), + 'os_detect')) + + +def test_fedora(): + from rospkg.os_detect import Fedora, OsNotDetected + test_dir = os.path.join(get_test_dir(), 'fedora') + release_file, issue_file = [os.path.join(test_dir, x) for + x in ["redhat-release", "issue"]] + detect = Fedora(release_file, issue_file) + assert detect.is_os() + assert detect.get_version() == '1' + assert detect.get_codename() == 'bordeaux', detect.get_codename() + + detect = Fedora() + if not detect.is_os(): + try: + detect.get_version() + assert False + except OsNotDetected: + pass + + +def test_read_issue(): + from rospkg.os_detect import read_issue + assert read_issue('/fake/file') is None + test_dir = os.path.join(get_test_dir(), 'rhel') + assert read_issue(os.path.join(test_dir, 'issue')) == \ + ['Red', 'Hat', 'Enterprise', 'Linux', 'AS', 'release', '3', '(Taroon)'] + + +def test_OsDetector(): + from rospkg.os_detect import OsDetector + d = OsDetector() + try: + d.is_os() + assert False + except NotImplementedError: + pass + try: + d.get_version() + assert False + except NotImplementedError: + pass + try: + d.get_codename() + assert False + except NotImplementedError: + pass + + +def test_tripwire_uname_get_machine(): + from rospkg.os_detect import uname_get_machine + retval = uname_get_machine() + assert retval in [None, 'aarch64', 'armv7l', 'i386', 'i686', 'ppc', 'ppc64', 'ppc64le', 's390', 's390x', 'x86_64'] + + +def test_tripwire_rhel(): + from rospkg.os_detect import OsDetect + os_detect = OsDetect() + os_detect.get_detector('rhel') + + +def test_redhat(): + from rospkg.os_detect import Rhel, OsNotDetected + test_dir = os.path.join(get_test_dir(), 'rhel') + + # go through several test files + detect = Rhel(os.path.join(test_dir, "redhat-release")) + assert detect.is_os() + assert detect.get_version() == '3' + assert detect.get_codename() == 'taroon' + + detect = Rhel(os.path.join(test_dir, "redhat-release-tikanga")) + assert detect.is_os() + assert detect.get_version() == '5' + assert detect.get_codename() == 'tikanga' + + detect = Rhel(os.path.join(test_dir, "redhat-release-nahant")) + assert detect.is_os() + assert detect.get_version() == '4' + assert detect.get_codename() == 'nahant' + + # test freely + detect = Rhel() + if not detect.is_os(): + try: + detect.get_version() + assert False + except OsNotDetected: + pass + try: + detect.get_codename() + assert False + except OsNotDetected: + pass + + +def test_tripwire_slackware(): + from rospkg.os_detect import OsDetect + os_detect = OsDetect() + os_detect.get_detector('slackware') + + +def test_slackware(): + from rospkg.os_detect import Slackware, OsNotDetected + test_dir = os.path.join(get_test_dir(), 'slackware') + detect = Slackware(os.path.join(test_dir, "slackware-version")) + assert detect.is_os() + assert detect.get_version() == '14.2' + assert detect.get_codename() == '' + + # test freely + detect = Slackware() + if not detect.is_os(): + try: + detect.get_version() + assert False + except OsNotDetected: + pass + try: + detect.get_codename() + assert False + except OsNotDetected: + pass + + +def test_tripwire_freebsd(): + from rospkg.os_detect import OsDetect + os_detect = OsDetect() + os_detect.get_detector('freebsd') + + +def test_freebsd(): + from rospkg.os_detect import FreeBSD, OsNotDetected + # TODO + if 0: + test_dir = os.path.join(get_test_dir(), 'freebsd') + release_file, issue_file = [os.path.join(test_dir, x) for + x in ["redhat-release", "issue"]] + detect = FreeBSD(release_file, issue_file) + assert detect.is_os() + assert detect.get_version() == '3' + + # test freely + detect = FreeBSD() + if not detect.is_os(): + try: + detect.get_version() + assert False + except OsNotDetected: + pass + + # assure failure + detect = FreeBSD("/fake/uname/file") + assert not detect.is_os() + try: + detect.get_version() + assert False + except OsNotDetected: + pass + try: + detect.get_codename() + assert False + except OsNotDetected: + pass + + @patch.object(FreeBSD, 'is_os') + def test(mock): + mock.is_os.return_value = True + detect = FreeBSD() + assert detect.get_codename() == '' + test() + + +def test_cygwin(): + from rospkg.os_detect import Cygwin, OsNotDetected + # TODO + detect = Cygwin() + if not detect.is_os(): + try: + detect.get_version() + assert False + except OsNotDetected: + pass + + try: + detect.get_codename() + assert False + except OsNotDetected: + pass + + @patch.object(Cygwin, 'is_os') + def test(mock): + mock.is_os.return_value = True + detect = Cygwin() + assert detect.get_codename() == '' + test() + + +def test_OsDetect(): + from rospkg.os_detect import OsDetect + detect = OsDetect() + try: + detect.get_detector('fake') + assert False, "should raise" + except KeyError: + pass + + +def test_OsDetect_ROS_OVERRIDE(): + from rospkg.os_detect import OsDetect + detect = OsDetect([('TrueOs', TrueOs())]) + env = {'ROS_OS_OVERRIDE': 'arch'} + assert detect.detect_os(env=env) == ('arch', '', ''), \ + detect.detect_os(env=env) + env = {'ROS_OS_OVERRIDE': 'fubuntu:04.10'} + assert detect.detect_os(env=env) == ('fubuntu', '04.10', '') + env = {'ROS_OS_OVERRIDE': 'fubuntu:04.10:opaque'} + assert detect.detect_os(env=env) == ('fubuntu', '04.10', 'opaque') + + +def test_OsDetect_single(): + # test each method twice with new instance b/c of caching + from rospkg.os_detect import OsDetect + detect = OsDetect([('TrueOs', TrueOs())]) + assert "TrueOs" == detect.get_name() + assert "TrueOs" == detect.get_name() + detect = OsDetect([('TrueOs', TrueOs())]) + assert "os_version" == detect.get_version() + assert "os_version" == detect.get_version() + detect = OsDetect([('TrueOs', TrueOs())]) + assert "os_codename" == detect.get_codename() + assert "os_codename" == detect.get_codename() + + detect = OsDetect([('TrueOs', TrueOs())]) + assert isinstance(detect.get_detector(), TrueOs) + assert isinstance(detect.get_detector('TrueOs'), TrueOs) + + +def test_OsDetect_register_default_add_detector(): + # test behavior of register_default and add_detector. Both take + # precedence over previous detectors, but at different scopes. + from rospkg.os_detect import OsDetect + o1 = TrueOs() + o2 = TrueOs2() + key = 'TrueOs' + detect = OsDetect([(key, o1)]) + + assert detect.get_detector(key) == o1 + detect.register_default(key, o2) + assert detect.get_detector(key) == o1 + detect.add_detector(key, o2) + assert detect.get_detector(key) == o2 + + detect = OsDetect() + assert detect.get_detector(key) == o2 + detect.add_detector(key, o1) + assert detect.get_detector(key) == o1 + + # restore precendence of o1 in default list + detect.register_default(key, o1) + detect = OsDetect() + assert detect.get_detector(key) == o1 + + +def test_OsDetect_nomatch(): + from rospkg.os_detect import OsDetect, OsNotDetected + detect = OsDetect([('Dummy', FalseOs())]) + assert isinstance(detect.get_detector('Dummy'), FalseOs) + try: + detect.get_name() + assert False + except OsNotDetected: + pass + try: + detect.get_version() + assert False + except OsNotDetected: + pass + try: + detect.get_detector() + assert False + except OsNotDetected: + pass + + +def xTrueOsDetect_first_of_two(): + osa = roslib.os_detect.OSDetect([TrueOs(), FalseOs()]) + assert "os_name" == osa.get_name() + assert "os_version" == osa.get_version() + os_class = osa.get_os() + assert "os_name" == os_class.get_name() + assert "os_version" == os_class.get_version() + + +def xTrueOsDetect_second_of_two(): + osa = roslib.os_detect.OSDetect([FalseOs(), TrueOs()]) + assert "os_name", osa.get_name() + assert "os_version", osa.get_version() + os_class = osa.get_os() + assert "os_name" == os_class.get_name() + assert "os_version" == os_class.get_version() + + +def xTrueOsDetect_first_of_many(): + osa = roslib.os_detect.OSDetect([TrueOs(), FalseOs(), FalseOs(), FalseOs(), FalseOs()]) + assert "os_name" == osa.get_name() + assert "os_version" == osa.get_version() + os_class = osa.get_os() + assert "os_name" == os_class.get_name() + assert "os_version" == os_class.get_version() + + +def xTrueOsDetect_second_of_many(): + osa = roslib.os_detect.OSDetect([FalseOs(), TrueOs(), FalseOs(), FalseOs(), FalseOs()]) + assert "os_name" == osa.get_name() + assert "os_version" == osa.get_version() + os_class = osa.get_os() + assert "os_name" == os_class.get_name() + assert "os_version" == os_class.get_version() + + +def xTrueOsDetect_last_of_many(): + osa = roslib.os_detect.OSDetect([FalseOs(), FalseOs(), FalseOs(), FalseOs(), TrueOs()]) + assert "os_name", osa.get_name() + assert "os_version", osa.get_version() + os_class = osa.get_os() + assert "os_name" == os_class.get_name() + assert "os_version" == os_class.get_version() + + +def xtest_ubuntu_in_OSA(): + ubuntu = roslib.os_detect.Ubuntu() + + def return_true(): + return True + ubuntu.check_presence = return_true + osa = roslib.os_detect.OSDetect([ubuntu]) + assert "ubuntu" == ubuntu.get_name() + os_class = osa.get_os() + assert "ubuntu" == os_class.get_name() diff --git a/test/test_rospkg_packages.py b/test/test_rospkg_packages.py new file mode 100644 index 0000000..ec2ff6f --- /dev/null +++ b/test/test_rospkg_packages.py @@ -0,0 +1,358 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2011, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +from __future__ import print_function + +import os +import subprocess +import tempfile + + +def get_package_test_path(): + return os.path.abspath(os.path.join(os.path.dirname(__file__), 'package_tests')) + + +def test_ManifestManager_constructor(): + from rospkg import RosPack, RosStack, get_ros_paths + + r = RosPack() + assert r._manifest_name == 'manifest.xml' + r = RosStack() + assert r._manifest_name == 'stack.xml' + for c in [RosPack, RosStack]: + r = c() + assert r.ros_paths == get_ros_paths() + + tmp = tempfile.gettempdir() + + r = c(ros_paths=[tmp]) + assert r.ros_paths == [tmp] + # make sure we can't accidentally mutate the actual data + r.ros_paths.append('foo') + assert r.ros_paths == [tmp] + + +def test_ManifestManager_get_instance(): + from rospkg import RosPack, RosStack, get_ros_paths + + for c in [RosPack, RosStack]: + # make sure we get the same instance for defaults ros_paths + r1 = c.get_instance() + assert r1.ros_paths == get_ros_paths() + r2 = c.get_instance() + assert r1 is r2 + + # make sure we get the same instance for identical custom ros_paths + tmp = tempfile.gettempdir() + r3 = c.get_instance(ros_paths=[tmp]) + assert r3.ros_paths == [tmp] + r4 = c.get_instance(ros_paths=[tmp]) + assert r3 is r4 + + # make sure for different ros_paths we got different instances + assert r1 is not r3 + + +def rospackexec(args): + rospack_bin = 'rospack' + val = (subprocess.Popen([rospack_bin] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0] or '').strip() + if val.startswith('rospack:'): # rospack error message + raise Exception(val) + return val + + +# for comparing against 'ground truth' +def rospack_list(): + return [s.strip() for s in rospackexec(['list-names']).split('\n') if s.strip()] + + +def rospack_find(package): + return rospackexec(['find', package]).strip() + + +def rospack_depends(package): + return unicode(rospackexec(['depends', package])).split() + + +def rospack_depends1(package): + return unicode(rospackexec(['depends1', package])).split() + + +def rospack_depends_on(package): + return unicode(rospackexec(['depends-on', package])).split() + + +def rospack_depends_on1(package): + return unicode(rospackexec(['depends-on1', package])).split() + + +def delete_cache(): + from rospkg import get_ros_home + p = os.path.join(get_ros_home(), 'rospack_cache') + if os.path.exists(p): + os.remove(p) + + +def rospack_is_available(): + try: + rospackexec(['-h']) + return True + except: + return False + + +def test_RosPack_list(): + from rospkg import RosPack, get_ros_root + if get_ros_root() is not None and rospack_is_available(): + r = RosPack() + + pkgs = rospack_list() + retval = r.list() + assert set(pkgs) == set(retval), "%s vs %s" % (pkgs, retval) + + # test twice for caching + retval = r.list() + assert set(pkgs) == set(retval), "%s vs %s" % (pkgs, retval) + + # make sure stress test works with rospack_cache invalidated + delete_cache() + r = RosPack() + retval = r.list() + assert set(pkgs) == set(retval), "%s vs %s" % (pkgs, retval) + + +def test_RosPack_no_env(): + # regression test for #3680 + from rospkg import RosPack, ResourceNotFound + try: + environ_copy = os.environ.copy() + if 'ROS_ROOT' in os.environ: + del os.environ['ROS_ROOT'] + if 'ROS_PACKAGE_PATH' in os.environ: + del os.environ['ROS_PACKAGE_PATH'] + r = RosPack() + try: + r.get_depends('roscpp') + assert False, "should have raised" + except ResourceNotFound: + pass + finally: + os.environ.clear() + os.environ.update(environ_copy) + + +def test_RosPack_get_path(): + from rospkg import RosPack, ResourceNotFound, get_ros_root + + path = get_package_test_path() + foo_path = os.path.join(path, 'p1', 'foo') + foo_path_alt = os.path.join(path, 'p2', 'foo') + bar_path = os.path.join(path, 'p1', 'bar') + baz_path = os.path.join(path, 'p2', 'baz') + + # point ROS_ROOT at top, should spider entire tree + print("ROS path: %s" % (path)) + r = RosPack(ros_paths=[path]) + # precedence in this case is undefined as there are two 'foo's in the same path + assert r.get_path('foo') in [foo_path, foo_path_alt] + assert bar_path == r.get_path('bar') + assert baz_path == r.get_path('baz') + try: + r.get_path('fake') + assert False + except ResourceNotFound: + pass + + # divide tree in half to test precedence + print("ROS_PATH 1: %s" % (os.path.join(path, 'p1'))) + print("ROS_PATH 2: %s" % (os.path.join(path, 'p2'))) + r = RosPack(ros_paths=[os.path.join(path, 'p1'), os.path.join(path, 'p2')]) + assert foo_path == r.get_path('foo'), "%s vs. %s" % (foo_path, r.get_path('foo')) + assert bar_path == r.get_path('bar') + assert baz_path == r.get_path('baz') + + if get_ros_root() and rospack_is_available(): + # stresstest against rospack + r = RosPack() + for p in rospack_list(): + retval = r.get_path(p) + rospackval = rospack_find(p) + assert retval == rospackval, "[%s]: %s vs. %s" % (p, retval, rospackval) + + +def test_RosPackage_get_depends(): + from rospkg import RosPack, ResourceNotFound, get_ros_root + path = get_package_test_path() + r = RosPack(ros_paths=[path]) + + # test on multiple calls to bad package -- there was an ordering + # issue in the logic that caused get_depends() to return an empty + # set on the second call. + for i in range(1, 4): + try: + r.get_depends('bad', implicit=True) + assert False, "should have raised" + except ResourceNotFound: + pass + + # TODO: need one more step + assert set(r.get_depends('baz')) == set(['foo', 'bar']) + assert r.get_depends('bar') == ['foo'] + assert r.get_depends('foo') == [] + + if get_ros_root() and rospack_is_available(): + # stress test: test default environment against rospack + r = RosPack() + for p in rospack_list(): + retval = set(r.get_depends(p)) + rospackval = set(rospack_depends(p)) + assert retval == rospackval, "[%s]: %s vs. %s" % (p, retval, rospackval) + + +def get_stack_test_path(): + return os.path.abspath(os.path.join(os.path.dirname(__file__), 'stack_tests')) + + +def test_stack_of(): + from rospkg import RosPack, ResourceNotFound + path = os.path.join(get_stack_test_path(), 's1') + r = RosPack(ros_paths=[path]) + + # test with actual stacks + assert r.stack_of('foo_pkg') == 'foo' + assert r.stack_of('foo_pkg_2') == 'foo' + assert r.stack_of('bar_pkg') == 'bar' + + try: + r.stack_of('fake') + assert False, "should have raised ResourceNotFound" + except ResourceNotFound: + pass + + path = os.path.join(get_package_test_path(), 'p1') + r = RosPack(ros_paths=[path]) + + # test with actual not stacked-packages + assert r.stack_of('foo') is None + + +def test_RosPackage_get_depends_explicit(): + from rospkg import RosPack, get_ros_root + path = get_package_test_path() + r = RosPack(ros_paths=[path]) + + implicit = False + assert set(r.get_depends('baz', implicit)) == set(['bar', 'foo']) + assert r.get_depends('bar', implicit) == ['foo'] + assert r.get_depends('foo', implicit) == [] + + if get_ros_root() and rospack_is_available(): + # stress test: test default environment against rospack + r = RosPack() + for p in rospack_list(): + retval = set(r.get_depends(p, implicit)) + rospackval = set(rospack_depends1(p)) + assert retval == rospackval, "[%s]: %s vs. %s" % (p, retval, rospackval) + + +def test_RosPack_get_rosdeps(): + from rospkg import RosPack + + path = get_package_test_path() + r = RosPack(ros_paths=[os.path.join(path, 'p1'), os.path.join(path, 'p2')]) + + # repeat tests due to caching + assert set(['foo_rosdep1', 'foo_rosdep2', 'foo_rosdep3']) == set(r.get_rosdeps('foo', implicit=True)), r.get_rosdeps('foo', implicit=True) + assert set(['foo_rosdep1', 'foo_rosdep2', 'foo_rosdep3']) == set(r.get_rosdeps('foo', implicit=True)) + assert set(['foo_rosdep1', 'foo_rosdep2', 'foo_rosdep3']) == set(r.get_rosdeps('foo', implicit=False)) + + assert set(['bar_rosdep1', 'bar_rosdep2']) == set(r.get_rosdeps('bar', implicit=False)) + assert set(['foo_rosdep1', 'foo_rosdep2', 'foo_rosdep3', 'bar_rosdep1', 'bar_rosdep2']) == set(r.get_rosdeps('bar', implicit=True)) + assert set(['foo_rosdep1', 'foo_rosdep2', 'foo_rosdep3', 'bar_rosdep1', 'bar_rosdep2']) == set(r.get_rosdeps('bar', implicit=True)) + assert set(['foo_rosdep1', 'foo_rosdep2', 'foo_rosdep3', 'bar_rosdep1', 'bar_rosdep2']) == set(r.get_rosdeps('bar')) + + assert ['baz_rosdep1'] == r.get_rosdeps('baz', implicit=False) + assert set(['baz_rosdep1', 'foo_rosdep1', 'foo_rosdep2', 'foo_rosdep3', 'bar_rosdep1', 'bar_rosdep2']) == set(r.get_rosdeps('baz')) + assert set(['baz_rosdep1', 'foo_rosdep1', 'foo_rosdep2', 'foo_rosdep3', 'bar_rosdep1', 'bar_rosdep2']) == set(r.get_rosdeps('baz')) + + # create a brand new instance to test with brand new cache + r = RosPack(ros_paths=[os.path.join(path, 'p1'), os.path.join(path, 'p2')]) + assert set(['baz_rosdep1', 'foo_rosdep1', 'foo_rosdep2', 'foo_rosdep3', 'bar_rosdep1', 'bar_rosdep2']) == set(r.get_rosdeps('baz')) + assert set(['baz_rosdep1', 'foo_rosdep1', 'foo_rosdep2', 'foo_rosdep3', 'bar_rosdep1', 'bar_rosdep2']) == set(r.get_rosdeps('baz')) + + +def test_get_package_name(): + from rospkg import __version__ + from rospkg import get_package_name + + # test dir is a subdirectory of this package + test_dir = get_package_test_path() + assert get_package_name(test_dir) in ['rospkg', 'rospkg-%s' % __version__], get_package_name(test_dir) + + test_dir_foo = os.path.join(test_dir, 'p1', 'foo') + assert 'foo' == get_package_name(test_dir_foo) + + # test with path outside of our hierarchy + assert get_package_name(tempfile.tempdir) is None + + +def test_get_depends_on(): + from rospkg import RosPack, get_ros_root + test_dir = get_package_test_path() + rp = RosPack(ros_paths=[test_dir]) + # test direct depends + val = rp.get_depends_on('foo', implicit=False) + assert set(['bar', 'baz']) == set(val), val + val = rp.get_depends_on('bar', implicit=False) + assert ['baz'] == val, val + val = rp.get_depends_on('baz', implicit=False) + assert [] == val, val + + # test implicit depends + val = rp.get_depends_on('foo', implicit=True) + assert set(['bar', 'baz']) == set(val), val + val = rp.get_depends_on('bar', implicit=True) + assert ['baz'] == val, val + val = rp.get_depends_on('baz', implicit=True) + assert [] == val, val + + if get_ros_root() and rospack_is_available(): + # stress test: test default environment against rospack + r = RosPack() + for p in rospack_list(): + retval = set(r.get_depends_on(p, False)) + rospackval = set(rospack_depends_on1(p)) + assert retval == rospackval, "[%s]: %s vs. %s" % (p, retval, rospackval) + for p in rospack_list(): + retval = set(r.get_depends_on(p, True)) + rospackval = set(rospack_depends_on(p)) + assert retval == rospackval, "[%s]: %s vs. %s" % (p, retval, rospackval) diff --git a/test/test_rospkg_stacks.py b/test/test_rospkg_stacks.py new file mode 100644 index 0000000..ebb5b36 --- /dev/null +++ b/test/test_rospkg_stacks.py @@ -0,0 +1,292 @@ +# Software License Agreement (BSD License) +# +# Copyright (c) 2011, Willow Garage, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Willow Garage, Inc. nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +from __future__ import print_function + +import os + +import subprocess + + +def get_stack_test_path(): + return os.path.abspath(os.path.join(os.path.dirname(__file__), 'stack_tests')) + + +def get_unary_test_path(): + return os.path.abspath(os.path.join(os.path.dirname(__file__), 'unary_tests')) + + +def rosstackexec(args): + rosstack_bin = 'rosstack' + val = subprocess.Popen([rosstack_bin] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=os.environ).communicate() + val = val[0].strip() + if val.startswith('rosstack:'): # rosstack error message + raise Exception(val) + return val + + +def rosstack_is_available(): + try: + rosstackexec(['-h']) + return True + except: + return False + + +# for comparing against 'ground truth' +def rosstack_list(): + return [s.strip() for s in rosstackexec(['list-names']).split('\n') if s.strip()] + + +def rosstack_find(package): + return rosstackexec(['find', package]).strip() + + +def rosstack_depends(package): + return unicode(rosstackexec(['depends', package])).split() + + +def rosstack_depends1(package): + return unicode(rosstackexec(['depends1', package])).split() + + +def delete_cache(): + from rospkg import get_ros_home + p = os.path.join(get_ros_home(), 'rosstack_cache') + if os.path.exists(p): + os.remove(p) + + +def test_RosStack_list(): + from rospkg import get_ros_paths, RosStack + + print("ROS paths", get_ros_paths()) + if get_ros_paths() is not None and rosstack_is_available(): + r = RosStack() + + l = rosstack_list() + retval = r.list() + assert set(l) == set(retval), "%s vs %s" % (l, retval) + + # test twice for caching + retval = r.list() + assert set(l) == set(retval), "%s vs %s" % (l, retval) + + # make sure stress test works with rospack_cache invalidated + delete_cache() + r = RosStack() + retval = r.list() + assert set(l) == set(retval), "%s vs %s" % (l, retval) + + +def test_RosStack_get_path(): + from rospkg import RosStack, ResourceNotFound, get_ros_paths + + path = get_stack_test_path() + bar_path = os.path.join(path, 's1', 'bar') + baz_path = os.path.join(path, 's2', 'baz') + + # point ROS_ROOT at top, should spider entire tree + print("ROS_PATHS: %s" % str([path])) + print("ROS_PACKAGE_PATH: ") + r = RosStack(ros_paths=[path]) + assert bar_path == r.get_path('bar'), "%s vs. %s" % (bar_path, r.get_path('bar')) + try: + r.get_path('fake') + assert False + except ResourceNotFound: + pass + + # divide tree in half to test precedence + print("ROS PATH 1: %s" % (os.path.join(path, 'p1'))) + print("ROS PATH 2: %s" % (os.path.join(path, 'p2'))) + foo_path = os.path.join(path, 's1', 'foo') + r = RosStack(ros_paths=[os.path.join(path, 's1'), os.path.join(path, 's2')]) + assert foo_path == r.get_path('foo'), "%s vs. %s" % (foo_path, r.get_path('foo')) + assert bar_path == r.get_path('bar') + assert baz_path == r.get_path('baz') + + # divide tree in half again and test precedence of ROS_PACKAGE_PATH (foo should switch) + print("ROS_ROOT: %s" % (os.path.join(path, 'p1'))) + print("ROS_PACKAGE_PATH: %s" % (os.path.join(path, 'p2'))) + foo_path = os.path.join(path, 's2', 'foo') + ros_paths = [os.path.join(path, 'notapath'), os.path.join(path, 's2'), os.path.join(path, 's1')] + r = RosStack(ros_paths=ros_paths) + assert foo_path == r.get_path('foo'), "%s vs. %s" % (foo_path, r.get_path('foo')) + + if get_ros_paths() and rosstack_is_available(): + # stresstest against rospack + r = RosStack() + listval = rosstack_list() + for p in listval: + retval = r.get_path(p) + rospackval = rosstack_find(p) + assert retval == rospackval, "[%s]: %s vs. %s" % (p, retval, rospackval) + + # stresstest with cache invalidated + delete_cache() + r = RosStack() + for p in listval: + retval = r.get_path(p) + rospackval = rosstack_find(p) + assert retval == rospackval, "[%s]: %s vs. %s" % (p, retval, rospackval) + + +def test_RosStack_get_depends(): + from rospkg import get_ros_paths, RosStack + path = get_stack_test_path() + s1 = os.path.join(path, 's1') + s3 = os.path.join(path, 's3') + r = RosStack(ros_paths=[s1, s3]) + + # TODO: need one more step + assert set(r.get_depends('baz')) == set(['foo', 'bar']) + assert r.get_depends('bar') == ['foo'] + assert r.get_depends('foo') == [] + + if get_ros_paths() and rosstack_is_available(): + # stress test: test default environment against rosstack + r = RosStack() + for p in rosstack_list(): + retval = set(r.get_depends(p)) + rospackval = set(rosstack_depends(p)) + assert retval == rospackval, "[%s]: %s vs. %s" % (p, retval, rospackval) + + +def test_RosStack_get_depends_explicit(): + from rospkg import get_ros_paths, RosStack + path = get_stack_test_path() + s1 = os.path.join(path, 's1') + s3 = os.path.join(path, 's3') + r = RosStack(ros_paths=[s1, s3]) + + implicit = False + assert set(r.get_depends('baz', implicit)) == set(['bar', 'foo']) + assert r.get_depends('bar', implicit) == ['foo'] + assert r.get_depends('foo', implicit) == [] + + # stress test: test default environment against rospack + if get_ros_paths() and rosstack_is_available(): + r = RosStack() + for p in rosstack_list(): + retval = set(r.get_depends(p, implicit)) + rospackval = set(rosstack_depends1(p)) + assert retval == rospackval, "[%s]: %s vs. %s" % (p, retval, rospackval) + + +def test_expand_to_packages(): + from rospkg import expand_to_packages, RosPack, RosStack + path = os.path.join(get_stack_test_path(), 's1') + rospack = RosPack(ros_paths=[path]) + rosstack = RosStack(ros_paths=[path]) + + try: + expand_to_packages('foo', rospack, rosstack) + assert False, "should have raised ValueError" + except ValueError: + pass + + valid, invalid = expand_to_packages(['foo'], rospack, rosstack) + assert set(valid) == set(['foo_pkg', 'foo_pkg_2']) + assert not invalid + + valid, invalid = expand_to_packages(['foo_pkg'], rospack, rosstack) + assert set(valid) == set(['foo_pkg']) + assert not invalid + + valid, invalid = expand_to_packages(['foo', 'foo_pkg'], rospack, rosstack) + assert set(valid) == set(['foo_pkg', 'foo_pkg_2']) + assert not invalid + + valid, invalid = expand_to_packages(['foo', 'bar'], rospack, rosstack) + assert set(valid) == set(['foo_pkg', 'foo_pkg_2', 'bar_pkg']) + assert not invalid + + valid, invalid = expand_to_packages(['foo', 'bar_pkg'], rospack, rosstack) + assert set(valid) == set(['foo_pkg', 'foo_pkg_2', 'bar_pkg']) + assert not invalid + + valid, invalid = expand_to_packages(['foo', 'bar_pkg', 'bar'], rospack, rosstack) + assert set(valid) == set(['foo_pkg', 'foo_pkg_2', 'bar_pkg']) + assert not invalid + + valid, invalid = expand_to_packages(['foo', 'fake1', 'bar_pkg', 'bar', 'fake2'], rospack, rosstack) + assert set(valid) == set(['foo_pkg', 'foo_pkg_2', 'bar_pkg']) + assert set(invalid) == set(['fake1', 'fake2']) + + +def test_get_stack_version(): + from rospkg import get_stack_version_by_dir, RosStack + path = os.path.join(get_stack_test_path(), 's1') + r = RosStack(ros_paths=[path]) + + # test by dir option directly + foo_dir = r.get_path('foo') + assert get_stack_version_by_dir(foo_dir) == '1.6.0-manifest' + bar_dir = r.get_path('bar') + assert get_stack_version_by_dir(bar_dir) == '1.5.0-cmake' + + # test via rosstack + assert r.get_stack_version('foo') == '1.6.0-manifest' + assert r.get_stack_version('bar') == '1.5.0-cmake' + + path = os.path.join(get_stack_test_path(), 's2') + r = RosStack(ros_paths=[path]) + foo_dir = r.get_path('foo') + assert get_stack_version_by_dir(foo_dir) is None, get_stack_version_by_dir(foo_dir) + + # test reading from stack.yaml + baz_dir = r.get_path('baz') + assert get_stack_version_by_dir(baz_dir) == '1-manifest', get_stack_version_by_dir(baz_dir) + assert r.get_stack_version('baz') == '1-manifest' + + +def test_get_cmake_version(): + from rospkg.rospack import _get_cmake_version + + assert '1.6.0' == _get_cmake_version("rosbuild_make_distribution(1.6.0)") + try: + _get_cmake_version("rosbuild_make_distribution") + assert False, "should have raised ValueError" + except ValueError: + pass + + +def test_unary(): + from rospkg import RosStack, RosPack + path = get_unary_test_path() + rospack = RosPack(ros_paths=[path]) + rosstack = RosStack(ros_paths=[path]) + assert rospack.get_path('unary') == rosstack.get_path('unary') + + assert rosstack.packages_of('unary') == ['unary'] + assert rospack.stack_of('unary') == 'unary' -- 2.7.4