-"""PyPI and direct package downloading"""
+"""PyPI and direct package downloading."""
+
import sys
import os
import re
import setuptools
from pkg_resources import (
- CHECKOUT_DIST, Distribution, BINARY_DIST, normalize_path, SOURCE_DIST,
- Environment, find_distributions, safe_name, safe_version,
- to_filename, Requirement, DEVELOP_DIST, EGG_DIST, parse_version,
+ CHECKOUT_DIST,
+ Distribution,
+ BINARY_DIST,
+ normalize_path,
+ SOURCE_DIST,
+ Environment,
+ find_distributions,
+ safe_name,
+ safe_version,
+ to_filename,
+ Requirement,
+ DEVELOP_DIST,
+ EGG_DIST,
+ parse_version,
)
from distutils import log
from distutils.errors import DistutilsError
EXTENSIONS = ".tar.gz .tar.bz2 .tar .zip .tgz".split()
__all__ = [
- 'PackageIndex', 'distros_for_url', 'parse_bdist_wininst',
+ 'PackageIndex',
+ 'distros_for_url',
+ 'parse_bdist_wininst',
'interpret_distro_name',
]
_tmpl = "setuptools/{setuptools.__version__} Python-urllib/{py_major}"
user_agent = _tmpl.format(
- py_major='{}.{}'.format(*sys.version_info), setuptools=setuptools)
+ py_major='{}.{}'.format(*sys.version_info), setuptools=setuptools
+)
def parse_requirement_arg(spec):
wheel = Wheel(basename)
if not wheel.is_compatible():
return []
- return [Distribution(
- location=location,
- project_name=wheel.project_name,
- version=wheel.version,
- # Increase priority over eggs.
- precedence=EGG_DIST + 1,
- )]
+ return [
+ Distribution(
+ location=location,
+ project_name=wheel.project_name,
+ version=wheel.version,
+ # Increase priority over eggs.
+ precedence=EGG_DIST + 1,
+ )
+ ]
if basename.endswith('.exe'):
win_base, py_ver, platform = parse_bdist_wininst(basename)
if win_base is not None:
#
for ext in EXTENSIONS:
if basename.endswith(ext):
- basename = basename[:-len(ext)]
+ basename = basename[: -len(ext)]
return interpret_distro_name(location, basename, metadata)
return [] # no extension matched
def interpret_distro_name(
- location, basename, metadata, py_version=None, precedence=SOURCE_DIST,
- platform=None
+ location, basename, metadata, py_version=None, precedence=SOURCE_DIST, platform=None
):
"""Generate alternative interpretations of a source distro name
for p in range(1, len(parts) + 1):
yield Distribution(
- location, metadata, '-'.join(parts[:p]), '-'.join(parts[p:]),
- py_version=py_version, precedence=precedence,
- platform=platform
+ location,
+ metadata,
+ '-'.join(parts[:p]),
+ '-'.join(parts[p:]),
+ py_version=py_version,
+ precedence=precedence,
+ platform=platform,
)
return wrapper
-REL = re.compile(r"""<([^>]*\srel\s*=\s*['"]?([^'">]+)[^>]*)>""", re.I)
-# this line is here to fix emacs' cruddy broken syntax highlighting
+REL = re.compile(r"""<([^>]*\srel\s{0,10}=\s{0,10}['"]?([^'" >]+)[^>]*)>""", re.I)
+"""
+Regex for an HTML tag with 'rel="val"' attributes.
+"""
@unique_values
"""A distribution index that scans web pages for download URLs"""
def __init__(
- self, index_url="https://pypi.org/simple/", hosts=('*',),
- ca_bundle=None, verify_ssl=True, *args, **kw
+ self,
+ index_url="https://pypi.org/simple/",
+ hosts=('*',),
+ ca_bundle=None,
+ verify_ssl=True,
+ *args,
+ **kw
):
super().__init__(*args, **kw)
- self.index_url = index_url + "/" [:not index_url.endswith('/')]
+ self.index_url = index_url + "/"[: not index_url.endswith('/')]
self.scanned_urls = {}
self.fetched_urls = {}
self.package_pages = {}
return True
msg = (
"\nNote: Bypassing %s (disallowed host; see "
- "http://bit.ly/2hrImnY for details).\n")
+ "http://bit.ly/2hrImnY for details).\n"
+ )
if fatal:
raise DistutilsError(msg % url)
else:
if not link.startswith(self.index_url):
return NO_MATCH_SENTINEL
- parts = list(map(
- urllib.parse.unquote, link[len(self.index_url):].split('/')
- ))
+ parts = list(map(urllib.parse.unquote, link[len(self.index_url) :].split('/')))
if len(parts) != 2 or '#' in parts[1]:
return NO_MATCH_SENTINEL
def need_version_info(self, url):
self.scan_all(
"Page at %s links to .py file(s) without version info; an index "
- "scan is required.", url
+ "scan is required.",
+ url,
)
def scan_all(self, msg=None, *args):
if self.index_url not in self.fetched_urls:
if msg:
self.warn(msg, *args)
- self.info(
- "Scanning index of all packages (this may take a while)"
- )
+ self.info("Scanning index of all packages (this may take a while)")
self.scan_url(self.index_url)
def find_packages(self, requirement):
"""
checker is a ContentChecker
"""
- checker.report(
- self.debug,
- "Validating %%s checksum for %s" % filename)
+ checker.report(self.debug, "Validating %%s checksum for %s" % filename)
if not checker.is_valid():
tfp.close()
os.unlink(filename)
else: # no distros seen for this name, might be misspelled
meth, msg = (
self.warn,
- "Couldn't find index page for %r (maybe misspelled?)")
+ "Couldn't find index page for %r (maybe misspelled?)",
+ )
meth(msg, requirement.unsafe_name)
self.scan_all()
return getattr(self.fetch_distribution(spec, tmpdir), 'location', None)
def fetch_distribution( # noqa: C901 # is too complex (14) # FIXME
- self, requirement, tmpdir, force_scan=False, source=False,
- develop_ok=False, local_index=None):
+ self,
+ requirement,
+ tmpdir,
+ force_scan=False,
+ source=False,
+ develop_ok=False,
+ local_index=None,
+ ):
"""Obtain a distribution suitable for fulfilling `requirement`
`requirement` must be a ``pkg_resources.Requirement`` instance.
if dist.precedence == DEVELOP_DIST and not develop_ok:
if dist not in skipped:
self.warn(
- "Skipping development or system egg: %s", dist,
+ "Skipping development or system egg: %s",
+ dist,
)
skipped[dist] = 1
continue
- test = (
- dist in req
- and (dist.precedence <= SOURCE_DIST or not source)
- )
+ test = dist in req and (dist.precedence <= SOURCE_DIST or not source)
if test:
loc = self.download(dist.location, tmpdir)
dist.download_location = loc
def gen_setup(self, filename, fragment, tmpdir):
match = EGG_FRAGMENT.match(fragment)
- dists = match and [
- d for d in
- interpret_distro_name(filename, match.group(1), None) if d.version
- ] or []
+ dists = (
+ match
+ and [
+ d
+ for d in interpret_distro_name(filename, match.group(1), None)
+ if d.version
+ ]
+ or []
+ )
if len(dists) == 1: # unambiguous ``#egg`` fragment
basename = os.path.basename(filename)
"from setuptools import setup\n"
"setup(name=%r, version=%r, py_modules=[%r])\n"
% (
- dists[0].project_name, dists[0].version,
- os.path.splitext(basename)[0]
+ dists[0].project_name,
+ dists[0].version,
+ os.path.splitext(basename)[0],
)
)
return filename
if warning:
self.warn(warning, v.reason)
else:
- raise DistutilsError("Download error for %s: %s"
- % (url, v.reason)) from v
+ raise DistutilsError(
+ "Download error for %s: %s" % (url, v.reason)
+ ) from v
except http.client.BadStatusLine as v:
if warning:
self.warn(warning, v.line)
else:
raise DistutilsError(
'%s returned a bad status line. The server might be '
- 'down, %s' %
- (url, v.line)
+ 'down, %s' % (url, v.line)
) from v
except (http.client.HTTPException, socket.error) as v:
if warning:
self.warn(warning, v)
else:
- raise DistutilsError("Download error for %s: %s"
- % (url, v)) from v
+ raise DistutilsError("Download error for %s: %s" % (url, v)) from v
def _download_url(self, scheme, url, tmpdir):
# Determine download filename
if rev is not None:
self.info("Checking out %s", rev)
- os.system("git -C %s checkout --quiet %s" % (
- filename,
- rev,
- ))
+ os.system(
+ "git -C %s checkout --quiet %s"
+ % (
+ filename,
+ rev,
+ )
+ )
return filename
if rev is not None:
self.info("Updating to %s", rev)
- os.system("hg --cwd %s up -C -r %s -q" % (
- filename,
- rev,
- ))
+ os.system(
+ "hg --cwd %s up -C -r %s -q"
+ % (
+ filename,
+ rev,
+ )
+ )
return filename
@property
def creds_by_repository(self):
sections_with_repositories = [
- section for section in self.sections()
+ section
+ for section in self.sections()
if self.get(section, 'repository').strip()
]
files.append('<a href="{name}">{name}</a>'.format(name=f))
else:
tmpl = (
- "<html><head><title>{url}</title>"
- "</head><body>{files}</body></html>")
+ "<html><head><title>{url}</title>" "</head><body>{files}</body></html>"
+ )
body = tmpl.format(url=url, files='\n'.join(files))
status, message = 200, "OK"
else:
import urllib.request
import urllib.error
import http.client
+from unittest import mock
-import mock
import pytest
import setuptools.package_index
<a href="http://some_url">Name</a>
(<a title="MD5 hash"
href="{hash_url}">md5</a>)
- """.lstrip().format(**locals())
+ """.lstrip().format(
+ **locals()
+ )
assert setuptools.package_index.PYPI_MD5.match(doc)
def test_bad_url_bad_port(self):
# issue 16
# easy_install inquant.contentmirror.plone breaks because of a typo
# in its home URL
- index = setuptools.package_index.PackageIndex(
- hosts=('www.example.com',)
- )
+ index = setuptools.package_index.PackageIndex(hosts=('www.example.com',))
url = (
'url:%20https://svn.plone.org/svn'
assert isinstance(v, urllib.error.HTTPError)
def test_bad_url_bad_status_line(self):
- index = setuptools.package_index.PackageIndex(
- hosts=('www.example.com',)
- )
+ index = setuptools.package_index.PackageIndex(hosts=('www.example.com',))
def _urlopen(*args):
raise http.client.BadStatusLine('line')
"""
A bad URL with a double scheme should raise a DistutilsError.
"""
- index = setuptools.package_index.PackageIndex(
- hosts=('www.example.com',)
- )
+ index = setuptools.package_index.PackageIndex(hosts=('www.example.com',))
# issue 20
url = 'http://http://svn.pythonpaste.org/Paste/wphp/trunk'
raise RuntimeError("Did not raise")
def test_bad_url_screwy_href(self):
- index = setuptools.package_index.PackageIndex(
- hosts=('www.example.com',)
- )
+ index = setuptools.package_index.PackageIndex(hosts=('www.example.com',))
# issue #160
if sys.version_info[0] == 2 and sys.version_info[1] == 7:
# this should not fail
url = 'http://example.com'
- page = ('<a href="http://www.famfamfam.com]('
- 'http://www.famfamfam.com/">')
+ page = '<a href="http://www.famfamfam.com](' 'http://www.famfamfam.com/">'
index.process_index(url, page)
def test_url_ok(self):
- index = setuptools.package_index.PackageIndex(
- hosts=('www.example.com',)
- )
+ index = setuptools.package_index.PackageIndex(hosts=('www.example.com',))
url = 'file:///tmp/test_package_index'
assert index.url_ok(url, True)
'b0',
'rc0',
]
- post = [
- '.post0'
- ]
+ post = ['.post0']
dev = [
'.dev0',
]
for e in epoch
for r in releases
for p in sum([pre, post, dev], [''])
- for locs in local]
+ for locs in local
+ ]
for v, vc in versions:
- dists = list(setuptools.package_index.distros_for_url(
- 'http://example.com/example.zip#egg=example-' + v))
+ dists = list(
+ setuptools.package_index.distros_for_url(
+ 'http://example.com/example.zip#egg=example-' + v
+ )
+ )
assert dists[0].version == ''
assert dists[1].version == vc
expected_dir = str(tmpdir / 'project@master')
expected = (
- 'git clone --quiet '
- 'https://github.example/group/project {expected_dir}'
+ 'git clone --quiet ' 'https://github.example/group/project {expected_dir}'
).format(**locals())
first_call_args = os_system_mock.call_args_list[0][0]
assert first_call_args == (expected,)
expected_dir = str(tmpdir / 'project')
expected = (
- 'git clone --quiet '
- 'https://github.example/group/project {expected_dir}'
+ 'git clone --quiet ' 'https://github.example/group/project {expected_dir}'
).format(**locals())
os_system_mock.assert_called_once_with(expected)
expected_dir = str(tmpdir / 'project')
expected = (
- 'svn checkout -q '
- 'svn+https://svn.example/project {expected_dir}'
+ 'svn checkout -q ' 'svn+https://svn.example/project {expected_dir}'
).format(**locals())
os_system_mock.assert_called_once_with(expected)
class TestContentCheckers:
def test_md5(self):
checker = setuptools.package_index.HashChecker.from_url(
- 'http://foo/bar#md5=f12895fdffbd45007040d2e44df98478')
+ 'http://foo/bar#md5=f12895fdffbd45007040d2e44df98478'
+ )
checker.feed('You should probably not be using MD5'.encode('ascii'))
assert checker.hash.hexdigest() == 'f12895fdffbd45007040d2e44df98478'
assert checker.is_valid()
def test_other_fragment(self):
"Content checks should succeed silently if no hash is present"
checker = setuptools.package_index.HashChecker.from_url(
- 'http://foo/bar#something%20completely%20different')
+ 'http://foo/bar#something%20completely%20different'
+ )
checker.feed('anything'.encode('ascii'))
assert checker.is_valid()
def test_blank_md5(self):
"Content checks should succeed if a hash is empty"
- checker = setuptools.package_index.HashChecker.from_url(
- 'http://foo/bar#md5=')
+ checker = setuptools.package_index.HashChecker.from_url('http://foo/bar#md5=')
checker.feed('anything'.encode('ascii'))
assert checker.is_valid()
def test_get_hash_name_md5(self):
checker = setuptools.package_index.HashChecker.from_url(
- 'http://foo/bar#md5=f12895fdffbd45007040d2e44df98478')
+ 'http://foo/bar#md5=f12895fdffbd45007040d2e44df98478'
+ )
assert checker.hash_name == 'md5'
def test_report(self):
checker = setuptools.package_index.HashChecker.from_url(
- 'http://foo/bar#md5=f12895fdffbd45007040d2e44df98478')
+ 'http://foo/bar#md5=f12895fdffbd45007040d2e44df98478'
+ )
rep = checker.report(lambda x: x, 'My message about %s')
assert rep == 'My message about md5'
def temp_home(tmpdir, monkeypatch):
key = (
'USERPROFILE'
- if platform.system() == 'Windows' and sys.version_info > (3, 8) else
- 'HOME'
+ if platform.system() == 'Windows' and sys.version_info > (3, 8)
+ else 'HOME'
)
monkeypatch.setitem(os.environ, key, str(tmpdir))
class TestPyPIConfig:
def test_percent_in_password(self, temp_home):
pypirc = temp_home / '.pypirc'
- pypirc.write(DALS("""
+ pypirc.write(
+ DALS(
+ """
[pypi]
repository=https://pypi.org
username=jaraco
password=pity%
- """))
+ """
+ )
+ )
cfg = setuptools.package_index.PyPIConfig()
cred = cfg.creds_by_repository['https://pypi.org']
assert cred.username == 'jaraco'
assert cred.password == 'pity%'
+
+
+@pytest.mark.timeout(1)
+def test_REL_DoS():
+ """
+ REL should not hang on a contrived attack string.
+ """
+ setuptools.package_index.REL.search('< rel=' + ' ' * 2**12)