Try to fix bug#233, gerrit#894 is a hotfix for this issue in v0.9.
Create SafeURL object from repoN.url/user/passwd in config file,
this safe url is a string object, can be directly print to console.
When pass to build script, use its property "full" with user/passwd
info.
Change-Id: I667989e61a26a00a54255dd7c859e46553814f8f
import re
import subprocess
import tempfile
-import urllib2
import glob
import shutil
import base64
import pwd
-from urlparse import urlsplit, urlunsplit
from gitbuildsys import msger, utils, runner, errors
from gitbuildsys.conf import configmgr
+from gitbuildsys.safe_url import SafeURL
from gbp.scripts.buildpackage_rpm import main as gbp_build
from gbp.rpm.git import GitRepositoryError, RpmGitRepository
except (TypeError, IOError), err:
raise errors.ConfigError('Error decoding %s: %s' % \
(opt, err))
- repos[key]['passwd'] = urllib2.quote(value, safe='')
+ repos[key]['passwd'] = value
else:
repos[key][name] = value
if 'url' not in item:
raise errors.ConfigError("Url is not specified for %s" % key)
- splitted = urlsplit(item['url'])
- if splitted.username and item['user'] or \
- splitted.password and item['passwd']:
- raise errors.ConfigError("Auth info specified twice for %s" % key)
-
- # Get auth info from the url or urlx.user and urlx.pass
- user = item.get('user') or splitted.username
- passwd = item.get('passwd') or splitted.password
-
- if splitted.port:
- hostport = '%s:%d' % (splitted.hostname, splitted.port)
- else:
- hostport = splitted.hostname
-
- splitted_list = list(splitted)
- if user:
- if passwd:
- splitted_list[1] = '%s:%s@%s' % (urllib2.quote(user, safe=''),
- passwd, hostport)
- else:
- splitted_list[1] = '%s@%s' % (urllib2.quote(user, safe=''),
- hostport)
- elif passwd:
- raise errors.ConfigError('No user is specified for %s, '\
- 'only password' % key)
+ try:
+ url = SafeURL(item['url'], item.get('user'), item.get('passwd'))
+ except ValueError, e:
+ raise errors.ConfigError('%s for %s' % (str(e), key))
- result.append(urlunsplit(splitted_list))
+ result.append(url)
return result
-def clean_repos_userinfo(repos):
- striped_repos = []
- for repo in repos:
- splitted = urlsplit(repo)
- if not splitted.username:
- striped_repos.append(repo)
- else:
- splitted_list = list(splitted)
- if splitted.port:
- splitted_list[1] = '%s:%d' % (splitted.hostname, splitted.port)
- else:
- splitted_list[1] = splitted.hostname
- striped_repos.append(urlunsplit(splitted_list))
-
- return striped_repos
def do(opts, args):
repos = get_repos_conf()
if opts.repositories:
- repos.extend(opts.repositories)
+ repos.extend([ SafeURL(repo) for repo in opts.repositories ])
if not repos:
msger.error('No package repository specified.')
if not repourls:
msger.error('no repositories found for arch: %s under the '\
'following repos:\n %s' % \
- (buildarch, '\n'.join(clean_repos_userinfo(repos))))
+ (buildarch, '\n'.join(repos)))
for url in repourls:
if not re.match('https?://.*', url) and \
not (url.startswith('/') and os.path.exists(url)):
msger.error("Invalid repo url: %s" % url)
- cmd += ['--repository=%s' % url]
+ cmd += ['--repository=%s' % url.full]
if opts.dist:
distconf = opts.dist
--- /dev/null
+# vim: ai ts=4 sts=4 et sw=4
+#
+# Copyright (c) 2012 Intel, Inc.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the Free
+# Software Foundation; version 2 of the License
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc., 59
+# Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""
+This module provides a class SafeURL which can contain url/user/password read
+from config file, and hide plain user and password when it print to screen
+"""
+
+import urllib
+import urlparse
+
+
+class SafeURL(str):
+
+ '''SafeURL can hide user info when it's printed to console.
+ Use property full to get url with user info
+ '''
+
+ def __new__(cls, url, user=None, passwd=None):
+ safe_url, inline_user, inline_passwd = SafeURL._extract_userinfo(url)
+
+ inst = super(SafeURL, cls).__new__(cls, safe_url)
+
+ inst.user, inst.passwd = SafeURL._check_userinfo(inline_user,
+ inline_passwd,
+ user, passwd)
+ inst.components = urlparse.urlsplit(safe_url)
+ return inst
+
+ @property
+ def full(self):
+ '''return the full url with user and password'''
+ userinfo = self._get_userinfo()
+ hostport = self._get_hostport(self.components)
+
+ if userinfo:
+ login = '%s@%s' % (userinfo, hostport)
+ else:
+ login = hostport
+
+ new_components = list(self.components)
+ new_components[1] = login
+ return urlparse.urlunsplit(new_components)
+
+ def pathjoin(self, *args):
+ '''treat self as path and urljoin'''
+ new = urlparse.urljoin(self.rstrip('/') + '/', *args)
+ return SafeURL(new, self.user, self.passwd)
+
+ def urljoin(self, *args):
+ '''join by urlparse.urljoin'''
+ return SafeURL(urlparse.urljoin(self, *args), self.user, self.passwd)
+
+ def _get_userinfo(self):
+ '''return userinfo component of url'''
+ if not self.user:
+ return ''
+
+ escape = lambda raw: urllib.quote(raw, safe='')
+ return '%s:%s' % (escape(self.user), escape(self.passwd)) \
+ if self.passwd else escape(self.user)
+
+ @staticmethod
+ def _extract_userinfo(url):
+ '''strip inline user/password from url'''
+ results = urlparse.urlsplit(url)
+ hostport = SafeURL._get_hostport(results)
+
+ components = list(results)
+ components[1] = hostport
+ safe_url = urlparse.urlunsplit(components)
+
+ return safe_url, results.username, results.password
+
+ @staticmethod
+ def _get_hostport(components):
+ '''return hostport component from urlsplit result'''
+ if components.port:
+ return '%s:%d' % (components.hostname, components.port)
+ return components.hostname
+
+ @staticmethod
+ def _check_userinfo(user_inline, passwd_inline, user, passwd):
+ '''returns the valid user and passwd'''
+
+ if user_inline and user or passwd_inline and passwd:
+ raise ValueError('Auth info specified twice')
+
+ user = user or user_inline
+ passwd = passwd or passwd_inline
+
+ if not user and passwd:
+ raise ValueError('No user is specified only password')
+ return user, passwd
import tempfile
import shutil
import pycurl
-import urlparse
import hashlib
import signal
from collections import defaultdict
try:
curl.perform()
except pycurl.error, err:
+ msger.debug('fetching error:%s' % str(err))
errcode = err.args[0]
if errcode == pycurl.E_OPERATION_TIMEOUTED:
raise errors.UrlError('timeout on %s: %s' % (curl.url, err))
def grab(self, url, filename, user=None, passwd=None):
'''grab url to filename'''
+ msger.debug("fetching %s => %s" % (url, filename))
+
with open(filename, 'w') as outfile:
self.change_url(url, outfile, user, passwd)
self.perform()
for arch in archs:
for repo in repos:
- repourl = os.path.join(baseurl, 'repos', repo, arch, 'packages')
+ repourl = baseurl.pathjoin('repos/%s/%s/packages' % (repo,
+ arch))
if self.is_standard_repo(repourl):
self.repourls[arch].append(repourl)
fname = os.path.join(self.cachedir, os.path.basename(url))
try:
- self.urlgrabber.grab(url, fname)
+ self.urlgrabber.grab(url, fname, url.user, url.passwd)
except errors.UrlError:
return
def is_standard_repo(self, repo):
'''Check if repo is standard repo with repodata/repomd.xml exist'''
- repomd_url = os.path.join(repo, 'repodata/repomd.xml')
+ repomd_url = repo.pathjoin('repodata/repomd.xml')
return not not self.fetch(repomd_url)
def parse(self, remotes):
if self.buildconf:
continue
- buildxml_url = urlparse.urljoin(repo.rstrip('/') + '/',
- '../../../../builddata/build.xml')
+ buildxml_url = repo.pathjoin('../../../../builddata/build.xml')
buildmeta = self.fetch(buildxml_url)
if not buildmeta:
continue
build_conf = self.get_buildconf(buildmeta)
- buildconf_url = buildxml_url.replace(os.path.basename \
- (buildxml_url), build_conf)
+ buildconf_url = buildxml_url.urljoin(build_conf)
fname = self.fetch(buildconf_url)
if fname:
self.buildconf = fname
continue
# Check if it's repo with builddata/build.xml exist
- buildxml_url = os.path.join(repo, 'builddata/build.xml')
+ buildxml_url = repo.pathjoin('builddata/build.xml')
buildmeta = self.fetch(buildxml_url)
if not buildmeta:
continue
continue
build_conf = self.get_buildconf(buildmeta)
- buildconf_url = urlparse.urljoin(repo.rstrip('/') + '/', \
- 'builddata/%s' % build_conf)
+ buildconf_url = repo.pathjoin('builddata/%s' % build_conf)
fname = self.fetch(buildconf_url)
if fname:
--- /dev/null
+#!/usr/bin/python -tt
+# vim: ai ts=4 sts=4 et sw=4
+#
+# Copyright (c) 2012 Intel, Inc.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the Free
+# Software Foundation; version 2 of the License
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+# for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc., 59
+# Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+
+"""Unit tests for class SafeURL"""
+
+import unittest
+
+from gitbuildsys.safe_url import SafeURL
+
+
+class SafeURLTest(unittest.TestCase):
+ '''Test SafeURL class'''
+
+ def test_duplicated_user(self):
+ '''raise ValueError if specify user twice'''
+ self.assertRaises(ValueError, SafeURL, 'http://Alice@server', 'Bob')
+
+ def test_duplicated_password(self):
+ '''raise ValueError if specify passwd twice'''
+ self.assertRaises(ValueError, SafeURL,
+ 'http://Alice:pp@server', None, 'password')
+
+ def test_passwd_no_user(self):
+ '''raise ValueError if only given password'''
+ self.assertRaises(ValueError, SafeURL, 'http://:password@server')
+
+ def test_password_no_user2(self):
+ '''raise ValueError if only given password'''
+ self.assertRaises(ValueError, SafeURL, 'http://server', None, 'passwd')
+
+ def test_both_user_and_password(self):
+ '''both user and passwd are given'''
+ url = SafeURL('http://server', 'Alice', 'password')
+
+ self.assertEqual('http://server', url)
+ self.assertEqual('http://Alice:password@server', url.full)
+
+ def test_only_user_no_password(self):
+ '''only user no password'''
+ url = SafeURL('http://Alice@server')
+
+ self.assertEqual('http://server', url)
+ self.assertEqual('http://Alice@server', url.full)
+
+ def test_no_user_and_no_password(self):
+ '''no user and no passwd'''
+ url = SafeURL('http://server')
+
+ self.assertEqual('http://server', url)
+ self.assertEqual(url, url.full)
+
+ def test_port(self):
+ '''port given'''
+ url = SafeURL('http://Alice:password@server:8080')
+
+ self.assertEqual('http://server:8080', str(url))
+ self.assertEqual('http://Alice:password@server:8080', url.full)
+
+ def test_escape_userinfo(self):
+ '''user and passwd should be escape'''
+ url = SafeURL('http://server', 'Alice', 'a;/?:@&=+$,b')
+
+ self.assertEqual('http://Alice:a%3B%2F%3F%3A%40%26%3D%2B%24%2Cb@server',
+ url.full)
+
+ def test_join_double_dot(self):
+ '''reduce double dot in path'''
+ url = SafeURL('https://download.tz.otcshare.org/snapshots/trunk/'
+ 'common/latest/repos/non-oss/armv7l/packages')
+
+ self.assertEqual('https://download.tz.otcshare.org/snapshots/trunk/'
+ 'common/latest/builddata/build.xml',
+ url.pathjoin('../../../../builddata/build.xml'))
+
+ def test_join_build_conf(self):
+ '''get build conf'''
+ url = SafeURL('https://download.tz.otcshare.org/snapshots/trunk/'
+ 'common/latest')
+
+ self.assertEqual('https://download.tz.otcshare.org/snapshots/trunk/'
+ 'common/latest/builddata/test.conf',
+ url.pathjoin('builddata/test.conf'))
+
+ def test_join_build_conf2(self):
+ '''get build conf'''
+ url = SafeURL('https://download.tz.otcshare.org/snapshots/trunk/'
+ 'common/latest/builddata/build.xml')
+
+ self.assertEqual('https://download.tz.otcshare.org/snapshots/trunk/'
+ 'common/latest/builddata/test.conf',
+ url.urljoin('test.conf'))
+
+ def test_join_build_xml(self):
+ '''get build xml'''
+ url = SafeURL('https://download.tz.otcshare.org/snapshots/trunk/'
+ 'common/latest')
+
+ self.assertEqual('https://download.tz.otcshare.org/snapshots/trunk/'
+ 'common/latest/builddata/build.xml',
+ url.pathjoin('builddata/build.xml'))
+
+ def test_join_repo(self):
+ '''get repo and arch repo'''
+ url = SafeURL('https://download.tz.otcshare.org/snapshots/trunk/'
+ 'common/latest')
+
+ self.assertEqual('https://download.tz.otcshare.org/snapshots/trunk/'
+ 'common/latest/repos/non-oss/armv7l/packages',
+ url.pathjoin('repos/non-oss/armv7l/packages'))
+
+ def test_local_path(self):
+ '''local path should not change'''
+ url = SafeURL('/local/path')
+
+ self.assertEqual('/local/path', url)
+ self.assertEqual(url, url.full)
\ No newline at end of file