import tempfile
import stat
import fcntl
-
+import subprocess
from yum import misc, Errors, to_unicode
from yum.sqlutils import executeSQL
pass
from utils import _gzipOpen, bzipFile, checkAndMakeDir, GzipFile, \
- checksum_and_rename
+ checksum_and_rename, split_list_into_equal_chunks
import deltarpms
__version__ = '0.9.8'
self.read_pkgs_list = None # filepath/name to write out list of pkgs
# read in this run of createrepo
self.collapse_glibc_requires = True
-
+ self.workers = 1 # number of workers to fork off to grab metadata from the pkgs
+ #self.worker_cmd = '/usr/share/createrepo_worker.py'
+ self.worker_cmd = './worker.py' # fixme - need a nice way to make this be the right place :(
+
class SimpleMDCallBack(object):
def errorlog(self, thing):
print >> sys.stderr, thing
else:
rpmfile = '%s/%s' % (pkgpath, rpmfile)
+ external_data = { '_cachedir': self.conf.cachedir,
+ '_baseurl': baseurl,
+ '_reldir': reldir,
+ '_packagenumber': self.current_pkg,
+ '_collapse_libc_requires':self.conf.collapse_glibc_requires,
+ }
+
try:
po = yumbased.CreateRepoPackage(self.ts, rpmfile,
- sumtype=self.conf.sumtype)
+ sumtype=self.conf.sumtype,
+ external_data = external_data)
except Errors.MiscError, e:
raise MDError, "Unable to open package: %s" % e
- # external info we need
- po._cachedir = self.conf.cachedir
- po._baseurl = baseurl
- po._reldir = reldir
- po._packagenumber = self.current_pkg
- po._collapse_libc_requires = self.conf.collapse_glibc_requires
+
for r in po.requires_print:
if r.startswith('rpmlib('):
self.rpmlib_reqs[r] = 1
else:
directory = pkgpath
- for pkg in pkglist:
- self.current_pkg += 1
- recycled = False
+ # for worker/forked model
+ # iterate the pkglist - see which ones are handled by --update and let them
+ # go on their merry way
+
+ newpkgs = []
+ if self.conf.update:
+ # if we're in --update mode then only act on the new/changed pkgs
+ for pkg in pkglist:
+ self.current_pkg += 1
- # look to see if we can get the data from the old repodata
- # if so write this one out that way
- if self.conf.update:
#see if we can pull the nodes from the old repo
#print self.oldData.basenodes.keys()
old_pkg = pkg
if pkg.find("://") != -1:
old_pkg = os.path.basename(pkg)
nodes = self.oldData.getNodes(old_pkg)
- if nodes is not None:
- recycled = True
-
- # FIXME also open up the delta file
-
- # otherwise do it individually
- if not recycled:
- #scan rpm files
- if not pkgpath:
- reldir = os.path.join(self.conf.basedir, directory)
- else:
- reldir = pkgpath
-
- if not isinstance(pkg, YumAvailablePackage):
-
- try:
- po = self.read_in_package(pkg, pkgpath=pkgpath,
- reldir=reldir)
- except MDError, e:
- # need to say something here
- self.callback.errorlog("\nError %s: %s\n" % (pkg, e))
- continue
- # we can use deltas:
- if self.conf.deltas:
- self._do_delta_rpm_package(po)
- self.read_pkgs.append(pkg)
+ if nodes is not None: # we have a match in the old metadata
+ if self.conf.verbose:
+ self.callback.log(_("Using data from old metadata for %s")
+ % pkg)
+ (primarynode, filenode, othernode) = nodes
+
+ for node, outfile in ((primarynode, self.primaryfile),
+ (filenode, self.flfile),
+ (othernode, self.otherfile)):
+ if node is None:
+ break
+
+ if self.conf.baseurl:
+ anode = node.children
+ while anode is not None:
+ if anode.type != "element":
+ anode = anode.next
+ continue
+ if anode.name == "location":
+ anode.setProp('xml:base', self.conf.baseurl)
+ anode = anode.next
+ output = node.serialize('UTF-8', self.conf.pretty)
+ if output:
+ outfile.write(output)
+ else:
+ if self.conf.verbose:
+ self.callback.log(_("empty serialize on write to" \
+ "%s in %s") % (outfile, pkg))
+ outfile.write('\n')
+
+ self.oldData.freeNodes(pkg)
+ #FIXME - if we're in update and we have deltas enabled
+ # check the presto data for this pkg and write its info back out
+ # to our deltafile
+ continue
else:
- po = pkg
- if isinstance(pkg, YumLocalPackage):
- self.read_pkgs.append(po.localpath)
+ newpkgs.append(pkg)
+ else:
+ newpkgs = pkglist
- if self.conf.database_only:
- pass # disabled right now for sanity reasons (mine)
- #po.do_sqlite_dump(self.md_sqlite)
- else:
- self.primaryfile.write(po.xml_dump_primary_metadata())
- self.flfile.write(po.xml_dump_filelists_metadata())
- self.otherfile.write(po.xml_dump_other_metadata(
- clog_limit=self.conf.changelog_limit))
+ # setup our reldir
+ if not pkgpath:
+ reldir = os.path.join(self.conf.basedir, directory)
+ else:
+ reldir = pkgpath
+
+ # filter out those pkgs which are not files - but are pkgobjects
+ pkgfiles = []
+ for pkg in newpkgs:
+ if isinstance(pkg, YumAvailablePackage):
+ po = pkg
+ self.read_pkgs.append(po.localpath)
+ self.primaryfile.write(po.xml_dump_primary_metadata())
+ self.flfile.write(po.xml_dump_filelists_metadata())
+ self.otherfile.write(po.xml_dump_other_metadata(
+ clog_limit=self.conf.changelog_limit))
+ continue
+
+
+ pkgfiles.append(pkg)
+
+
+ # divide that list by the number of workers and fork off that many
+
+ # workers to tmpdirs
+ # waitfor the workers to finish and as each one comes in
+ # open the files they created and write them out to our metadata
+ # add up the total pkg counts and return that value
+ worker_tmp_path = tempfile.mkdtemp()
+ worker_chunks = utils.split_list_into_equal_chunks(pkgfiles, self.conf.workers)
+ worker_cmd_dict = {}
+ worker_jobs = []
+ base_worker_cmdline = [self.conf.worker_cmd,
+ '--pkgoptions=_reldir=%s' % reldir,
+ '--pkgoptions=_collapse_libc_requires=%s' % self.conf.collapse_glibc_requires,
+ '--pkgoptions=_cachedir=%s' % self.conf.cachedir,
+ '--pkgoptions=_baseurl=%s' % self.conf.baseurl,]
+
+
+ for worker_num in range(self.conf.workers):
+ # make the worker directory
+ workercmdline = []
+ workercmdline.extend(base_worker_cmdline)
+ thisdir = worker_tmp_path + '/' + str(worker_num)
+ if checkAndMakeDir(thisdir):
+ workercmdline.append('--tmpmdpath=%s' % thisdir)
else:
- if self.conf.verbose:
- self.callback.log(_("Using data from old metadata for %s")
- % pkg)
- (primarynode, filenode, othernode) = nodes
-
- for node, outfile in ((primarynode, self.primaryfile),
- (filenode, self.flfile),
- (othernode, self.otherfile)):
- if node is None:
- break
-
- if self.conf.baseurl:
- anode = node.children
- while anode is not None:
- if anode.type != "element":
- anode = anode.next
- continue
- if anode.name == "location":
- anode.setProp('xml:base', self.conf.baseurl)
- anode = anode.next
-
- output = node.serialize('UTF-8', self.conf.pretty)
- if output:
- outfile.write(output)
- else:
- if self.conf.verbose:
- self.callback.log(_("empty serialize on write to" \
- "%s in %s") % (outfile, pkg))
- outfile.write('\n')
+ raise MDError, "Unable to create worker path: %s" % thisdir
+ workercmdline.extend(worker_chunks[worker_num])
+ worker_cmd_dict[worker_num] = workercmdline
+
+
- self.oldData.freeNodes(pkg)
- #FIXME - if we're in update and we have deltas enabled
- # check the presto data for this pkg and write its info back out
- # to our deltafile
+ for (num, cmdline) in worker_cmd_dict.items():
+ self.callback.log("Spawning worker %s with %s pkgs" % (num, len(worker_chunks[num])))
+ job = subprocess.Popen(cmdline) # fixme - add stdout/stderr PIPES here,
+ worker_jobs.append(job)
+
+ for job in worker_jobs:
+ # fixme - need 'communicate' to see about getting info back
+ os.waitpid(job.pid, 0)
+
+ self.callback.log("Workers Finished")
+ # finished with workers
+ # go to their dirs and add the contents
+ self.callback.log("Gathering worker results")
+ for num in range(self.conf.workers):
+ for (fn, fo) in (('primary.xml', self.primaryfile),
+ ('filelists.xml', self.flfile),
+ ('other.xml', self.otherfile)):
+ fnpath = worker_tmp_path + '/' + str(num) + '/' + fn
+ if os.path.exists(fnpath):
+ fo.write(open(fnpath, 'r').read())
- if not self.conf.quiet:
- if self.conf.verbose:
- self.callback.log('%d/%d - %s' % (self.current_pkg,
- self.pkgcount, pkg))
- else:
- self.callback.progress(pkg, self.current_pkg, self.pkgcount)
+
+ for pkgfile in pkgfiles:
+ if self.conf.deltas:
+ po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir)
+ self._do_delta_rpm_package(po)
+ self.read_pkgs.append(pkgfile)
return self.current_pkg
--- /dev/null
+#!/usr/bin/python -tt
+
+import sys
+import yum
+import createrepo
+import os
+import rpmUtils
+from optparse import OptionParser
+
+
+# pass in dir to make tempdirs in
+# make tempdir for this worker
+# create 3 files in that tempdir
+# return how many pkgs
+# return on stderr where things went to hell
+
+#TODO - take most of read_in_package from createrepo and duplicate it here
+# so we can do downloads, etc.
+# then replace callers of read_in_package with forked callers of this
+# and reassemble at the end
+
+def main(args):
+ parser = OptionParser()
+ parser.add_option('--tmpmdpath', default=None,
+ help="path where the outputs should be dumped for this worker")
+ parser.add_option("--pkgoptions", default=[], action='append',
+ help="pkgoptions in the format of key=value")
+
+
+ opts, pkgs = parser.parse_args(args)
+ external_data = {'_packagenumber': 0}
+ if not opts.tmpmdpath:
+ print >> sys.stderr, "tmpmdpath required for destination files"
+ sys.exit(1)
+
+ for strs in opts.pkgoptions:
+ k,v = strs.split('=')
+ if v in ['True', 'true', 'yes', '1', 1]:
+ v = True
+ elif v in ['False', 'false', 'no', '0', 0]:
+ v = False
+ elif v in ['None', 'none', '']:
+ v = None
+ external_data[k] = v
+
+
+ reldir = external_data['_reldir']
+ ts = rpmUtils.transaction.initReadOnlyTransaction()
+ pri = open(opts.tmpmdpath + '/primary.xml' , 'w')
+ fl = open(opts.tmpmdpath + '/filelists.xml' , 'w')
+ other = open(opts.tmpmdpath + '/other.xml' , 'w')
+
+ for pkgfile in pkgs:
+ pkgpath = reldir + '/' + pkgfile
+ if not os.path.exists(pkgpath):
+ continue
+
+ try:
+ pkg = createrepo.yumbased.CreateRepoPackage(ts, package=pkgpath,
+ external_data=external_data)
+ pri.write(pkg.xml_dump_primary_metadata())
+ fl.write(pkg.xml_dump_filelists_metadata())
+ other.write(pkg.xml_dump_other_metadata())
+ except yum.Errors.YumBaseError, e:
+ print >> sys.stderr, "Error: %s" % e
+ continue
+ else:
+ external_data['_packagenumber']+=1
+
+ pri.close()
+ fl.close()
+ other.close()
+ print external_data['_packagenumber']
+
+if __name__ == "__main__":
+ main(sys.argv[1:])