create a worker script for createrepo so createrepo can
author    Seth Vidal <skvidal@fedoraproject.org>  Thu, 9 Sep 2010 21:07:06 +0000 (17:07 -0400)
committer Seth Vidal <skvidal@fedoraproject.org>  Thu, 9 Sep 2010 21:07:06 +0000 (17:07 -0400)
fork off N processes to handle the md gathering from pkgs.
This should speed up results on systems which have been CPU-bound
on the createrepo process.

If you're I/O-bound it won't help you at all, and MAY make it worse.

many misc issues to iron out here - not the least of which are the
callback output and gathering stdout/stderr from the workers

Makefile
createrepo/__init__.py
createrepo/utils.py
createrepo/yumbased.py
genpkgmetadata.py
worker.py [new file with mode: 0755]
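
For orientation before the diff: the new model chunks the package list, forks one worker per chunk into its own tmpdir, waits for all of them, then splices each worker's partial metadata back into the master files. A minimal sketch of that fan-out/fan-in shape, with illustrative names (run_workers is not the committed API):

    import os
    import subprocess
    import tempfile

    def run_workers(pkgs, num_workers, worker_cmd='./worker.py'):
        # fan-out: one chunk of package files per worker, each writing
        # its partial metadata into its own tmpdir
        tmpdir = tempfile.mkdtemp()
        per = max(1, len(pkgs) // num_workers)  # naive chunking, sketch only
        chunks = [pkgs[i:i + per] for i in range(0, len(pkgs), per)]
        jobs = []
        for num, chunk in enumerate(chunks):
            outdir = os.path.join(tmpdir, str(num))
            os.makedirs(outdir)
            jobs.append(subprocess.Popen([worker_cmd, '--tmpmdpath=%s' % outdir] + chunk))
        # fan-in: wait for every child; the caller then reads each
        # tmpdir/<num>/{primary,filelists,other}.xml back into the master files
        for job in jobs:
            job.wait()
        return tmpdir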

index 6b907d81a2b01026828fa5677659ff90c7e449fd..60bb9db4eed6c8378770f3a5e5308d297691cae5 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -47,7 +47,8 @@ RM              = rm -f
 
 MODULES = $(srcdir)/genpkgmetadata.py \
        $(srcdir)/modifyrepo.py \
-       $(srcdir)/mergerepo.py  
+       $(srcdir)/mergerepo.py  \
+       $(srcdir)/worker.py
 
 .SUFFIXES: .py .pyc
 .py.pyc: 
index e06da99dab24893c9279df7407aa3005e231cca1..b2937c9e0e521517f30ca81767469c8b26410f47 100644 (file)
@@ -26,7 +26,7 @@ from urlgrabber import grabber
 import tempfile
 import stat
 import fcntl
-
+import subprocess
 
 from yum import misc, Errors, to_unicode
 from yum.sqlutils import executeSQL
@@ -47,7 +47,7 @@ except ImportError:
     pass
 
 from utils import _gzipOpen, bzipFile, checkAndMakeDir, GzipFile, \
-                  checksum_and_rename
+                  checksum_and_rename, split_list_into_equal_chunks
 import deltarpms
 
 __version__ = '0.9.8'
@@ -106,7 +106,10 @@ class MetaDataConfig(object):
         self.read_pkgs_list = None # filepath/name to write out list of pkgs
                                    # read in this run of createrepo
         self.collapse_glibc_requires = True
-
+        self.workers = 1 # number of workers to fork off to grab metadata from the pkgs
+        #self.worker_cmd = '/usr/share/createrepo_worker.py'
+        self.worker_cmd = './worker.py' # fixme - need a nice way to make this be the right place :(
+        
 class SimpleMDCallBack(object):
     def errorlog(self, thing):
         print >> sys.stderr, thing
@@ -477,17 +480,20 @@ class MetaDataGenerator:
         else:
             rpmfile = '%s/%s' % (pkgpath, rpmfile)
 
+        external_data = { '_cachedir': self.conf.cachedir,
+                          '_baseurl': baseurl,
+                          '_reldir': reldir,
+                          '_packagenumber': self.current_pkg,
+                          '_collapse_libc_requires':self.conf.collapse_glibc_requires,
+                          }
+                        
         try:
             po = yumbased.CreateRepoPackage(self.ts, rpmfile,
-                                            sumtype=self.conf.sumtype)
+                                            sumtype=self.conf.sumtype,
+                                            external_data = external_data)
         except Errors.MiscError, e:
             raise MDError, "Unable to open package: %s" % e
-        # external info we need
-        po._cachedir = self.conf.cachedir
-        po._baseurl = baseurl
-        po._reldir = reldir
-        po._packagenumber = self.current_pkg
-        po._collapse_libc_requires = self.conf.collapse_glibc_requires
+
         for r in po.requires_print:
             if r.startswith('rpmlib('):
                 self.rpmlib_reqs[r] = 1
@@ -508,101 +514,143 @@ class MetaDataGenerator:
         else:
             directory = pkgpath
 
-        for pkg in pkglist:
-            self.current_pkg += 1
-            recycled = False
+        # for worker/forked model
+        # iterate the pkglist - see which ones are handled by --update and let them
+        # go on their merry way
+        
+        newpkgs = []
+        if self.conf.update:
+            # if we're in --update mode then only act on the new/changed pkgs
+            for pkg in pkglist:
+                self.current_pkg += 1
 
-            # look to see if we can get the data from the old repodata
-            # if so write this one out that way
-            if self.conf.update:
                 #see if we can pull the nodes from the old repo
                 #print self.oldData.basenodes.keys()
                 old_pkg = pkg
                 if pkg.find("://") != -1:
                     old_pkg = os.path.basename(pkg)
                 nodes = self.oldData.getNodes(old_pkg)
-                if nodes is not None:
-                    recycled = True
-
-                # FIXME also open up the delta file
-
-            # otherwise do it individually
-            if not recycled:
-                #scan rpm files
-                if not pkgpath:
-                    reldir = os.path.join(self.conf.basedir, directory)
-                else:
-                    reldir = pkgpath
-
-                if not isinstance(pkg, YumAvailablePackage):
-
-                    try:
-                        po = self.read_in_package(pkg, pkgpath=pkgpath,
-                                                  reldir=reldir)
-                    except MDError, e:
-                        # need to say something here
-                        self.callback.errorlog("\nError %s: %s\n" % (pkg, e))
-                        continue
-                    # we can use deltas:
-                    if self.conf.deltas:
-                        self._do_delta_rpm_package(po)
-                    self.read_pkgs.append(pkg)
+                if nodes is not None: # we have a match in the old metadata
+                    if self.conf.verbose:
+                        self.callback.log(_("Using data from old metadata for %s")
+                                            % pkg)
+                    (primarynode, filenode, othernode) = nodes
+
+                    for node, outfile in ((primarynode, self.primaryfile),
+                                          (filenode, self.flfile),
+                                          (othernode, self.otherfile)):
+                        if node is None:
+                            break
+
+                        if self.conf.baseurl:
+                            anode = node.children
+                            while anode is not None:
+                                if anode.type != "element":
+                                    anode = anode.next
+                                    continue
+                                if anode.name == "location":
+                                    anode.setProp('xml:base', self.conf.baseurl)
+                                anode = anode.next
 
+                        output = node.serialize('UTF-8', self.conf.pretty)
+                        if output:
+                            outfile.write(output)
+                        else:
+                            if self.conf.verbose:
+                                self.callback.log(_("empty serialize on write to" \
+                                                    "%s in %s") % (outfile, pkg))
+                        outfile.write('\n')
+
+                    self.oldData.freeNodes(pkg)
+                    #FIXME - if we're in update and we have deltas enabled
+                    # check the presto data for this pkg and write its info back out
+                    # to our deltafile
+                    continue
                 else:
-                    po = pkg
-                    if isinstance(pkg, YumLocalPackage):
-                        self.read_pkgs.append(po.localpath)
+                    newpkgs.append(pkg)
+        else:
+            newpkgs = pkglist
 
-                if self.conf.database_only:
-                    pass # disabled right now for sanity reasons (mine)
-                    #po.do_sqlite_dump(self.md_sqlite)
-                else:
-                    self.primaryfile.write(po.xml_dump_primary_metadata())
-                    self.flfile.write(po.xml_dump_filelists_metadata())
-                    self.otherfile.write(po.xml_dump_other_metadata(
-                              clog_limit=self.conf.changelog_limit))
+        # setup our reldir
+        if not pkgpath:
+            reldir = os.path.join(self.conf.basedir, directory)
+        else:
+            reldir = pkgpath
+
+        # filter out those pkgs which are not files - but are pkgobjects
+        pkgfiles = []
+        for pkg in newpkgs:
+            if isinstance(pkg, YumAvailablePackage):
+                po = pkg
+                self.read_pkgs.append(po.localpath)
+                self.primaryfile.write(po.xml_dump_primary_metadata())
+                self.flfile.write(po.xml_dump_filelists_metadata())
+                self.otherfile.write(po.xml_dump_other_metadata(
+                                     clog_limit=self.conf.changelog_limit))
+                continue
+
+            pkgfiles.append(pkg)
+
+        # divide that list by the number of workers and fork off that many
+        # workers to tmpdirs
+        # wait for the workers to finish and as each one comes in
+        # open the files they created and write them out to our metadata
+        # add up the total pkg counts and return that value
+        worker_tmp_path = tempfile.mkdtemp()
+        worker_chunks = split_list_into_equal_chunks(pkgfiles, self.conf.workers)
+        worker_cmd_dict = {}
+        worker_jobs = []
+        base_worker_cmdline = [self.conf.worker_cmd, 
+                '--pkgoptions=_reldir=%s' % reldir,
+                '--pkgoptions=_collapse_libc_requires=%s' % self.conf.collapse_glibc_requires, 
+                '--pkgoptions=_cachedir=%s' % self.conf.cachedir,
+                '--pkgoptions=_baseurl=%s' % self.conf.baseurl,]
+
+        for worker_num in range(self.conf.workers):
+            # make the worker directory
+            workercmdline = []
+            workercmdline.extend(base_worker_cmdline)
+            thisdir = worker_tmp_path + '/' + str(worker_num)
+            if checkAndMakeDir(thisdir):
+                workercmdline.append('--tmpmdpath=%s' % thisdir)
             else:
-                if self.conf.verbose:
-                    self.callback.log(_("Using data from old metadata for %s")
-                                     % pkg)
-                (primarynode, filenode, othernode) = nodes
-
-                for node, outfile in ((primarynode, self.primaryfile),
-                                      (filenode, self.flfile),
-                                      (othernode, self.otherfile)):
-                    if node is None:
-                        break
-
-                    if self.conf.baseurl:
-                        anode = node.children
-                        while anode is not None:
-                            if anode.type != "element":
-                                anode = anode.next
-                                continue
-                            if anode.name == "location":
-                                anode.setProp('xml:base', self.conf.baseurl)
-                            anode = anode.next
-
-                    output = node.serialize('UTF-8', self.conf.pretty)
-                    if output:
-                        outfile.write(output)
-                    else:
-                        if self.conf.verbose:
-                            self.callback.log(_("empty serialize on write to" \
-                                                "%s in %s") % (outfile, pkg))
-                    outfile.write('\n')
+                raise MDError, "Unable to create worker path: %s" % thisdir
+            workercmdline.extend(worker_chunks[worker_num])
+            worker_cmd_dict[worker_num] = workercmdline
+
 
-                self.oldData.freeNodes(pkg)
-                #FIXME - if we're in update and we have deltas enabled
-                # check the presto data for this pkg and write its info back out
-                # to our deltafile
+        for (num, cmdline) in worker_cmd_dict.items():
+            self.callback.log("Spawning worker %s with %s pkgs" % (num, len(worker_chunks[num])))
+            job = subprocess.Popen(cmdline) # fixme - add stdout/stderr PIPES here,
+            worker_jobs.append(job)
+        
+        for job in worker_jobs:
+            # fixme - need 'communicate' to see about getting info back
+            os.waitpid(job.pid, 0)
+        
+        self.callback.log("Workers Finished")
+        # finished with workers
+        # go to their dirs and add the contents
+        self.callback.log("Gathering worker results")
+        for num in range(self.conf.workers):
+            for (fn, fo) in (('primary.xml', self.primaryfile), 
+                       ('filelists.xml', self.flfile),
+                       ('other.xml', self.otherfile)):
+                fnpath = worker_tmp_path + '/' + str(num) + '/' + fn
+                if os.path.exists(fnpath):
+                    fo.write(open(fnpath, 'r').read())
 
-            if not self.conf.quiet:
-                if self.conf.verbose:
-                    self.callback.log('%d/%d - %s' % (self.current_pkg,
-                                                      self.pkgcount, pkg))
-                else:
-                    self.callback.progress(pkg, self.current_pkg, self.pkgcount)
+
+        for pkgfile in pkgfiles:
+            if self.conf.deltas:
+                po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir)
+                self._do_delta_rpm_package(po)
+            self.read_pkgs.append(pkgfile)
 
         return self.current_pkg
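
One of the fixmes above is that workers currently inherit the parent's stdout/stderr rather than having them captured. A sketch of how that might look using subprocess.PIPE with communicate() - an assumption about a future fix, not what this commit does:

    import subprocess

    def spawn_and_gather(cmdlines):
        # start every worker first so they run concurrently
        jobs = [subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                for cmd in cmdlines]
        results = []
        for job in jobs:
            # communicate() drains both pipes to EOF and reaps the child;
            # bare waitpid() combined with PIPEs can deadlock once a child
            # fills its pipe buffer, which is why the fixme matters
            out, err = job.communicate()
            results.append((job.returncode, out, err))
        return results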
 
index 3fa077f7920ef6de7f5ca5453f97e2a370781a52..995c3b9b667fb7d5f1840e73722c08dc424d485f 100644 (file)
@@ -123,6 +123,15 @@ def encodefiletypelist(filetypelist):
         result += ftl[x]
     return result
 
+def split_list_into_equal_chunks(seq, num_chunks):
+    avg = len(seq) / float(num_chunks)
+    out = []
+    last = 0.0
+    while last < len(seq):
+        out.append(seq[int(last):int(last + avg)])
+        last += avg
+
+    return out
 
 
 class MDError(Exception):
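
A quick interpreter check of the new chunker (example mine, not from the commit). It always returns num_chunks slices whose lengths differ by at most one, and when num_chunks exceeds len(seq) some slices come back empty - i.e. a worker can legitimately be handed zero packages:

    >>> split_list_into_equal_chunks(range(10), 3)
    [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
    >>> split_list_into_equal_chunks(['a.rpm', 'b.rpm'], 4)
    [[], ['a.rpm'], [], ['b.rpm']]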
index acb5851aaac76ee027da1295f8e1196f14cb5adb..ac061961408f0eedc7c6c560d19993f066cdfd42 100644 (file)
@@ -26,10 +26,15 @@ import utils
 import tempfile
 
 class CreateRepoPackage(YumLocalPackage):
-    def __init__(self, ts, package, sumtype=None):
+    def __init__(self, ts, package, sumtype=None, external_data={}):
         YumLocalPackage.__init__(self, ts, package)
         if sumtype:
             self.checksum_type = sumtype
+        
+        if external_data:
+            for (key, val) in external_data.items():
+                setattr(self, key, val)
+                
 
     def _do_checksum(self):
         """return a checksum for a package:
@@ -44,7 +49,7 @@ class CreateRepoPackage(YumLocalPackage):
             return self._checksum
 
         # not using the cachedir
-        if not self._cachedir:
+        if not hasattr(self, '_cachedir') or not self._cachedir:
             self._checksum = misc.checksum(self.checksum_type, self.localpath)
             self._checksums = [(self.checksum_type, self._checksum, 1)]
             return self._checksum
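
The external_data dict is simply splatted onto the package object as instance attributes, which is why _do_checksum now guards with hasattr(): a package built by a worker only has _cachedir if the parent passed it in. The same pattern in isolation (class name illustrative):

    class ExternalDataObject(object):
        def __init__(self, external_data={}):
            # every key becomes an instance attribute, e.g. self._cachedir
            for key, val in external_data.items():
                setattr(self, key, val)

    obj = ExternalDataObject({'_cachedir': '/var/cache/createrepo'})
    assert obj._cachedir == '/var/cache/createrepo'
    assert not hasattr(ExternalDataObject(), '_cachedir')  # hence the hasattr() guard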
index 108c68f9c15e1d889b2a316348d2c0c3e29f3a47..8c981917297a446c1b3aaa74239275835b6301b6 100755 (executable)
@@ -119,6 +119,11 @@ def parse_args(args, conf):
     parser.add_option("--max-delta-rpm-size", default=100000000,
         dest='max_delta_rpm_size', type='int',
         help="max size of an rpm that to run deltarpm against (in bytes)")
+
+    parser.add_option("--workers", default=1,
+        dest='workers', type='int',
+        help="number of workers to spawn to read rpms")
+    
     (opts, argsleft) = parser.parse_args(args)
     if len(argsleft) > 1 and not opts.split:
         errorprint(_('Error: Only one directory allowed per run.'))
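
Assuming genpkgmetadata.py remains the script behind the createrepo command, the new option would be used like this (illustrative invocation):

    createrepo --workers=4 /srv/repo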
diff --git a/worker.py b/worker.py
new file mode 100755 (executable)
index 0000000..8c7e4c5
--- /dev/null
+++ b/worker.py
@@ -0,0 +1,76 @@
+#!/usr/bin/python -tt
+
+import sys
+import yum
+import createrepo
+import os
+import rpmUtils
+from optparse import OptionParser
+
+
+# pass in dir to make tempdirs in
+# make tempdir for this worker
+# create 3 files in that tempdir
+# return how many pkgs
+# return on stderr where things went to hell
+
+#TODO - take most of read_in_package from createrepo and duplicate it here
+# so we can do downloads, etc.
+# then replace callers of read_in_package with forked callers of this
+# and reassemble at the end
+
+def main(args):
+    parser = OptionParser()
+    parser.add_option('--tmpmdpath', default=None, 
+                help="path where the outputs should be dumped for this worker")
+    parser.add_option("--pkgoptions", default=[], action='append',
+                help="pkgoptions in the format of key=value")
+
+    
+    opts, pkgs = parser.parse_args(args)
+    external_data = {'_packagenumber': 0}
+    if not opts.tmpmdpath:
+        print >> sys.stderr, "tmpmdpath required for destination files"
+        sys.exit(1)
+    
+    for strs in opts.pkgoptions:
+        # split only on the first '=': values (e.g. a baseurl) may contain '='
+        k, v = strs.split('=', 1)
+        if v in ['True', 'true', 'yes', '1', 1]:
+            v = True
+        elif v in ['False', 'false', 'no', '0', 0]:
+            v = False
+        elif v in ['None', 'none', '']:
+            v = None
+        external_data[k] = v
+
+    
+    reldir = external_data['_reldir']
+    ts = rpmUtils.transaction.initReadOnlyTransaction()
+    pri = open(opts.tmpmdpath + '/primary.xml' , 'w')
+    fl = open(opts.tmpmdpath  + '/filelists.xml' , 'w')
+    other = open(opts.tmpmdpath  + '/other.xml' , 'w')
+    
+    for pkgfile in pkgs:
+        pkgpath = reldir + '/' + pkgfile
+        if not os.path.exists(pkgpath):
+            continue
+
+        try:
+            pkg = createrepo.yumbased.CreateRepoPackage(ts, package=pkgpath, 
+                                                        external_data=external_data)
+            pri.write(pkg.xml_dump_primary_metadata())
+            fl.write(pkg.xml_dump_filelists_metadata())
+            other.write(pkg.xml_dump_other_metadata())
+        except yum.Errors.YumBaseError, e:
+            print >> sys.stderr, "Error: %s" % e
+            continue
+        else:
+            external_data['_packagenumber'] += 1
+        
+    pri.close()
+    fl.close()
+    other.close()
+    print external_data['_packagenumber']
+    
+if __name__ == "__main__":
+    main(sys.argv[1:])
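
For reference, this is roughly the command line the parent builds for each worker; every path and package name below is illustrative. The worker writes primary.xml, filelists.xml and other.xml into --tmpmdpath and prints the number of packages it processed on stdout:

    ./worker.py --pkgoptions=_reldir=/srv/repo \
        --pkgoptions=_collapse_libc_requires=True \
        --pkgoptions=_cachedir=/var/cache/createrepo \
        --pkgoptions=_baseurl=None \
        --tmpmdpath=/tmp/worker-tmp/0 \
        foo-1.0-1.x86_64.rpm bar-2.3-1.noarch.rpm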