from __future__ import with_statement
import os
-import re
import ast
import base64
from ConfigParser import SafeConfigParser, NoSectionError, NoOptionError, \
from gitbuildsys import msger, errors
from gitbuildsys.safe_url import SafeURL
+from gitbuildsys.utils import Temp
def decode_passwdx(passwdx):
return [ evalute_string(i.strip()) for i in string.split(sep, maxsplit) ]
-class SectionPattern(object):
- '''Pattern of section that support [section "name"] and [section].
- 1. If there is white-space in section header, it must obey the format like:
- section_type white_spaces section_name,
- section_name could be any string.
- 2. otherwise section name is the whole string in brackets
- '''
-
- SECTCRE = re.compile(
- r'\[' # [
- r'(?P<header>[^] \t]+)' # section name without any white-space
- r'([ \t]+' # or
- r'(?P<name>[^]]+)' # section type and section name
- r')?' # this section name is optional
- r'\]' # ]
- )
-
- class MatchObject(object):
- '''Match object for SectionPattern'''
-
- def __init__(self, match):
- self.match = match
-
- def group(self, _group1):
- '''return a tuple(type, name) if section has a name,
- otherwise return a string as section name
- '''
- type_ = self.match.group('header')
- name = self.match.group('name')
- if not name:
- return type_
-
- name = evalute_string(name)
- return type_, name
-
- def match(self, string):
- '''return MatchObject if string match the pattern'''
- match = self.SECTCRE.match(string)
- return self.MatchObject(match) if match else match
-
-
class BrainConfigParser(SafeConfigParser):
"""Standard ConfigParser derived class which can reserve most of the
comments, indents, and other user customized stuff inside the ini file.
"""
- SECTCRE = SectionPattern()
-
def read_one(self, filename):
"""only support one input file"""
return SafeConfigParser.read(self, filename)
else:
raise NoSectionError(section)
- def set(self, section, option, value, replace_opt=None):
+ def set_into_file(self, section, option, value, replace_opt=None):
"""When set new value, need to update the readin file lines,
which can be saved back to file later.
"""
if self._fpname == '<???>':
return
- fptr = open(self._fpname, 'w')
- for line in self._flines:
- if line is not None:
- fptr.write(line)
- fptr.close()
+ with open(self._fpname, 'w') as fptr:
+ buf = ''.join([ line for line in self._flines if line is not None ])
+ fptr.write(buf)
+
class ConfigMgr(object):
'''Support multi-levels of gbs.conf. Use this class to get and set
'build_server': 'https://api.tizen.org',
'user': '',
'passwd': '',
- 'base_prj': 'Tizen:Main',
- 'target_prj': ''
},
'build': {
'build_cmd': '/usr/bin/build',
},
}
- DEFAULT_CONF_TEMPLATE = """[general]
-; general settings
-tmpdir = $general__tmpdir
-editor = $general__editor
+ DEFAULT_CONF_TEMPLATE = '''[general]
+profile = profile.tizen
+tmpdir = /var/tmp
-[remotebuild]
-; settings for build subcommand
-build_server = $remotebuild__build_server
-user = $remotebuild__user
+[profile.tizen]
+; common authentication info for whole profile
+#user =
; CAUTION: please use the key name "passwd" to reset plaintext password
-passwdx = $remotebuild__passwdx
-; Default base project
-base_prj = $remotebuild__base_prj
-; Default target project
-target_prj = $remotebuild__target_prj
-
-[build]
-build_cmd = $build__build_cmd
-distconf = $build__distconf
-
-; optional, repos definitions
-#repo1.url=
-#repo1.user=
-#repo1.passwd=
-; one more repo
-#repo2.url=
-#repo2.user=
-#repo2.passwd=
-"""
+#passwd =
+obs = obs.tizen
+; comma separated list of repositories
+repos = repo.tizen_latest
+distconf = /usr/share/gbs/tizen-1.0.conf
+
+[obs.tizen]
+url = https://api.tizen.org
+; optinal user/passwd, set if differ from proflie's user/passwd
+#user =
+#passwd =
+
+[repo.tizen_latest]
+url = http://download.tizen.org/snapshots/trunk/common/latest/
+; optinal user/passwd, set if differ from proflie's user/passwd
+#user =
+#passwdx =
+'''
# make the manager class as singleton
_instance = None
# use the default path
fpaths = self._lookfor_confs()
if not fpaths:
- if not self._new_conf():
- msger.error('No config file available')
+ self._new_conf()
+ fpaths = self._lookfor_confs()
+ self._cfgparsers = []
for fpath in fpaths:
cfgparser = BrainConfigParser()
try:
@staticmethod
def _lookfor_confs():
"""Look for available config files following the order:
- > Current git
- > Cwd
+ > Current project
> User
+ > System
"""
paths = []
- for path in (os.path.abspath('.git/gbs.conf'),
- os.path.abspath('.gbs.conf'),
- os.path.expanduser('~/.gbs.conf')):
+ for path in (os.path.abspath('.gbs.conf'),
+ os.path.expanduser('~/.gbs.conf'),
+ '/etc/gbs.conf'):
if os.path.exists(path) and path not in paths:
paths.append(path)
return paths
- def get_default_conf(self, defaults=None):
- 'returns ini template string contains default values'
- from string import Template
- if not defaults:
- defaults = self.DEFAULTS
-
- tmpl_keys = {}
- for sec, opts in defaults.iteritems():
- for opt, val in opts.iteritems():
- tmpl_keys['%s__%s' % (sec, opt)] = val
-
- return Template(self.DEFAULT_CONF_TEMPLATE).safe_substitute(tmpl_keys)
-
- def _new_conf(self, fpath=None):
- 'generate a new conf file located by fpath'
- if not fpath:
- fpath = os.path.expanduser('~/.gbs.conf')
-
- import getpass
- msger.info('Creating config file %s ... ' % fpath)
- # user and passwd in [build] section need user input
- defaults = self.DEFAULTS.copy()
- build_server = raw_input('Remote build server url (use %s by default):'\
- % defaults['remotebuild']['build_server'])
- if build_server:
- defaults['remotebuild']['build_server'] = build_server
-
- defaults['remotebuild']['user'] = \
- raw_input('Username for remote build server '\
- '(type <enter> to skip): ')
-
- if defaults['remotebuild']['user']:
- msger.info('Your password will be encoded before saving ...')
- defaults['remotebuild']['passwdx'] = \
- encode_passwd(getpass.getpass())
- else:
- defaults['remotebuild']['passwdx'] = \
- encode_passwd(defaults['remotebuild']['passwd'])
+ def _new_conf(self):
+ 'generate a default conf file in home dir'
+ fpath = os.path.expanduser('~/.gbs.conf')
with open(fpath, 'w') as wfile:
- wfile.write(self.get_default_conf(defaults))
+ wfile.write(self.DEFAULT_CONF_TEMPLATE)
os.chmod(fpath, 0600)
- msger.info('Done. Your gbs config is now located at %s' % fpath)
- msger.warning("Don't forget to double-check the config manually.")
- return True
+ msger.warning('Created a new config file %s. Please check and edit '
+ 'your authentication information.' % fpath)
def _check_passwd(self):
'convert passwd item to passwdx and then update origin conf files'
- replaced_keys = False
+ dirty = set()
all_sections = set()
for layer in self._cfgparsers:
if plainpass is None:
# empty string password is acceptable here
continue
- cfgparser.set(sec,
+ cfgparser.set_into_file(sec,
key + 'x',
encode_passwd(plainpass),
key)
- replaced_keys = True
+ dirty.add(cfgparser)
- if replaced_keys:
+ if dirty:
msger.warning('plaintext password in config files will '
'be replaced by encoded ones')
- self.update()
+ self.update(dirty)
def _get(self, opt, section='general'):
'get value from multi-levels of config file'
else:
return self._get(opt, section)
- def update(self):
+ def update(self, cfgparsers):
'update changed values into files on disk'
- for cfgparser in self._cfgparsers:
- cfgparser.update()
+ for cfgparser in cfgparsers:
+ try:
+ cfgparser.update()
+ except IOError, err:
+ msger.warning('update config file error: %s' % err)
+
+
+class OBSConf(object):
+ 'Config items related to obs section'
+
+ def __init__(self, parent, name, url, base, target):
+ self.parent = parent
+ self.name = name
+ self.url = url
+ self.base = base
+ self.target = target
+
+ def dump(self, fhandler):
+ 'dump ini to file object'
+ parser = BrainConfigParser()
+ parser.add_section(self.name)
+
+ parser.set(self.name, 'url', self.url)
+
+ if self.url.user and self.url.user != self.parent.common_user:
+ parser.set(self.name, 'user', self.url.user)
+
+ if self.url.passwd and self.url.passwd != self.parent.common_password:
+ parser.set(self.name, 'passwdx',
+ encode_passwd(self.url.passwd))
+
+ if self.base:
+ parser.set(self.name, 'base_prj', self.base)
+
+ if self.target:
+ parser.set(self.name, 'target_prj', self.target)
+ parser.write(fhandler)
+
+
+class RepoConf(object):
+ 'Config items related to repo section'
+
+ def __init__(self, parent, name, url):
+ self.parent = parent
+ self.name = name
+ self.url = url
+
+ def dump(self, fhandler):
+ 'dump ini to file object'
+ parser = BrainConfigParser()
+ parser.add_section(self.name)
+
+ parser.set(self.name, 'url', self.url)
+
+ if self.url.user and self.url.user != self.parent.common_user:
+ parser.set(self.name, 'user', self.url.user)
+
+ if self.url.passwd and self.url.passwd != self.parent.common_password:
+ parser.set(self.name, 'passwdx',
+ encode_passwd(self.url.passwd))
+ parser.write(fhandler)
class Profile(object):
'''Profile which contains all config values related to same domain'''
- def __init__(self, user, password):
+ def __init__(self, name, user, password):
+ self.name = name
self.common_user = user
self.common_password = password
self.repos = []
- self.api = None
+ self.obs = None
- def make_url(self, url, user, password):
- '''make a safe url which contains auth info'''
- user = user or self.common_user
- password = password or self.common_password
- try:
- return SafeURL(url, user, password)
- except ValueError, err:
- raise errors.ConfigError('%s for %s' % (str(err), url))
+ def _update_url(self, url):
+ 'update url by common auth info'
+ if not url.user:
+ url.user = self.common_user
+ if not url.passwd:
+ url.passwd = self.common_password
+ return url
- def add_repo(self, url, user, password):
+ def add_repo(self, repoconf):
'''add a repo to repo list of the profile'''
- self.repos.append(self.make_url(url, user, password))
+ self._update_url(repoconf.url)
+ self.repos.append(repoconf)
- def set_api(self, url, user, password):
+ def set_obs(self, obsconf):
'''set OBS api of the profile'''
- self.api = self.make_url(url, user, password)
+ self._update_url(obsconf.url)
+ self.obs = obsconf
+
+ def dump(self, fhandler):
+ 'dump ini to file object'
+ parser = BrainConfigParser()
+ parser.add_section(self.name)
- def get_repos(self):
- '''get repo list of the profile'''
- return self.repos
+ if self.common_user:
+ parser.set(self.name, 'user', self.common_user)
+ if self.common_password:
+ parser.set(self.name, 'passwdx',
+ encode_passwd(self.common_password))
- def get_api(self):
- '''get OBS api of the profile'''
- return self.api
+ if self.obs:
+ parser.set(self.name, 'obs', self.obs.name)
+ self.obs.dump(fhandler)
+
+ if self.repos:
+ names = []
+ for repo in self.repos:
+ names.append(repo.name)
+ repo.dump(fhandler)
+ parser.set(self.name, 'repos', ', '.join(names))
+ parser.write(fhandler)
class BizConfigManager(ConfigMgr):
if self.is_profile_oriented():
return self._build_profile_by_name(self.get('profile'))
- msger.warning('subcommand oriented style of config is deprecated, '
- 'please convert to profile oriented style.')
- return self._build_profile_by_subcommand()
+ profile = self._build_profile_by_subcommand()
+ self.convert_to_new_style(profile)
+ return profile
+
+ def convert_to_new_style(self, profile):
+ 'convert ~/.gbs.conf to new style'
+ def dump_general(fhandler):
+ 'dump options in general section'
+ parser = BrainConfigParser()
+ parser.add_section('general')
+ parser.set('general', 'profile', profile.name)
+
+ for opt in self.options('general'):
+ val = self.get(opt)
+ if val != self.DEFAULTS['general'].get(opt):
+ parser.set('general', opt, val)
+ parser.write(fhandler)
+
+ fname = '~/.gbs.conf.template'
+ try:
+ tmp = Temp()
+ with open(tmp.path, 'w') as fhandler:
+ dump_general(fhandler)
+ profile.dump(fhandler)
+ os.rename(tmp.path, os.path.expanduser(fname))
+ except IOError, err:
+ raise errors.ConfigError(err)
+
+ msger.warning('subcommand oriented style of config is deprecated. '
+ 'Please check %s, a new profile oriented style of config which'
+ ' was converted from your current settings.' % fname)
def get_optional_item(self, section, option, default=None):
'''return default if section.option does not exist'''
except errors.ConfigError:
return default
- def _get_url_section(self, section_id):
+ def _get_url_options(self, section_id):
'''get url/user/passwd from a section'''
url = self.get('url', section_id)
user = self.get_optional_item(section_id, 'user')
password = self.get_optional_item(section_id, 'passwd')
- return url, user, password
+ try:
+ return SafeURL(url, user, password)
+ except ValueError, err:
+ raise errors.ConfigError('%s for %s' % (str(err), url))
def _build_profile_by_name(self, name):
'''return profile object by a given section'''
- profile_id = ('profile', name)
- user = self.get_optional_item(profile_id, 'user')
- password = self.get_optional_item(profile_id, 'passwd')
-
- profile = Profile(user, password)
-
- conf_api = self.get_optional_item(profile_id, 'api')
- if conf_api:
- api = self.get('api', profile_id)
- api_id = ('obs', api)
- profile.set_api(*self._get_url_section(api_id))
-
- conf_repos = self.get_optional_item(profile_id, 'repos')
- if conf_repos:
- repos = split_and_evaluate_string(conf_repos, ',')
- for repo in repos:
- repo_id = ('repo', repo)
- profile.add_repo(*self._get_url_section(repo_id))
+ if not name.startswith('profile.'):
+ raise msger.error('profile section name must start '
+ 'with "profile.": %s' % name)
+
+ user = self.get_optional_item(name, 'user')
+ password = self.get_optional_item(name, 'passwd')
+
+ profile = Profile(name, user, password)
+
+ obs = self.get_optional_item(name, 'obs')
+ if obs:
+ if not obs.startswith('obs.'):
+ msger.error('obs section name should start '
+ 'with "obs.": %s' % obs)
+
+ obsconf = OBSConf(profile, obs,
+ self._get_url_options(obs),
+ self.get_optional_item(obs, 'base_prj'),
+ self.get_optional_item(obs, 'target_prj'))
+ profile.set_obs(obsconf)
+
+ repos = self.get_optional_item(name, 'repos')
+ if repos:
+ for repo in repos.split(','):
+ repo = repo.strip()
+ if not repo.startswith('repo.'):
+ msger.warning('repo section name should start '
+ 'with "repo.": %s' % repo)
+ continue
+
+ repoconf = RepoConf(profile, repo,
+ self._get_url_options(repo))
+ profile.add_repo(repoconf)
return profile
def _build_profile_by_subcommand(self):
'''return profile object from subcommand oriented style of config'''
- profile = Profile(None, None)
+ profile = Profile('profile.current', None, None)
- section_id = 'remotebuild'
- url = self.get('build_server', section_id)
- user = self.get_optional_item(section_id, 'user')
- password = self.get_optional_item(section_id, 'passwd')
- profile.set_api(url, user, password)
+ sec = 'remotebuild'
+ addr = self.get('build_server', sec)
+ user = self.get_optional_item(sec, 'user')
+ password = self.get_optional_item(sec, 'passwd')
+
+ try:
+ url = SafeURL(addr, user, password)
+ except ValueError, err:
+ raise errors.ConfigError('%s for %s' % (str(err), addr))
+
+ obsconf = OBSConf(profile, 'obs.%s' % sec, url,
+ self.get_optional_item('remotebuild', 'base_prj'),
+ self.get_optional_item('remotebuild', 'target_prj'))
+ profile.set_obs(obsconf)
repos = self._parse_build_repos()
for key, item in repos:
if 'url' not in item:
- raise errors.ConfigError("Url is not specified for %s" % key)
- profile.add_repo(item['url'], item.get('user'), item.get('passwd'))
+ raise errors.ConfigError("URL is not specified for %s" % key)
+ try:
+ url = SafeURL(item['url'], item.get('user'), item.get('passwd'))
+ except ValueError, err:
+ raise errors.ConfigError('%s for %s' % (str(err), item['url']))
+
+ repoconf = RepoConf(profile, 'repo.%s' % key, url)
+ profile.add_repo(repoconf)
return profile
"""Functional tests for profile style of config"""
import unittest
+from mock import patch, MagicMock, Mock
+
import gitbuildsys.conf
from test_config import Fixture
+from test_passwdx import FakeFile
def get_profile():
@Fixture(home='profile.ini')
def test_profile_api(self):
'test get obs api'
- self.assertEquals('https://api.tz/path', get_profile().get_api())
+ self.assertEquals('https://api.tz/path', get_profile().obs.url)
@Fixture(home='profile.ini')
def test_api_inherit_auth(self):
'test api can inherit auto from parent profile section'
self.assertEquals('https://Alice:secret@api.tz/path',
- get_profile().get_api().full)
+ get_profile().obs.url.full)
@Fixture(home='profile_only_has_api.ini')
def test_api_auth_can_be_overwrite(self):
'test api auth can be overwrite'
self.assertEquals('https://Bob:classified@api.tz/path',
- get_profile().get_api().full)
+ get_profile().obs.url.full)
@Fixture(home='profile.ini')
def test_profile_repos_in_order(self):
'https://repo/ia32/non-oss',
'https://repo/ia32/base',
'/local/path'],
- get_profile().get_repos())
+ [i.url for i in get_profile().repos])
@Fixture(home='profile.ini')
def test_repo_inherit_auth(self):
'test repo can inherit auth from parent section'
self.assertEquals('https://Alice:secret@repo/ia32/main',
- get_profile().get_repos()[0].full)
+ get_profile().repos[0].url.full)
@Fixture(home='profile.ini')
def test_repo_overwrite_auth(self):
'test repo auth can be overwrite'
self.assertEquals('https://Bob:classified@repo/ia32/base',
- get_profile().get_repos()[2].full)
+ get_profile().repos[2].url.full)
@Fixture(home='no_such_profile_section_name.ini')
def test_no_such_profile(self):
'test get a empty profile when name does not exist'
profile = get_profile()
- self.assertEquals(None, profile.get_api())
- self.assertEquals([], profile.get_repos())
+ self.assertEquals(None, profile.obs)
+ self.assertEquals([], profile.repos)
@Fixture(home='profile.ini')
def test_local_repo_need_not_auth(self):
'''test local path needn't auth info'''
- self.assertEquals('/local/path', get_profile().get_repos()[3].full)
+ self.assertEquals('/local/path', get_profile().repos[3].url.full)
+
+ @Fixture(home='profile.ini')
+ def test_obs_base_project(self):
+ 'test read base project from conf'
+ self.assertEquals('base', get_profile().obs.base)
+
+ @Fixture(home='profile.ini')
+ def test_obs_target_project(self):
+ 'test read target project from conf'
+ self.assertEquals('target', get_profile().obs.target)
+@patch('gitbuildsys.conf.open', MagicMock(), create=True)
+@patch('gitbuildsys.conf.os.rename', Mock())
class SubcommandStyleTest(unittest.TestCase):
'''test for subcommand oriented config'''
@Fixture(home='subcommand.ini')
def test_api(self):
'test obs api'
- self.assertEquals('https://api/build/server', get_profile().get_api())
+ self.assertEquals('https://api/build/server', get_profile().obs.url)
@Fixture(home='subcommand.ini')
def test_api_auth(self):
'test api auth'
self.assertEquals('https://Alice:secret@api/build/server',
- get_profile().get_api().full)
+ get_profile().obs.url.full)
@Fixture(home='subcommand.ini')
def test_repos_in_order(self):
self.assertEquals(['https://repo1/path',
'https://repo2/path',
'/local/path/repo'],
- get_profile().get_repos())
+ [i.url for i in get_profile().repos])
@Fixture(home='subcommand.ini')
def test_repo_auth(self):
'test repo auth'
self.assertEquals('https://Alice:secret@repo1/path',
- get_profile().get_repos()[0].full)
+ get_profile().repos[0].url.full)
+
+
+
+@patch('gitbuildsys.conf.open', create=True)
+class ConvertTest(unittest.TestCase):
+ 'Test convert subcommand to profile'
+
+ @Fixture(home='subcommand.ini')
+ def test_convert(self, fake_open):
+ 'test convert'
+ conf = FakeFile()
+ fake_open.return_value = conf
+
+ get_profile()
+
+ self.assertEquals(conf.getvalue(), '''[general]
+profile = profile.current
+
+[obs.remotebuild]
+url = https://api/build/server
+user = Alice
+passwdx = QlpoOTFBWSZTWYfNdxYAAAIBgAoAHAAgADDNAMNEA24u5IpwoSEPmu4s
+base_prj = Main
+target_prj = Target
+
+[repo.repo1]
+url = https://repo1/path
+user = Alice
+passwdx = QlpoOTFBWSZTWYfNdxYAAAIBgAoAHAAgADDNAMNEA24u5IpwoSEPmu4s
+
+[repo.repo2]
+url = https://repo2/path
+user = Alice
+passwdx = QlpoOTFBWSZTWYfNdxYAAAIBgAoAHAAgADDNAMNEA24u5IpwoSEPmu4s
+
+[repo.repo3]
+url = /local/path/repo
+
+[profile.current]
+obs = obs.remotebuild
+repos = repo.repo1, repo.repo2, repo.repo3
+
+''')
+
+
+
if __name__ == '__main__':