import os
import re
import glob
+import stat
+import subprocess
+import zipfile
import six
@cvar _unpacked: path to the unpacked source tree
@type _unpacked: string
"""
- def __init__(self, name, unpacked=None, pkg_policy=PkgPolicy):
+ def __init__(self, name, unpacked=None, pkg_policy=PkgPolicy, prefix=None):
self._orig = False
self._tarball = False
self._pkg_policy = pkg_policy
self._filename_base, \
self._archive_fmt, \
self._compression = parse_archive_filename(os.path.basename(self.path))
+ self._prefix = prefix
+ if self._prefix is None:
+ self._determine_prefix()
self._check_orig()
if self.is_dir():
def path(self):
return self._path.rstrip('/')
+
+ @staticmethod
+ def _get_topdir_files(file_list):
+ """Parse content of the top directory from a file list
+
+ >>> sorted(UpstreamSource._get_topdir_files([]))
+ []
+ >>> sorted(UpstreamSource._get_topdir_files([('-', 'foo/bar')]))
+ [('d', 'foo')]
+ >>> sorted(UpstreamSource._get_topdir_files([('d', 'foo/'), ('-', 'foo/bar')]))
+ [('d', 'foo')]
+ >>> sorted(UpstreamSource._get_topdir_files([('d', 'foo'), ('-', 'foo/bar')]))
+ [('d', 'foo')]
+ >>> sorted(UpstreamSource._get_topdir_files([('-', 'fob'), ('d', 'foo'), ('d', 'foo/bar'), ('-', 'foo/bar/baz')]))
+ [('-', 'fob'), ('d', 'foo')]
+ >>> sorted(UpstreamSource._get_topdir_files([('-', './foo/bar')]))
+ [('d', 'foo')]
+ >>> sorted(UpstreamSource._get_topdir_files([('-', 'foo/bar'), ('-', '.foo/bar')]))
+ [('d', '.foo'), ('d', 'foo')]
+ """
+ topdir_files = set()
+ for typ, path in file_list:
+ split = re.sub(r'^(?:\./|\.\./)*', '', path).split('/')
+ if len(split) == 1:
+ topdir_files.add((typ, path))
+ else:
+ topdir_files.add(('d', split[0]))
+ return topdir_files
+
+ def _determine_prefix(self):
+ """Determine the prefix, i.e. the "leading directory name"""
+ self._prefix = ''
+ if self.is_dir():
+ # For directories we presume that the prefix is just the dirname
+ self._prefix = os.path.basename(self.path.rstrip('/'))
+ else:
+ files = []
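+ # Build a list of (type, path) tuples describing the archive members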
+ if self._archive_fmt == 'zip':
+ archive = zipfile.ZipFile(self.path)
+ for info in archive.infolist():
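+ # The upper 16 bits of external_attr carry the Unix mode bits (when the
+ # archive was created on a Unix-like system)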
+ typ = 'd' if (stat.S_ISDIR(info.external_attr >> 16) or
+ info.filename.endswith('/')) else '?'
+ files.append((typ, info.filename))
+ elif self._archive_fmt == 'tar':
+ popen = subprocess.Popen(['tar', '-t', '-v', '-f', self.path],
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ universal_newlines=True)
+ out, _err = popen.communicate()
+ if popen.returncode:
+ raise GbpError("Listing tar archive content failed")
+ for line in out.splitlines():
+ fields = line.split(None, 5)
+ files.append((fields[0][0], fields[-1]))
+ else:
+ raise GbpError("Unsupported archive format %s, unable to "
+ "determine prefix for '%s'" %
+ (self._archive_fmt, self.path))
+ # Determine prefix from the archive content
+ topdir_files = self._get_topdir_files(files)
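+ # Exactly one top-level directory means the sources share a common
+ # prefix; anything else leaves the prefix empty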
+ if len(topdir_files) == 1:
+ typ, name = topdir_files.pop()
+ if typ == 'd':
+ self._prefix = name
+
@property
def archive_fmt(self):
"""Archive format of the sources, e.g. 'tar'"""
"""Compression format of the sources, e.g. 'gzip'"""
return self._compression
+ @property
+ def prefix(self):
+ """Prefix, i.e. the 'leading directory name' of the sources"""
+ return self._prefix
+
def unpack(self, dir, filters=[]):
"""
Unpack packed upstream sources into a given directory
raise GbpError("Filters must be a list")
self._unpack_archive(dir, filters)
- self.unpacked = self._unpacked_toplevel(dir)
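+ # Unpacked sources may or may not have a leading directory; point
+ # self.unpacked at it if present, otherwise at the unpack dir itself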
+ src_dir = os.path.join(dir, self._prefix)
+ self.unpacked = src_dir if os.path.isdir(src_dir) else dir
def _unpack_archive(self, dir, filters):
"""
except gbpc.CommandExecFailed:
raise GbpError("Unpacking of %s failed" % self.path)
- def _unpacked_toplevel(self, dir):
- """unpacked archives can contain a leading directory or not"""
- unpacked = glob.glob('%s/*' % dir)
- unpacked.extend(glob.glob("%s/.*" % dir)) # include hidden files and folders
- # Check that dir contains nothing but a single folder:
- if len(unpacked) == 1 and os.path.isdir(unpacked[0]):
- return unpacked[0]
- else:
- # We can determine "no prefix" from this
- return os.path.join(dir, ".")
-
def _unpack_tar(self, dir, filters):
"""
Unpack a tarball to I{dir} applying a list of I{filters}. Leave the
self.assertEqual(source.path, self.upstream_dir)
self.assertEqual(source.unpacked, self.upstream_dir)
self.assertEqual(source.guess_version(), ('test', '1.0'))
+ self.assertEqual(source.prefix, 'test-1.0')
def tearDown(self):
context.teardown()
self.assertEqual(repacked.guess_version(), ('gbp', '0.1'))
self.assertEqual(repacked.archive_fmt, 'tar')
self.assertEqual(repacked.compression, 'bzip2')
+ self.assertEqual(repacked.prefix, 'gbp')
self._check_tar(repacked, ["gbp/errors.py", "gbp/__init__.py"])
def test_pack_filtered(self):
self.zipfile = self.tmpdir.join("gbp-0.1.zip")
z = zipfile.ZipFile(self.zipfile, "w")
for f in glob.glob(os.path.join(context.projectdir, "gbp/*.py")):
- z.write(f, f, zipfile.ZIP_DEFLATED)
+ arcname = os.path.relpath(f, context.projectdir)
+ z.write(f, arcname, zipfile.ZIP_DEFLATED)
z.close()
def tearDown(self):
self.assertEqual(source.guess_version(), ('gbp', '0.1'))
self.assertEqual(source.archive_fmt, 'zip')
self.assertEqual(source.compression, None)
+ self.assertEqual(source.prefix, 'gbp')
source.unpack(str(self.tmpdir))
self.assertNotEqual(source.unpacked, None)
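For illustration, a minimal usage sketch of the new prefix attribute (hypothetical archive and directory names; assumes UpstreamSource is imported from gbp.pkg as elsewhere in the tree):

    from gbp.pkg import UpstreamSource

    # Tarball whose members all live under a single 'foo-1.0/' directory
    source = UpstreamSource('foo-1.0.tar.gz')
    source.prefix              # -> 'foo-1.0'
    source.unpack('build')
    source.unpacked            # -> 'build/foo-1.0'

    # An archive whose members sit directly at the top level gets an empty
    # prefix, and unpack() points self.unpacked at the unpack directory itself
    flat = UpstreamSource('flat.zip')
    flat.prefix                # -> ''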