1 from __future__ import with_statement
5 from rpm._rpm import ts as TransactionSetCore
7 if sys.version_info[0] == 3:
10 _string_types = basestring,
13 # TODO: migrate relevant documentation from C-side
14 class TransactionSet(TransactionSetCore):
17 def _wrapSetGet(self, attr, val):
18 oval = getattr(self, attr)
19 setattr(self, attr, val)
22 def setVSFlags(self, flags):
23 return self._wrapSetGet('_vsflags', flags)
28 def setColor(self, color):
29 return self._wrapSetGet('_color', color)
31 def setPrefColor(self, color):
32 return self._wrapSetGet('_prefcolor', color)
34 def setFlags(self, flags):
35 return self._wrapSetGet('_flags', flags)
37 def setProbFilter(self, ignoreSet):
38 return self._wrapSetGet('_probFilter', ignoreSet)
40 def parseSpec(self, specfile):
42 return rpm._rpmb.spec(specfile)
48 # Backwards compatibility goo - WTH does this return a *tuple* ?!
54 def _f2hdr(self, item):
55 if isinstance(item, _string_types):
57 header = self.hdrFromFdno(f)
58 elif isinstance(item, rpm.hdr):
61 header = self.hdrFromFdno(item)
64 def addInstall(self, item, key, how="u"):
65 header = self._f2hdr(item)
67 if how not in ['u', 'i']:
68 raise ValueError('how argument must be "u" or "i"')
69 upgrade = (how == "u")
71 if not TransactionSetCore.addInstall(self, header, key, upgrade):
72 raise rpm.error("adding package to transaction failed")
74 def addReinstall(self, item, key):
75 header = self._f2hdr(item)
77 if not TransactionSetCore.addReinstall(self, header, key):
78 raise rpm.error("adding package to transaction failed")
80 def addErase(self, item):
82 if isinstance(item, rpm.hdr):
84 elif isinstance(item, rpm.mi):
86 elif isinstance(item, int):
87 hdrs = self.dbMatch(rpm.RPMDBI_PACKAGES, item)
88 elif isinstance(item, _string_types):
89 hdrs = self.dbMatch(rpm.RPMDBI_LABEL, item)
91 raise TypeError("invalid type %s" % type(item))
94 if not TransactionSetCore.addErase(self, h):
95 raise rpm.error("package not installed")
97 # garbage collection should take care but just in case...
98 if isinstance(hdrs, rpm.mi):
101 def run(self, callback, data):
102 rc = TransactionSetCore.run(self, callback, data, self._probFilter)
104 # crazy backwards compatibility goo: None for ok, list of problems
105 # if transaction didn't complete and empty list if it completed
112 for prob in self.problems():
113 item = ("%s" % prob, (prob.type, prob._str, prob._num))
117 def check(self, *args, **kwds):
118 TransactionSetCore.check(self, *args, **kwds)
120 # compatibility: munge problem strings into dependency tuples of doom
122 for p in self.problems():
123 # is it anything we need to care about?
124 if p.type == rpm.RPMPROB_CONFLICT:
125 sense = rpm.RPMDEP_SENSE_CONFLICTS
126 elif p.type == rpm.RPMPROB_REQUIRES:
127 sense = rpm.RPMDEP_SENSE_REQUIRES
131 # strip arch, split to name, version, release
132 nevr = p.altNEVR.rsplit('.', 1)[0]
133 n, v, r = nevr.rsplit('-', 2)
135 # extract the dependency information
136 needs = p._str.split()
138 needflags = rpm.RPMSENSE_ANY
142 needflags |= rpm.RPMSENSE_LESS
144 needflags |= rpm.RPMSENSE_EQUAL
146 needflags |= rpm.RPMSENSE_GREATER
151 res.append(((n, v, r),
152 (needname, needver), needflags, sense, p.key))
156 def hdrCheck(self, blob):
157 res, msg = TransactionSetCore.hdrCheck(self, blob)
158 # generate backwards compatibly broken exceptions
159 if res == rpm.RPMRC_NOKEY:
160 raise rpm.error("public key not available")
161 elif res == rpm.RPMRC_NOTTRUSTED:
162 raise rpm.error("public key not trusted")
163 elif res != rpm.RPMRC_OK:
166 def hdrFromFdno(self, fd):
167 res, h = TransactionSetCore.hdrFromFdno(self, fd)
168 # generate backwards compatibly broken exceptions
169 if res == rpm.RPMRC_NOKEY:
170 raise rpm.error("public key not available")
171 elif res == rpm.RPMRC_NOTTRUSTED:
172 raise rpm.error("public key not trusted")
173 elif res != rpm.RPMRC_OK:
174 raise rpm.error("error reading package header")