Refactor deb helpers: move UpstreamSource class
authorMarkus Lehtonen <markus.lehtonen@linux.intel.com>
Wed, 8 Feb 2012 12:42:00 +0000 (14:42 +0200)
committerGuido Günther <agx@sigxcpu.org>
Tue, 1 May 2012 20:29:29 +0000 (22:29 +0200)
to pkg base module. This refactor is preparation to the upcoming rpm
support.

gbp/deb/__init__.py
gbp/pkg/__init__.py
gbp/scripts/buildpackage.py
gbp/scripts/common/import_orig.py
gbp/scripts/import_dsc.py
tests/02_test_upstream_source_tar_unpack.py
tests/06_test_upstream_source.py

index c0919c3..19e78fa 100644 (file)
@@ -28,7 +28,7 @@ import gbp.command_wrappers as gbpc
 from gbp.errors import GbpError
 from gbp.git import GitRepositoryError
 from gbp.deb.changelog import ChangeLog, NoChangeLogError
-from gbp.pkg import (PkgPolicy, compressor_opts, compressor_aliases)
+from gbp.pkg import (PkgPolicy, UpstreamSource, compressor_opts)
 
 # When trying to parse a version-number from a dsc or changes file, these are
 # the valid characters.
@@ -113,212 +113,6 @@ class DpkgCompareVersions(gbpc.Command):
         return 0
 
 
-class UpstreamSource(object):
-    """
-    Upstream source. Can be either an unpacked dir, a tarball or another type
-    of archive
-
-    @cvar _orig: are the upstream sources already suitable as an upstream
-                 tarball
-    @type _orig: boolean
-    @cvar _path: path to the upstream sources
-    @type _path: string
-    @cvar _unpacked: path to the unpacked source tree
-    @type _unpacked: string
-    """
-    def __init__(self, name, unpacked=None):
-        self._orig = False
-        self._path = name
-        self.unpacked = unpacked
-
-        self._check_orig()
-        if self.is_dir():
-            self.unpacked = self.path
-
-    def _check_orig(self):
-        """
-        Check if upstream source format can be used as orig tarball.
-        This doesn't imply that the tarball is correctly named.
-
-        @return: C{True} if upstream source format is suitable
-            as upstream tarball, C{False} otherwise.
-        @rtype: C{bool}
-        """
-        if self.is_dir():
-            self._orig = False
-            return
-
-        parts = self._path.split('.')
-        try:
-            if parts[-1] == 'tgz':
-                self._orig = True
-            elif parts[-2] == 'tar':
-                if (parts[-1] in compressor_opts or
-                    parts[-1] in compressor_aliases):
-                        self._orig = True
-        except IndexError:
-            self._orig = False
-
-    def is_orig(self):
-        """
-        @return: C{True} if sources are suitable as upstream source,
-            C{False} otherwise
-        @rtype: C{bool}
-        """
-        return self._orig
-
-    def is_dir(self):
-        """
-        @return: C{True} if if upstream sources are an unpacked directory,
-            C{False} otherwise
-        @rtype: C{bool}
-        """
-        return True if os.path.isdir(self._path) else False
-
-    @property
-    def path(self):
-        return self._path.rstrip('/')
-
-    def unpack(self, dir, filters=[]):
-        """
-        Unpack packed upstream sources into a given directory
-        and determine the toplevel of the source tree.
-        """
-        if self.is_dir():
-            raise GbpError, "Cannot unpack directory %s" % self.path
-
-        if not filters:
-            filters = []
-
-        if type(filters) != type([]):
-            raise GbpError, "Filters must be a list"
-
-        self._unpack_archive(dir, filters)
-        self.unpacked = self._unpacked_toplevel(dir)
-
-    def _unpack_archive(self, dir, filters):
-        """
-        Unpack packed upstream sources into a given directory.
-        """
-        ext = os.path.splitext(self.path)[1]
-        if ext in [ ".zip", ".xpi" ]:
-            self._unpack_zip(dir)
-        else:
-            self._unpack_tar(dir, filters)
-
-    def _unpack_zip(self, dir):
-        try:
-            gbpc.UnpackZipArchive(self.path, dir)()
-        except gbpc.CommandExecFailed:
-            raise GbpError, "Unpacking of %s failed" % self.path
-
-    def _unpacked_toplevel(self, dir):
-        """unpacked archives can contain a leading directory or not"""
-        unpacked = glob.glob('%s/*' % dir)
-        unpacked.extend(glob.glob("%s/.*" % dir)) # include hidden files and folders
-        # Check that dir contains nothing but a single folder:
-        if len(unpacked) == 1 and os.path.isdir(unpacked[0]):
-            return unpacked[0]
-        else:
-            return dir
-
-    def _unpack_tar(self, dir, filters):
-        """
-        Unpack a tarball to I{dir} applying a list of I{filters}. Leave the
-        cleanup to the caller in case of an error.
-        """
-        try:
-            unpackArchive = gbpc.UnpackTarArchive(self.path, dir, filters)
-            unpackArchive()
-        except gbpc.CommandExecFailed:
-            # unpackArchive already printed an error message
-            raise GbpError
-
-    def pack(self, newarchive, filters=[]):
-        """
-        Recreate a new archive from the current one
-
-        @param newarchive: the name of the new archive
-        @type newarchive: string
-        @param filters: tar filters to apply
-        @type filters: array of strings
-        @return: the new upstream source
-        @rtype: UpstreamSource
-        """
-        if not self.unpacked:
-            raise GbpError, "Need an unpacked source tree to pack"
-
-        if not filters:
-            filters = []
-
-        if type(filters) != type([]):
-            raise GbpError, "Filters must be a list"
-
-        try:
-            unpacked = self.unpacked.rstrip('/')
-            repackArchive = gbpc.PackTarArchive(newarchive,
-                                os.path.dirname(unpacked),
-                                os.path.basename(unpacked),
-                                filters)
-            repackArchive()
-        except gbpc.CommandExecFailed:
-            # repackArchive already printed an error
-            raise GbpError
-        return UpstreamSource(newarchive)
-
-    @staticmethod
-    def known_compressions():
-        return [ args[1][-1] for args in compressor_opts.items() ]
-
-    def guess_version(self, extra_regex=r''):
-        """
-        Guess the package name and version from the filename of an upstream
-        archive.
-
-        >>> UpstreamSource('foo-bar_0.2.orig.tar.gz').guess_version()
-        ('foo-bar', '0.2')
-        >>> UpstreamSource('foo-Bar_0.2.orig.tar.gz').guess_version()
-        >>> UpstreamSource('git-bar-0.2.tar.gz').guess_version()
-        ('git-bar', '0.2')
-        >>> UpstreamSource('git-bar-0.2-rc1.tar.gz').guess_version()
-        ('git-bar', '0.2-rc1')
-        >>> UpstreamSource('git-bar-0.2:~-rc1.tar.gz').guess_version()
-        ('git-bar', '0.2:~-rc1')
-        >>> UpstreamSource('git-Bar-0A2d:rc1.tar.bz2').guess_version()
-        ('git-Bar', '0A2d:rc1')
-        >>> UpstreamSource('git-1.tar.bz2').guess_version()
-        ('git', '1')
-        >>> UpstreamSource('kvm_87+dfsg.orig.tar.gz').guess_version()
-        ('kvm', '87+dfsg')
-        >>> UpstreamSource('foo-Bar_0.2.orig.tar.gz').guess_version()
-        >>> UpstreamSource('foo-Bar-a.b.tar.gz').guess_version()
-        >>> UpstreamSource('foo-bar_0.2.orig.tar.xz').guess_version()
-        ('foo-bar', '0.2')
-        >>> UpstreamSource('foo-bar_0.2.orig.tar.lzma').guess_version()
-        ('foo-bar', '0.2')
-
-        @param extra_regex: additional regex to apply, needs a 'package' and a
-                            'version' group
-        @return: (package name, version) or None.
-        @rtype: tuple
-        """
-        version_chars = r'[a-zA-Z\d\.\~\-\:\+]'
-        extensions = r'\.tar\.(%s)' % "|".join(self.known_compressions())
-
-        version_filters = map ( lambda x: x % (version_chars, extensions),
-                           ( # Debian package_<version>.orig.tar.gz:
-                             r'^(?P<package>[a-z\d\.\+\-]+)_(?P<version>%s+)\.orig%s',
-                             # Upstream package-<version>.tar.gz:
-                             r'^(?P<package>[a-zA-Z\d\.\+\-]+)-(?P<version>[0-9]%s*)%s'))
-        if extra_regex:
-            version_filters = extra_regex + version_filters
-
-        for filter in version_filters:
-            m = re.match(filter, os.path.basename(self.path))
-            if m:
-                return (m.group('package'), m.group('version'))
-
-
 class DscFile(object):
     """Keeps all needed data read from a dscfile"""
     compressions = r"(%s)" % '|'.join(UpstreamSource.known_compressions())
index a3f02bc..540c22c 100644 (file)
@@ -102,3 +102,208 @@ class PkgPolicy(object):
             return False
         return True
 
+
+class UpstreamSource(object):
+    """
+    Upstream source. Can be either an unpacked dir, a tarball or another type
+    of archive
+
+    @cvar _orig: are the upstream sources already suitable as an upstream
+                 tarball
+    @type _orig: boolean
+    @cvar _path: path to the upstream sources
+    @type _path: string
+    @cvar _unpacked: path to the unpacked source tree
+    @type _unpacked: string
+    """
+    def __init__(self, name, unpacked=None):
+        self._orig = False
+        self._path = name
+        self.unpacked = unpacked
+
+        self._check_orig()
+        if self.is_dir():
+            self.unpacked = self.path
+
+    def _check_orig(self):
+        """
+        Check if upstream source format can be used as orig tarball.
+        This doesn't imply that the tarball is correctly named.
+
+        @return: C{True} if upstream source format is suitable
+            as upstream tarball, C{False} otherwise.
+        @rtype: C{bool}
+        """
+        if self.is_dir():
+            self._orig = False
+            return
+
+        parts = self._path.split('.')
+        try:
+            if parts[-1] == 'tgz':
+                self._orig = True
+            elif parts[-2] == 'tar':
+                if (parts[-1] in compressor_opts or
+                    parts[-1] in compressor_aliases):
+                        self._orig = True
+        except IndexError:
+            self._orig = False
+
+    def is_orig(self):
+        """
+        @return: C{True} if sources are suitable as upstream source,
+            C{False} otherwise
+        @rtype: C{bool}
+        """
+        return self._orig
+
+    def is_dir(self):
+        """
+        @return: C{True} if if upstream sources are an unpacked directory,
+            C{False} otherwise
+        @rtype: C{bool}
+        """
+        return True if os.path.isdir(self._path) else False
+
+    @property
+    def path(self):
+        return self._path.rstrip('/')
+
+    def unpack(self, dir, filters=[]):
+        """
+        Unpack packed upstream sources into a given directory
+        and determine the toplevel of the source tree.
+        """
+        if self.is_dir():
+            raise GbpError, "Cannot unpack directory %s" % self.path
+
+        if not filters:
+            filters = []
+
+        if type(filters) != type([]):
+            raise GbpError, "Filters must be a list"
+
+        self._unpack_archive(dir, filters)
+        self.unpacked = self._unpacked_toplevel(dir)
+
+    def _unpack_archive(self, dir, filters):
+        """
+        Unpack packed upstream sources into a given directory.
+        """
+        ext = os.path.splitext(self.path)[1]
+        if ext in [ ".zip", ".xpi" ]:
+            self._unpack_zip(dir)
+        else:
+            self._unpack_tar(dir, filters)
+
+    def _unpack_zip(self, dir):
+        try:
+            gbpc.UnpackZipArchive(self.path, dir)()
+        except gbpc.CommandExecFailed:
+            raise GbpError, "Unpacking of %s failed" % self.path
+
+    def _unpacked_toplevel(self, dir):
+        """unpacked archives can contain a leading directory or not"""
+        unpacked = glob.glob('%s/*' % dir)
+        unpacked.extend(glob.glob("%s/.*" % dir)) # include hidden files and folders
+        # Check that dir contains nothing but a single folder:
+        if len(unpacked) == 1 and os.path.isdir(unpacked[0]):
+            return unpacked[0]
+        else:
+            return dir
+
+    def _unpack_tar(self, dir, filters):
+        """
+        Unpack a tarball to I{dir} applying a list of I{filters}. Leave the
+        cleanup to the caller in case of an error.
+        """
+        try:
+            unpackArchive = gbpc.UnpackTarArchive(self.path, dir, filters)
+            unpackArchive()
+        except gbpc.CommandExecFailed:
+            # unpackArchive already printed an error message
+            raise GbpError
+
+    def pack(self, newarchive, filters=[]):
+        """
+        Recreate a new archive from the current one
+
+        @param newarchive: the name of the new archive
+        @type newarchive: string
+        @param filters: tar filters to apply
+        @type filters: array of strings
+        @return: the new upstream source
+        @rtype: UpstreamSource
+        """
+        if not self.unpacked:
+            raise GbpError, "Need an unpacked source tree to pack"
+
+        if not filters:
+            filters = []
+
+        if type(filters) != type([]):
+            raise GbpError, "Filters must be a list"
+
+        try:
+            unpacked = self.unpacked.rstrip('/')
+            repackArchive = gbpc.PackTarArchive(newarchive,
+                                os.path.dirname(unpacked),
+                                os.path.basename(unpacked),
+                                filters)
+            repackArchive()
+        except gbpc.CommandExecFailed:
+            # repackArchive already printed an error
+            raise GbpError
+        return UpstreamSource(newarchive)
+
+    @staticmethod
+    def known_compressions():
+        return [ args[1][-1] for args in compressor_opts.items() ]
+
+    def guess_version(self, extra_regex=r''):
+        """
+        Guess the package name and version from the filename of an upstream
+        archive.
+
+        >>> UpstreamSource('foo-bar_0.2.orig.tar.gz').guess_version()
+        ('foo-bar', '0.2')
+        >>> UpstreamSource('foo-Bar_0.2.orig.tar.gz').guess_version()
+        >>> UpstreamSource('git-bar-0.2.tar.gz').guess_version()
+        ('git-bar', '0.2')
+        >>> UpstreamSource('git-bar-0.2-rc1.tar.gz').guess_version()
+        ('git-bar', '0.2-rc1')
+        >>> UpstreamSource('git-bar-0.2:~-rc1.tar.gz').guess_version()
+        ('git-bar', '0.2:~-rc1')
+        >>> UpstreamSource('git-Bar-0A2d:rc1.tar.bz2').guess_version()
+        ('git-Bar', '0A2d:rc1')
+        >>> UpstreamSource('git-1.tar.bz2').guess_version()
+        ('git', '1')
+        >>> UpstreamSource('kvm_87+dfsg.orig.tar.gz').guess_version()
+        ('kvm', '87+dfsg')
+        >>> UpstreamSource('foo-Bar_0.2.orig.tar.gz').guess_version()
+        >>> UpstreamSource('foo-Bar-a.b.tar.gz').guess_version()
+        >>> UpstreamSource('foo-bar_0.2.orig.tar.xz').guess_version()
+        ('foo-bar', '0.2')
+        >>> UpstreamSource('foo-bar_0.2.orig.tar.lzma').guess_version()
+        ('foo-bar', '0.2')
+
+        @param extra_regex: additional regex to apply, needs a 'package' and a
+                            'version' group
+        @return: (package name, version) or None.
+        @rtype: tuple
+        """
+        version_chars = r'[a-zA-Z\d\.\~\-\:\+]'
+        extensions = r'\.tar\.(%s)' % "|".join(self.known_compressions())
+
+        version_filters = map ( lambda x: x % (version_chars, extensions),
+                           ( # Debian package_<version>.orig.tar.gz:
+                             r'^(?P<package>[a-z\d\.\+\-]+)_(?P<version>%s+)\.orig%s',
+                             # Upstream package-<version>.tar.gz:
+                             r'^(?P<package>[a-zA-Z\d\.\+\-]+)-(?P<version>[0-9]%s*)%s'))
+        if extra_regex:
+            version_filters = extra_regex + version_filters
+
+        for filter in version_filters:
+            m = re.match(filter, os.path.basename(self.path))
+            if m:
+                return (m.group('package'), m.group('version'))
index e1e1097..bef2121 100755 (executable)
@@ -38,7 +38,7 @@ from gbp.scripts.common.buildpackage import (index_name, wc_name,
                                              git_archive_submodules,
                                              git_archive_single, dump_tree,
                                              write_wc, drop_index)
-from gbp.pkg import (compressor_opts, compressor_aliases)
+from gbp.pkg import (UpstreamSource, compressor_opts, compressor_aliases)
 
 def git_archive(repo, cp, output_dir, treeish, comp_type, comp_level, with_submodules):
     "create a compressed orig tarball in output_dir using git_archive"
@@ -169,7 +169,7 @@ def extract_orig(orig_tarball, dest_dir):
     gbp.log.info("Extracting %s to '%s'" % (os.path.basename(orig_tarball), dest_dir))
 
     move_old_export(dest_dir)
-    upstream = gbp.deb.UpstreamSource(orig_tarball)
+    upstream = UpstreamSource(orig_tarball)
     upstream.unpack(dest_dir)
 
     # Check if tarball extracts into a single folder or not:
index 62daab8..fe38d90 100644 (file)
@@ -20,7 +20,7 @@
 import os
 import tempfile
 import gbp.command_wrappers as gbpc
-from gbp.deb import UpstreamSource
+from gbp.pkg import UpstreamSource
 from gbp.errors import GbpError
 import gbp.log
 
index 186143f..ca4031d 100644 (file)
@@ -26,8 +26,9 @@ import glob
 import pipes
 import time
 import gbp.command_wrappers as gbpc
+from gbp.pkg import UpstreamSource
 from gbp.deb import (debian_version_chars,
-                     parse_dsc, DscFile, UpstreamSource)
+                     parse_dsc, DscFile)
 from gbp.deb.git import (DebianGitRepository, GitRepositoryError)
 from gbp.deb.changelog import ChangeLog
 from gbp.git import rfc822_date_to_git
index dcd624b..f77f374 100644 (file)
@@ -49,7 +49,7 @@ class TestUnpack:
 
     def test_upstream_source_type(self):
         for (comp, archive) in self.archives.iteritems():
-            source = gbp.deb.UpstreamSource(archive[0])
+            source = gbp.pkg.UpstreamSource(archive[0])
             assert source.is_orig() == True
             assert source.is_dir() == False
             assert source.unpacked == None
@@ -60,13 +60,13 @@ class TestUnpack:
 
     def test_upstream_source_unpack(self):
         for (comp, archive) in self.archives.iteritems():
-            source = gbp.deb.UpstreamSource(archive[0])
+            source = gbp.pkg.UpstreamSource(archive[0])
             source.unpack(".")
             self._check_files(archive[1], comp)
 
     def test_upstream_source_unpack_no_filter(self):
         for (comp, archive) in self.archives.iteritems():
-            source = gbp.deb.UpstreamSource(archive[0])
+            source = gbp.pkg.UpstreamSource(archive[0])
             source.unpack(".", [])
             self._check_files(archive[1], comp)
 
@@ -74,7 +74,7 @@ class TestUnpack:
         exclude = "README"
 
         for (comp, archive) in self.archives.iteritems():
-            source = gbp.deb.UpstreamSource(archive[0])
+            source = gbp.pkg.UpstreamSource(archive[0])
             source.unpack(".", [exclude])
             archive[1].remove(exclude)
             self._check_files(archive[1], comp)
index ddc34d4..e88f429 100644 (file)
@@ -10,7 +10,7 @@ import tempfile
 import unittest
 import zipfile
 
-from gbp.deb import UpstreamSource
+from gbp.pkg import UpstreamSource
 
 class TestDir(unittest.TestCase):
     def test_directory(self):