Ignore the pylint warning of raising-bad-type and unbalanced-tuple-unpacking in conne...
[tools/mic.git] / mic / utils / rpmmisc.py
1 #!/usr/bin/python3 -tt
2 #
3 # Copyright (c) 2008, 2009, 2010, 2011 Intel, Inc.
4 #
5 # This program is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the Free
7 # Software Foundation; version 2 of the License
8 #
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12 # for more details.
13 #
14 # You should have received a copy of the GNU General Public License along
15 # with this program; if not, write to the Free Software Foundation, Inc., 59
16 # Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17
18 import os
19 import sys
20 import re
21 import rpm
22
23 from mic import msger
24 from mic.utils.errors import CreatorError
25 from mic.utils.proxy import get_proxy_for
26 from mic.utils import runner
27
28
29 class RPMInstallCallback:
30     """ Command line callback class for callbacks from the RPM library.
31     """
32
33     def __init__(self, ts, output=1):
34         self.output = output
35         self.callbackfilehandles = {}
36         self.total_actions = 0
37         self.total_installed = 0
38         self.total_installing = 0
39         self.installed_pkg_names = []
40         self.total_removed = 0
41         self.mark = "+"
42         self.marks = 40
43         self.lastmsg = None
44         self.tsInfo = None # this needs to be set for anything else to work
45         self.ts = ts
46         self.filelog = False
47         self.logString = []
48         self.headmsg = "Installing"
49
50     def _dopkgtup(self, hdr):
51         tmpepoch = hdr['epoch']
52         if tmpepoch is None: epoch = '0'
53         else: epoch = str(tmpepoch)
54         #name, version, release must have value according to spec rule.
55         return (hdr['name'], hdr['arch'], epoch, hdr['version'], hdr['release'])
56
57     def _makeHandle(self, hdr):
58         tmpepoch = hdr['epoch']
59         if tmpepoch is None: epoch = '0'
60         else: epoch = str(tmpepoch) #epoch is 'int' object
61         handle = '%s:%s.%s-%s-%s' % (tmpepoch, hdr['name'], hdr['version'], hdr['release'], hdr['arch'])
62
63         return handle
64
65     def _localprint(self, msg):
66         if self.output:
67             msger.info(msg)
68
69     def _makefmt(self, percent, progress = True):
70         l = len(str(self.total_actions))
71         size = "%s.%s" % (l, l)
72         fmt_done = "[%" + size + "s/%" + size + "s]"
73         done = fmt_done % (self.total_installing,
74                            self.total_actions)
75         marks = self.marks - (2 * l)
76         width = "%s.%s" % (marks, marks)
77         fmt_bar = "%-" + width + "s"
78         if progress:
79             bar = fmt_bar % (self.mark * int(marks * (percent / 100.0)), )
80             fmt = "%-10.10s: %-50.50s " + bar + " " + done
81         else:
82             bar = fmt_bar % (self.mark * marks, )
83             fmt = "%-10.10s: %-50.50s "  + bar + " " + done
84         return fmt
85
86     def _logPkgString(self, hdr):
87         """return nice representation of the package for the log"""
88         (n,a,e,v,r) = self._dopkgtup(hdr)
89         if e == '0':
90             pkg = '%s.%s %s-%s' % (n, a, v, r)
91         else:
92             pkg = '%s.%s %s:%s-%s' % (n, a, e, v, r)
93
94         return pkg
95
96     def callback(self, what, bytes, total, h, user):
97         if what == rpm.RPMCALLBACK_TRANS_START:
98             if bytes == 6:
99                 self.total_actions = total
100
101         elif what == rpm.RPMCALLBACK_TRANS_PROGRESS:
102             pass
103
104         elif what == rpm.RPMCALLBACK_TRANS_STOP:
105             pass
106
107         elif what == rpm.RPMCALLBACK_INST_OPEN_FILE:
108             self.lastmsg = None
109             hdr = None
110             if h is not None:
111                 try:
112                     hdr, rpmloc = h
113                 except:
114                     rpmloc = h
115                     hdr = readRpmHeader(self.ts, h)
116
117                 m = re.match("(.*)-(\d+.*)-(\d+\.\d+)\.(.+)\.rpm", os.path.basename(rpmloc))
118                 if m:
119                     pkgname = m.group(1)
120                 else:
121                     pkgname = os.path.basename(rpmloc)
122                 msger.info("Next install: %s " % pkgname)
123
124                 handle = self._makeHandle(hdr)
125                 fd = os.open(rpmloc, os.O_RDONLY)
126                 self.callbackfilehandles[handle]=fd
127                 if hdr['name'] not in self.installed_pkg_names:
128                     self.installed_pkg_names.append(hdr['name'])
129                     self.total_installed += 1
130                 return fd
131             else:
132                 self._localprint("No header - huh?")
133
134         elif what == rpm.RPMCALLBACK_INST_CLOSE_FILE:
135             hdr = None
136             if h is not None:
137                 try:
138                     hdr, rpmloc = h
139                 except:
140                     rpmloc = h
141                     hdr = readRpmHeader(self.ts, h)
142
143                 handle = self._makeHandle(hdr)
144                 os.close(self.callbackfilehandles[handle])
145                 fd = 0
146
147                 # log stuff
148                 #pkgtup = self._dopkgtup(hdr)
149                 self.logString.append(self._logPkgString(hdr))
150
151         elif what == rpm.RPMCALLBACK_INST_START:
152             self.total_installing += 1
153
154         elif what == rpm.RPMCALLBACK_UNINST_STOP:
155             pass
156
157         elif what == rpm.RPMCALLBACK_INST_PROGRESS:
158             if h is not None:
159                 percent = (self.total_installed*100)/self.total_actions
160                 if total > 0:
161                     try:
162                         hdr, rpmloc = h
163                     except:
164                         rpmloc = h
165
166                     m = re.match("(.*)-(\d+.*)-(\d+\.\d+)\.(.+)\.rpm", os.path.basename(rpmloc))
167                     if m:
168                         pkgname = m.group(1)
169                     else:
170                         pkgname = os.path.basename(rpmloc)
171                 if self.output:
172                     fmt = self._makefmt(percent)
173                     msg = fmt % (self.headmsg, pkgname)
174                     if msg != self.lastmsg:
175                         self.lastmsg = msg
176
177                         msger.info(msg)
178
179                         if self.total_installed == self.total_actions:
180                             msger.raw('')
181                             msger.verbose('\n'.join(self.logString))
182
183         elif what == rpm.RPMCALLBACK_UNINST_START:
184             pass
185
186         elif what == rpm.RPMCALLBACK_UNINST_PROGRESS:
187             pass
188
189         elif what == rpm.RPMCALLBACK_UNINST_STOP:
190             self.total_removed += 1
191
192         elif what == rpm.RPMCALLBACK_REPACKAGE_START:
193             pass
194
195         elif what == rpm.RPMCALLBACK_REPACKAGE_STOP:
196             pass
197
198         elif what == rpm.RPMCALLBACK_REPACKAGE_PROGRESS:
199             pass
200         elif what == rpm.RPMCALLBACK_SCRIPT_ERROR:
201             if h is not None:
202                 try:
203                     hdr, rpmloc = h
204                 except:
205                     rpmloc = h
206
207                 m = re.match("(.*)-(\d+.*)-(\d+\.\d+)\.(.+)\.rpm", os.path.basename(rpmloc))
208                 if m:
209                     pkgname = m.group(1)
210                 else:
211                     pkgname = os.path.basename(rpmloc)
212
213                 msger.warning('(%s) Post script failed' % pkgname)
214
215 def readRpmHeader(ts, filename):
216     """ Read an rpm header. """
217
218     fd = os.open(filename, os.O_RDONLY)
219     h = ts.hdrFromFdno(fd)
220     os.close(fd)
221     return h
222
223 def splitFilename(filename):
224     """ Pass in a standard style rpm fullname
225
226         Return a name, version, release, epoch, arch, e.g.::
227             foo-1.0-1.i386.rpm returns foo, 1.0, 1, i386
228             1:bar-9-123a.ia64.rpm returns bar, 9, 123a, 1, ia64
229     """
230
231     if filename[-4:] == '.rpm':
232         filename = filename[:-4]
233
234     archIndex = filename.rfind('.')
235     arch = filename[archIndex+1:]
236
237     relIndex = filename[:archIndex].rfind('-')
238     rel = filename[relIndex+1:archIndex]
239
240     verIndex = filename[:relIndex].rfind('-')
241     ver = filename[verIndex+1:relIndex]
242
243     epochIndex = filename.find(':')
244     if epochIndex == -1:
245         epoch = ''
246     else:
247         epoch = filename[:epochIndex]
248
249     name = filename[epochIndex + 1:verIndex]
250     return name, ver, rel, epoch, arch
251
252 def getCanonX86Arch(arch):
253     #
254     if arch == "i586":
255         f = open("/proc/cpuinfo", "r")
256         lines = f.readlines()
257         f.close()
258         for line in lines:
259             if line.startswith("model name") and line.find("Geode(TM)") != -1:
260                 return "geode"
261         return arch
262     # only athlon vs i686 isn't handled with uname currently
263     if arch != "i686":
264         return arch
265
266     # if we're i686 and AuthenticAMD, then we should be an athlon
267     f = open("/proc/cpuinfo", "r")
268     lines = f.readlines()
269     f.close()
270     for line in lines:
271         if line.startswith("vendor") and line.find("AuthenticAMD") != -1:
272             return "athlon"
273         # i686 doesn't guarantee cmov, but we depend on it
274         elif line.startswith("flags") and line.find("cmov") == -1:
275             return "i586"
276
277     return arch
278
279 def getCanonX86_64Arch(arch):
280     if arch != "x86_64":
281         return arch
282
283     vendor = None
284     f = open("/proc/cpuinfo", "r")
285     lines = f.readlines()
286     f.close()
287     for line in lines:
288         if line.startswith("vendor_id"):
289             vendor = line.split(':')[1]
290             break
291     if vendor is None:
292         return arch
293
294     if vendor.find("Authentic AMD") != -1 or vendor.find("AuthenticAMD") != -1:
295         return "amd64"
296     if vendor.find("GenuineIntel") != -1:
297         return "ia32e"
298     return arch
299
300 def getCanonArch():
301     arch = os.uname()[4]
302
303     if (len(arch) == 4 and arch[0] == "i" and arch[2:4] == "86"):
304         return getCanonX86Arch(arch)
305
306     if arch == "x86_64":
307         return getCanonX86_64Arch(arch)
308
309     return arch
310
311 # Copy from libsatsolver:poolarch.c, with cleanup
312 archPolicies = {
313     "x86_64":       "x86_64:i686:i586:i486:i386",
314     "i686":         "i686:i586:i486:i386",
315     "i586":         "i586:i486:i386",
316     "ia64":         "ia64:i686:i586:i486:i386",
317     "aarch64":      "aarch64",
318     "armv7tnhl":    "armv7tnhl:armv7thl:armv7nhl:armv7hl",
319     "armv7thl":     "armv7thl:armv7hl",
320     "armv7nhl":     "armv7nhl:armv7hl",
321     "armv7hl":      "armv7hl",
322     "armv7l":       "armv7l:armv6l:armv5tejl:armv5tel:armv5l:armv4tl:armv4l:armv3l",
323     "armv6l":       "armv6l:armv5tejl:armv5tel:armv5l:armv4tl:armv4l:armv3l",
324     "armv5tejl":    "armv5tejl:armv5tel:armv5l:armv4tl:armv4l:armv3l",
325     "armv5tel":     "armv5tel:armv5l:armv4tl:armv4l:armv3l",
326     "armv5l":       "armv5l:armv4tl:armv4l:armv3l",
327     "mipsel":       "mipsel",
328     "riscv64":      "riscv64",
329 }
330
331 # dict mapping arch -> ( multicompat, best personality, biarch personality )
332 multilibArches = {
333     "x86_64":  ( "athlon", "x86_64", "athlon" ),
334 }
335
336 # from yumUtils.py
337 arches = {
338     # ia32
339     "athlon": "i686",
340     "i686": "i586",
341     "geode": "i586",
342     "i586": "i486",
343     "i486": "i386",
344     "i386": "noarch",
345
346     # amd64
347     "x86_64": "athlon",
348     "amd64": "x86_64",
349     "ia32e": "x86_64",
350
351     # arm
352     "armv7tnhl": "armv7nhl",
353     "armv7nhl": "armv7hl",
354     "armv7hl": "noarch",
355     "armv7l": "armv6l",
356     "armv6l": "armv5tejl",
357     "armv5tejl": "armv5tel",
358     "armv5tel": "noarch",
359
360     #itanium
361     "ia64": "noarch",
362
363     "mipsel": "mipsel",
364
365     "riscv64": "noarch",
366 }
367
368 def isMultiLibArch(arch=None):
369     """returns true if arch is a multilib arch, false if not"""
370     if arch is None:
371         arch = getCanonArch()
372
373     if arch not in arches: # or we could check if it is noarch
374         return False
375
376     if arch in multilibArches:
377         return True
378
379     if arches[arch] in multilibArches:
380         return True
381
382     return False
383
384 def getBaseArch():
385     myarch = getCanonArch()
386     if myarch not in arches:
387         return myarch
388
389     if isMultiLibArch(arch=myarch):
390         if myarch in multilibArches:
391             return myarch
392         else:
393             return arches[myarch]
394
395     if myarch in arches:
396         basearch = myarch
397         value = arches[basearch]
398         while value != 'noarch':
399             basearch = value
400             value = arches[basearch]
401
402         return basearch
403
404 def checkRpmIntegrity(bin_rpm, package):
405     return runner.quiet([bin_rpm, "-K", "--nosignature", package])
406
407 def checkSig(ts, package):
408     """ Takes a transaction set and a package, check it's sigs,
409         return 0 if they are all fine
410         return 1 if the gpg key can't be found
411         return 2 if the header is in someway damaged
412         return 3 if the key is not trusted
413         return 4 if the pkg is not gpg or pgp signed
414     """
415
416     value = 0
417     currentflags = ts.setVSFlags(0)
418     fdno = os.open(package, os.O_RDONLY)
419     try:
420         hdr = ts.hdrFromFdno(fdno)
421
422     except rpm.error as e:
423         if str(e) == "public key not availaiable":
424             value = 1
425         if str(e) == "public key not available":
426             value = 1
427         if str(e) == "public key not trusted":
428             value = 3
429         if str(e) == "error reading package header":
430             value = 2
431     else:
432         error, siginfo = getSigInfo(hdr)
433         if error == 101:
434             os.close(fdno)
435             del hdr
436             value = 4
437         else:
438             del hdr
439
440     try:
441         os.close(fdno)
442     except OSError:
443         pass
444
445     ts.setVSFlags(currentflags) # put things back like they were before
446     return value
447
448 def getSigInfo(hdr):
449     """ checks signature from an hdr hand back signature information and/or
450         an error code
451     """
452
453     import locale
454     locale.setlocale(locale.LC_ALL, 'C')
455
456     string = '%|DSAHEADER?{%{DSAHEADER:pgpsig}}:{%|RSAHEADER?{%{RSAHEADER:pgpsig}}:{%|SIGGPG?{%{SIGGPG:pgpsig}}:{%|SIGPGP?{%{SIGPGP:pgpsig}}:{(none)}|}|}|}|'
457     siginfo = hdr.sprintf(string)
458     if siginfo != '(none)':
459         error = 0
460         sigtype, sigdate, sigid = siginfo.split(',')
461     else:
462         error = 101
463         sigtype = 'MD5'
464         sigdate = 'None'
465         sigid = 'None'
466
467     infotuple = (sigtype, sigdate, sigid)
468     return error, infotuple
469