UpstreamSource: suppress stderr from tar in _determine_prefix()
[tools/git-buildpackage.git] / gbp / pkg / __init__.py
1 # vim: set fileencoding=utf-8 :
2 #
3 # (C) 2006,2007 Guido Guenther <agx@sigxcpu.org>
4 # (C) 2012 Intel Corporation <markus.lehtonen@linux.intel.com>
5 #    This program is free software; you can redistribute it and/or modify
6 #    it under the terms of the GNU General Public License as published by
7 #    the Free Software Foundation; either version 2 of the License, or
8 #    (at your option) any later version.
9 #
10 #    This program is distributed in the hope that it will be useful,
11 #    but WITHOUT ANY WARRANTY; without even the implied warranty of
12 #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 #    GNU General Public License for more details.
14 #
15 #    You should have received a copy of the GNU General Public License
16 #    along with this program; if not, write to the Free Software
17 #    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18 """Common functionality of the Debian/RPM package helpers"""
19
20 import os
21 import re
22 import glob
23 import stat
24 import subprocess
25 import zipfile
26
27 import gbp.command_wrappers as gbpc
28 from gbp.errors import GbpError
29
30 # compression types, extra options and extensions
31 compressor_opts = { 'gzip'  : [ ['-n'], 'gz' ],
32                     'bzip2' : [ [], 'bz2' ],
33                     'lzma'  : [ [], 'lzma' ],
34                     'xz'    : [ [], 'xz' ] }
35
36 # Map frequently used names of compression types to the internal ones:
37 compressor_aliases = { 'bz2' : 'bzip2',
38                        'gz'  : 'gzip', }
39
40 # Supported archive formats
41 archive_formats = [ 'tar', 'zip' ]
42
43 # Map combined file extensions to archive and compression format
44 archive_ext_aliases = { 'tgz'   : ('tar', 'gzip'),
45                         'tbz2'  : ('tar', 'bzip2'),
46                         'tlz'   : ('tar', 'lzma'),
47                         'txz'   : ('tar', 'xz')}
48
49 def parse_archive_filename(filename):
50     """
51     Given an filename return the basename (i.e. filename without the
52     archive and compression extensions), archive format and compression
53     method used.
54
55     @param filename: the name of the file
56     @type filename: string
57     @return: tuple containing basename, archive format and compression method
58     @rtype: C{tuple} of C{str}
59
60     >>> parse_archive_filename("abc.tar.gz")
61     ('abc', 'tar', 'gzip')
62     >>> parse_archive_filename("abc.tar.bz2")
63     ('abc', 'tar', 'bzip2')
64     >>> parse_archive_filename("abc.def.tbz2")
65     ('abc.def', 'tar', 'bzip2')
66     >>> parse_archive_filename("abc.def.tar.xz")
67     ('abc.def', 'tar', 'xz')
68     >>> parse_archive_filename("abc.zip")
69     ('abc', 'zip', None)
70     >>> parse_archive_filename("abc.lzma")
71     ('abc', None, 'lzma')
72     >>> parse_archive_filename("abc.tar.foo")
73     ('abc.tar.foo', None, None)
74     >>> parse_archive_filename("abc")
75     ('abc', None, None)
76     """
77     (base_name, archive_fmt, compression) = (filename, None, None)
78
79     # Split filename to pieces
80     split = filename.split(".")
81     if len(split) > 1:
82         if split[-1] in archive_ext_aliases:
83             base_name = ".".join(split[:-1])
84             (archive_fmt, compression) = archive_ext_aliases[split[-1]]
85         elif split[-1] in archive_formats:
86             base_name = ".".join(split[:-1])
87             (archive_fmt, compression) = (split[-1], None)
88         else:
89             for (c, o) in compressor_opts.iteritems():
90                 if o[1] == split[-1]:
91                     base_name = ".".join(split[:-1])
92                     compression = c
93                     if len(split) > 2 and split[-2] in archive_formats:
94                         base_name = ".".join(split[:-2])
95                         archive_fmt = split[-2]
96
97     return (base_name, archive_fmt, compression)
98
99
100 class PkgPolicy(object):
101     """
102     Common helpers for packaging policy.
103     """
104     packagename_re = None
105     packagename_msg = None
106     upstreamversion_re = None
107     upstreamversion_msg = None
108
109     @classmethod
110     def is_valid_packagename(cls, name):
111         """
112         Is this a valid package name?
113
114         >>> PkgPolicy.is_valid_packagename('doesnotmatter')
115         Traceback (most recent call last):
116         ...
117         NotImplementedError: Class needs to provide packagename_re
118         """
119         if cls.packagename_re is None:
120             raise NotImplementedError("Class needs to provide packagename_re")
121         return True if cls.packagename_re.match(name) else False
122
123     @classmethod
124     def is_valid_upstreamversion(cls, version):
125         """
126         Is this a valid upstream version number?
127
128         >>> PkgPolicy.is_valid_upstreamversion('doesnotmatter')
129         Traceback (most recent call last):
130         ...
131         NotImplementedError: Class needs to provide upstreamversion_re
132         """
133         if cls.upstreamversion_re is None:
134             raise NotImplementedError("Class needs to provide upstreamversion_re")
135         return True if cls.upstreamversion_re.match(version) else False
136
137     @classmethod
138     def is_valid_orig_archive(cls, filename):
139         "Is this a valid orig source archive"
140         (base, arch_fmt, compression) =  parse_archive_filename(filename)
141         if arch_fmt == 'tar' and compression:
142             return True
143         return False
144
145     @classmethod
146     def guess_upstream_src_version(cls, filename, extra_regex=r''):
147         """
148         Guess the package name and version from the filename of an upstream
149         archive.
150
151         @param filename: filename (archive or directory) from which to guess
152         @type filename: C{string}
153         @param extra_regex: additional regex to apply, needs a 'package' and a
154                             'version' group
155         @return: (package name, version) or ('', '')
156         @rtype: tuple
157
158         >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.orig.tar.gz')
159         ('foo-bar', '0.2')
160         >>> PkgPolicy.guess_upstream_src_version('foo-Bar_0.2.orig.tar.gz')
161         ('foo-Bar', '0.2.orig')
162         >>> PkgPolicy.guess_upstream_src_version('git-bar-0.2.tar.gz')
163         ('git-bar', '0.2')
164         >>> PkgPolicy.guess_upstream_src_version('git-bar-0.2-rc1.tar.gz')
165         ('git-bar', '0.2-rc1')
166         >>> PkgPolicy.guess_upstream_src_version('git-bar-0.2:~-rc1.tar.gz')
167         ('git-bar', '0.2:~-rc1')
168         >>> PkgPolicy.guess_upstream_src_version('git-Bar-0A2d:rc1.tar.bz2')
169         ('git-Bar', '0A2d:rc1')
170         >>> PkgPolicy.guess_upstream_src_version('git-1.tar.bz2')
171         ('git', '1')
172         >>> PkgPolicy.guess_upstream_src_version('kvm_87+dfsg.orig.tar.gz')
173         ('kvm', '87+dfsg')
174         >>> PkgPolicy.guess_upstream_src_version('foo-Bar-a.b.tar.gz')
175         ('', '')
176         >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.orig.tar.xz')
177         ('foo-bar', '0.2')
178         >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.tar.gz')
179         ('foo-bar', '0.2')
180         >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.orig.tar.lzma')
181         ('foo-bar', '0.2')
182         >>> PkgPolicy.guess_upstream_src_version('foo-bar-0.2.zip')
183         ('foo-bar', '0.2')
184         >>> PkgPolicy.guess_upstream_src_version('foo-bar-0.2.tlz')
185         ('foo-bar', '0.2')
186         """
187         version_chars = r'[a-zA-Z\d\.\~\-\:\+]'
188         basename = parse_archive_filename(os.path.basename(filename))[0]
189
190         version_filters = map ( lambda x: x % version_chars,
191                            ( # Debian upstream tarball: package_'<version>.orig.tar.gz'
192                              r'^(?P<package>[a-z\d\.\+\-]+)_(?P<version>%s+)\.orig',
193                              # Upstream 'package-<version>.tar.gz'
194                              # or Debian native 'package_<version>.tar.gz'
195                              # or directory 'package-<version>':
196                              r'^(?P<package>[a-zA-Z\d\.\+\-]+)(-|_)(?P<version>[0-9]%s*)'))
197         if extra_regex:
198             version_filters = extra_regex + version_filters
199
200         for filter in version_filters:
201             m = re.match(filter, basename)
202             if m:
203                 return (m.group('package'), m.group('version'))
204         return ('', '')
205
206     @staticmethod
207     def guess_upstream_src_version(filename, extra_regex=r''):
208         """
209         Guess the package name and version from the filename of an upstream
210         archive.
211
212         @param filename: filename (archive or directory) from which to guess
213         @type filename: C{string}
214         @param extra_regex: additional regex to apply, needs a 'package' and a
215                             'version' group
216         @return: (package name, version) or ('', '')
217         @rtype: tuple
218
219         >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.orig.tar.gz')
220         ('foo-bar', '0.2')
221         >>> PkgPolicy.guess_upstream_src_version('foo-Bar_0.2.orig.tar.gz')
222         ('', '')
223         >>> PkgPolicy.guess_upstream_src_version('git-bar-0.2.tar.gz')
224         ('git-bar', '0.2')
225         >>> PkgPolicy.guess_upstream_src_version('git-bar-0.2-rc1.tar.gz')
226         ('git-bar', '0.2-rc1')
227         >>> PkgPolicy.guess_upstream_src_version('git-bar-0.2:~-rc1.tar.gz')
228         ('git-bar', '0.2:~-rc1')
229         >>> PkgPolicy.guess_upstream_src_version('git-Bar-0A2d:rc1.tar.bz2')
230         ('git-Bar', '0A2d:rc1')
231         >>> PkgPolicy.guess_upstream_src_version('git-1.tar.bz2')
232         ('git', '1')
233         >>> PkgPolicy.guess_upstream_src_version('kvm_87+dfsg.orig.tar.gz')
234         ('kvm', '87+dfsg')
235         >>> PkgPolicy.guess_upstream_src_version('foo-Bar-a.b.tar.gz')
236         ('', '')
237         >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.orig.tar.xz')
238         ('foo-bar', '0.2')
239         >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.orig.tar.lzma')
240         ('foo-bar', '0.2')
241         >>> PkgPolicy.guess_upstream_src_version('foo-bar-0.2.zip')
242         ('foo-bar', '0.2')
243         >>> PkgPolicy.guess_upstream_src_version('foo-bar-0.2.tlz')
244         ('foo-bar', '0.2')
245         >>> PkgPolicy.guess_upstream_src_version('foo-bar_0.2.tar.gz')
246         ('foo-bar', '0.2')
247         """
248         version_chars = r'[a-zA-Z\d\.\~\-\:\+]'
249         basename = parse_archive_filename(os.path.basename(filename))[0]
250
251         version_filters = map ( lambda x: x % version_chars,
252                            ( # Debian upstream tarball: package_'<version>.orig.tar.gz'
253                              r'^(?P<package>[a-z\d\.\+\-]+)_(?P<version>%s+)\.orig',
254                              # Debian native: 'package_<version>.tar.gz'
255                              r'^(?P<package>[a-z\d\.\+\-]+)_(?P<version>%s+)',
256                              # Upstream 'package-<version>.tar.gz'
257                              # or directory 'package-<version>':
258                              r'^(?P<package>[a-zA-Z\d\.\+\-]+)(-)(?P<version>[0-9]%s*)'))
259         if extra_regex:
260             version_filters = extra_regex + version_filters
261
262         for filter in version_filters:
263             m = re.match(filter, basename)
264             if m:
265                 return (m.group('package'), m.group('version'))
266         return ('', '')
267
268     @staticmethod
269     def has_orig(orig_file, dir):
270         "Check if orig tarball exists in dir"
271         try:
272             os.stat( os.path.join(dir, orig_file) )
273         except OSError:
274             return False
275         return True
276
277     @staticmethod
278     def symlink_orig(orig_file, orig_dir, output_dir, force=False):
279         """
280         symlink orig tarball from orig_dir to output_dir
281         @return: True if link was created or src == dst
282                  False in case of error or src doesn't exist
283         """
284         orig_dir = os.path.abspath(orig_dir)
285         output_dir = os.path.abspath(output_dir)
286
287         if orig_dir == output_dir:
288             return True
289
290         src = os.path.join(orig_dir, orig_file)
291         dst = os.path.join(output_dir, orig_file)
292         if not os.access(src, os.F_OK):
293             return False
294         try:
295             if os.access(dst, os.F_OK) and force:
296                 os.unlink(dst)
297             os.symlink(src, dst)
298         except OSError:
299             return False
300         return True
301
302
303 class UpstreamSource(object):
304     """
305     Upstream source. Can be either an unpacked dir, a tarball or another type
306     of archive
307
308     @cvar _orig: are the upstream sources already suitable as an upstream
309                  tarball
310     @type _orig: boolean
311     @cvar _path: path to the upstream sources
312     @type _path: string
313     @cvar _unpacked: path to the unpacked source tree
314     @type _unpacked: string
315     """
316     def __init__(self, name, unpacked=None, pkg_policy=PkgPolicy, prefix=None):
317         self._orig = False
318         self._tarball = False
319         self._pkg_policy = pkg_policy
320         self._path = os.path.abspath(name)
321         if not os.path.exists(self._path):
322             raise GbpError('UpstreamSource: unable to find %s' % self._path)
323         self.unpacked = unpacked
324         self._filename_base, \
325         self._archive_fmt, \
326         self._compression = parse_archive_filename(os.path.basename(self.path))
327         self._prefix = prefix
328         if self._prefix is None:
329             self._determine_prefix()
330
331         self._check_orig()
332         if self.is_dir():
333             self.unpacked = self.path
334
335     def _check_orig(self):
336         """
337         Check if upstream source format can be used as orig tarball.
338         This doesn't imply that the tarball is correctly named.
339
340         @return: C{True} if upstream source format is suitable
341             as upstream tarball, C{False} otherwise.
342         @rtype: C{bool}
343         """
344         if self.is_dir():
345             self._orig = False
346             self._tarball = False
347             return
348
349         self._tarball = True if self.archive_fmt == 'tar' else False
350         self._orig = self._pkg_policy.is_valid_orig_archive(os.path.basename(self.path))
351
352     def is_orig(self):
353         """
354         @return: C{True} if sources are suitable as upstream source,
355             C{False} otherwise
356         @rtype: C{bool}
357         """
358         return self._orig
359
360     def is_tarball(self):
361         """
362         @return: C{True} if source is a tarball, C{False} otherwise
363         @rtype: C{bool}
364         """
365         return self._tarball
366
367     def is_dir(self):
368         """
369         @return: C{True} if if upstream sources are an unpacked directory,
370             C{False} otherwise
371         @rtype: C{bool}
372         """
373         return True if os.path.isdir(self._path) else False
374
375     @property
376     def path(self):
377         return self._path.rstrip('/')
378
379
380     @staticmethod
381     def _get_topdir_files(file_list):
382         """Parse content of the top directory from a file list
383
384         >>> UpstreamSource._get_topdir_files([])
385         set([])
386         >>> UpstreamSource._get_topdir_files([('-', 'foo/bar')])
387         set([('d', 'foo')])
388         >>> UpstreamSource._get_topdir_files([('d', 'foo/'), ('-', 'foo/bar')])
389         set([('d', 'foo')])
390         >>> UpstreamSource._get_topdir_files([('d', 'foo'), ('-', 'foo/bar')])
391         set([('d', 'foo')])
392         >>> UpstreamSource._get_topdir_files([('-', 'fob'), ('d', 'foo'), ('d', 'foo/bar'), ('-', 'foo/bar/baz')])
393         set([('-', 'fob'), ('d', 'foo')])
394         >>> UpstreamSource._get_topdir_files([('-', './foo/bar')])
395         set([('d', 'foo')])
396         >>> UpstreamSource._get_topdir_files([('-', 'foo/bar'), ('-', '.foo/bar')])
397         set([('d', '.foo'), ('d', 'foo')])
398         """
399         topdir_files = set()
400         for typ, path in file_list:
401             split = re.sub('^(?:./|../)*', '', path).split('/')
402             if len(split) == 1:
403                 topdir_files.add((typ, path))
404             else:
405                 topdir_files.add(('d', split[0]))
406         return topdir_files
407
408     def _determine_prefix(self):
409         """Determine the prefix, i.e. the "leading directory name"""
410         self._prefix = ''
411         if self.is_dir():
412             # For directories we presume that the prefix is just the dirname
413             self._prefix = os.path.basename(self.path.rstrip('/'))
414         else:
415             files = []
416             if self._archive_fmt == 'zip':
417                 archive = zipfile.ZipFile(self.path)
418                 for info in archive.infolist():
419                     typ = 'd' if stat.S_ISDIR(info.external_attr >> 16) else '?'
420                     files.append((typ, info.filename))
421             elif self._archive_fmt == 'tar':
422                 popen = subprocess.Popen(['tar', '-t', '-v', '-f', self.path],
423                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
424                 out, _err = popen.communicate()
425                 if popen.returncode:
426                     raise GbpError("Listing tar archive content failed")
427                 for line in out.splitlines():
428                     fields = line.split(None, 5)
429                     files.append((fields[0][0], fields[-1]))
430             else:
431                 raise GbpError("Unsupported archive format %s, unable to "
432                                "determine prefix for '%s'" %
433                                (self._archive_fmt, self.path))
434             # Determine prefix from the archive content
435             topdir_files = self._get_topdir_files(files)
436             if len(topdir_files) == 1:
437                 typ, name = topdir_files.pop()
438                 if typ == 'd':
439                     self._prefix = name
440
441     @property
442     def archive_fmt(self):
443         """Archive format of the sources, e.g. 'tar'"""
444         return self._archive_fmt
445
446     @property
447     def compression(self):
448         """Compression format of the sources, e.g. 'gzip'"""
449         return self._compression
450
451     @property
452     def prefix(self):
453         """Prefix, i.e. the 'leading directory name' of the sources"""
454         return self._prefix
455
456     def unpack(self, dir, filters=[]):
457         """
458         Unpack packed upstream sources into a given directory
459         and determine the toplevel of the source tree.
460         """
461         if self.is_dir():
462             raise GbpError("Cannot unpack directory %s" % self.path)
463
464         if not filters:
465             filters = []
466
467         if type(filters) != type([]):
468             raise GbpError("Filters must be a list")
469
470         if self._unpack_archive(dir, filters):
471             ret = type(self)(dir, prefix=self._prefix)
472         else:
473             ret = self
474         src_dir = os.path.join(dir, self._prefix)
475         ret.unpacked = src_dir if os.path.isdir(src_dir) else dir
476         return ret
477
478     def _unpack_archive(self, dir, filters):
479         """
480         Unpack packed upstream sources into a given directory. Return True if
481         the output was filtered, otherwise False.
482         """
483         ext = os.path.splitext(self.path)[1]
484         if ext in [ ".zip", ".xpi" ]:
485             self._unpack_zip(dir)
486         else:
487             self._unpack_tar(dir, filters)
488             if filters:
489                 return True
490         return False
491
492     def _unpack_zip(self, dir):
493         try:
494             gbpc.UnpackZipArchive(self.path, dir)()
495         except gbpc.CommandExecFailed:
496             raise GbpError("Unpacking of %s failed" % self.path)
497
498     def _unpack_tar(self, dir, filters):
499         """
500         Unpack a tarball to I{dir} applying a list of I{filters}. Leave the
501         cleanup to the caller in case of an error.
502         """
503         try:
504             unpackArchive = gbpc.UnpackTarArchive(self.path, dir, filters)
505             unpackArchive()
506         except gbpc.CommandExecFailed:
507             # unpackArchive already printed an error message
508             raise GbpError
509
510     def pack(self, newarchive, filters=[], newprefix=None):
511         """
512         Recreate a new archive from the current one
513
514         @param newarchive: the name of the new archive
515         @type newarchive: string
516         @param filters: tar filters to apply
517         @type filters: array of strings
518         @param newprefix: new prefix, None implies that prefix is not mangled
519         @type newprefix: string or None
520         @return: the new upstream source
521         @rtype: UpstreamSource
522         """
523         if not self.unpacked:
524             raise GbpError("Need an unpacked source tree to pack")
525
526         if not filters:
527             filters = []
528
529         if type(filters) != type([]):
530             raise GbpError("Filters must be a list")
531
532         run_dir = os.path.dirname(self.unpacked.rstrip('/'))
533         pack_this = os.path.basename(self.unpacked.rstrip('/'))
534         transform = None
535         if newprefix is not None:
536             newprefix = newprefix.strip('/.')
537             if newprefix:
538                 transform = 's!%s!%s!' % (pack_this, newprefix)
539             else:
540                 transform = 's!%s!%s!' % (pack_this, '.')
541         try:
542             repackArchive = gbpc.PackTarArchive(newarchive,
543                                                 run_dir,
544                                                 pack_this,
545                                                 filters,
546                                                 transform=transform)
547             repackArchive()
548         except gbpc.CommandExecFailed:
549             # repackArchive already printed an error
550             raise GbpError
551         new = type(self)(newarchive)
552         # Reuse the same unpacked dir if the content matches
553         if not filters:
554             new.unpacked = self.unpacked
555         return new
556
557     @staticmethod
558     def known_compressions():
559         return [ args[1][-1] for args in compressor_opts.items() ]
560
561     def guess_version(self, extra_regex=r''):
562         return self._pkg_policy.guess_upstream_src_version(self.path,
563                                                            extra_regex)