From e4c674f2851f2ca71f744e86972e1c3acaf1e8f2 Mon Sep 17 00:00:00 2001 From: Seth Vidal Date: Fri, 10 Sep 2010 16:06:17 -0400 Subject: [PATCH] - make sure we handle remote_url pkgs correctly until we get the worker hooked up to handle them - if there are no pkgs to handle, don't launch workers with nothing to do. - give better output from the workers and have them obey -v/-q - everyone loves callbacks! --- createrepo/__init__.py | 141 +++++++++++++++++++++++++---------------- worker.py | 29 ++++++++- 2 files changed, 111 insertions(+), 59 deletions(-) diff --git a/createrepo/__init__.py b/createrepo/__init__.py index b2937c9..a2ae8b0 100644 --- a/createrepo/__init__.py +++ b/createrepo/__init__.py @@ -580,77 +580,106 @@ class MetaDataGenerator: # filter out those pkgs which are not files - but are pkgobjects pkgfiles = [] for pkg in newpkgs: + po = None if isinstance(pkg, YumAvailablePackage): po = pkg self.read_pkgs.append(po.localpath) + + # if we're dealing with remote pkgs - pitch it over to doing + # them one at a time, for now. + elif pkg.find('://') != -1: + po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir) + self.read_pkgs.append(pkg) + + if po: self.primaryfile.write(po.xml_dump_primary_metadata()) self.flfile.write(po.xml_dump_filelists_metadata()) self.otherfile.write(po.xml_dump_other_metadata( clog_limit=self.conf.changelog_limit)) continue - pkgfiles.append(pkg) - # divide that list by the number of workers and fork off that many - - # workers to tmpdirs - # waitfor the workers to finish and as each one comes in - # open the files they created and write them out to our metadata - # add up the total pkg counts and return that value - worker_tmp_path = tempfile.mkdtemp() - worker_chunks = utils.split_list_into_equal_chunks(pkgfiles, self.conf.workers) - worker_cmd_dict = {} - worker_jobs = [] - base_worker_cmdline = [self.conf.worker_cmd, - '--pkgoptions=_reldir=%s' % reldir, - '--pkgoptions=_collapse_libc_requires=%s' % self.conf.collapse_glibc_requires, - '--pkgoptions=_cachedir=%s' % self.conf.cachedir, - '--pkgoptions=_baseurl=%s' % self.conf.baseurl,] - - - for worker_num in range(self.conf.workers): - # make the worker directory - workercmdline = [] - workercmdline.extend(base_worker_cmdline) - thisdir = worker_tmp_path + '/' + str(worker_num) - if checkAndMakeDir(thisdir): - workercmdline.append('--tmpmdpath=%s' % thisdir) - else: - raise MDError, "Unable to create worker path: %s" % thisdir - workercmdline.extend(worker_chunks[worker_num]) - worker_cmd_dict[worker_num] = workercmdline - + if pkgfiles: + # divide that list by the number of workers and fork off that many + # workers to tmpdirs + # waitfor the workers to finish and as each one comes in + # open the files they created and write them out to our metadata + # add up the total pkg counts and return that value + worker_tmp_path = tempfile.mkdtemp() + worker_chunks = utils.split_list_into_equal_chunks(pkgfiles, self.conf.workers) + worker_cmd_dict = {} + worker_jobs = {} + base_worker_cmdline = [self.conf.worker_cmd, + '--pkgoptions=_reldir=%s' % reldir, + '--pkgoptions=_collapse_libc_requires=%s' % self.conf.collapse_glibc_requires, + '--pkgoptions=_cachedir=%s' % self.conf.cachedir, + '--pkgoptions=_baseurl=%s' % self.conf.baseurl, + '--globalopts=clog_limit=%s' % self.conf.changelog_limit,] + if self.conf.quiet: + base_worker_cmdline.append('--quiet') + + if self.conf.verbose: + base_worker_cmdline.append('--verbose') + + for worker_num in range(self.conf.workers): + # make the worker directory + workercmdline = [] + workercmdline.extend(base_worker_cmdline) + thisdir = worker_tmp_path + '/' + str(worker_num) + if checkAndMakeDir(thisdir): + workercmdline.append('--tmpmdpath=%s' % thisdir) + else: + raise MDError, "Unable to create worker path: %s" % thisdir + workercmdline.extend(worker_chunks[worker_num]) + worker_cmd_dict[worker_num] = workercmdline + + - for (num, cmdline) in worker_cmd_dict.items(): - self.callback.log("Spawning worker %s with %s pkgs" % (num, len(worker_chunks[num]))) - job = subprocess.Popen(cmdline) # fixme - add stdout/stderr PIPES here, - worker_jobs.append(job) - - for job in worker_jobs: - # fixme - need 'communicate' to see about getting info back - os.waitpid(job.pid, 0) - - self.callback.log("Workers Finished") - # finished with workers - # go to their dirs and add the contents - self.callback.log("Gathering worker results") - for num in range(self.conf.workers): - for (fn, fo) in (('primary.xml', self.primaryfile), - ('filelists.xml', self.flfile), - ('other.xml', self.otherfile)): - fnpath = worker_tmp_path + '/' + str(num) + '/' + fn - if os.path.exists(fnpath): - fo.write(open(fnpath, 'r').read()) - + for (num, cmdline) in worker_cmd_dict.items(): + if not self.conf.quiet: + self.callback.log("Spawning worker %s with %s pkgs" % (num, + len(worker_chunks[num]))) + job = subprocess.Popen(cmdline, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + worker_jobs[num] = job + + gimmebreak = 0 + while gimmebreak != len(worker_jobs.keys()): + gimmebreak = 0 + for (num,job) in worker_jobs.items(): + if job.poll() is not None: + gimmebreak+=1 + line = job.stdout.readline() + if line: + self.callback.log('Worker %s: %s' % (num, line.rstrip())) + line = job.stderr.readline() + if line: + self.callback.errorlog('Worker %s: %s' % (num, line.rstrip())) + - for pkgfile in pkgfiles: - if self.conf.deltas: - po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir) - self._do_delta_rpm_package(po) - self.read_pkgs.append(pkgfile) + if not self.conf.quiet: + self.callback.log("Workers Finished") + # finished with workers + # go to their dirs and add the contents + if not self.conf.quiet: + self.callback.log("Gathering worker results") + for num in range(self.conf.workers): + for (fn, fo) in (('primary.xml', self.primaryfile), + ('filelists.xml', self.flfile), + ('other.xml', self.otherfile)): + fnpath = worker_tmp_path + '/' + str(num) + '/' + fn + if os.path.exists(fnpath): + fo.write(open(fnpath, 'r').read()) + + + for pkgfile in pkgfiles: + if self.conf.deltas: + po = self.read_in_package(pkgfile, pkgpath=pkgpath, reldir=reldir) + self._do_delta_rpm_package(po) + self.read_pkgs.append(pkgfile) return self.current_pkg diff --git a/worker.py b/worker.py index 8c7e4c5..eb35ef7 100755 --- a/worker.py +++ b/worker.py @@ -25,14 +25,22 @@ def main(args): help="path where the outputs should be dumped for this worker") parser.add_option("--pkgoptions", default=[], action='append', help="pkgoptions in the format of key=value") + parser.add_option("--quiet", default=False, action='store_true', + help="only output errors and a total") + parser.add_option("--verbose", default=False, action='store_true', + help="output errors and a total") + parser.add_option("--globalopts", default=[], action='append', + help="general options in the format of key=value") opts, pkgs = parser.parse_args(args) - external_data = {'_packagenumber': 0} + external_data = {'_packagenumber': 1} + globalopts = {} if not opts.tmpmdpath: print >> sys.stderr, "tmpmdpath required for destination files" sys.exit(1) + for strs in opts.pkgoptions: k,v = strs.split('=') if v in ['True', 'true', 'yes', '1', 1]: @@ -43,6 +51,16 @@ def main(args): v = None external_data[k] = v + for strs in opts.globalopts: + k,v = strs.split('=') + if v in ['True', 'true', 'yes', '1', 1]: + v = True + elif v in ['False', 'false', 'no', '0', 0]: + v = False + elif v in ['None', 'none', '']: + v = None + globalopts[k] = v + reldir = external_data['_reldir'] ts = rpmUtils.transaction.initReadOnlyTransaction() @@ -50,17 +68,23 @@ def main(args): fl = open(opts.tmpmdpath + '/filelists.xml' , 'w') other = open(opts.tmpmdpath + '/other.xml' , 'w') + for pkgfile in pkgs: pkgpath = reldir + '/' + pkgfile if not os.path.exists(pkgpath): + print >> sys.stderr, "File not found: %s" % pkgpath continue try: + if not opts.quiet and opts.verbose: + print "reading %s" % (pkgfile) + pkg = createrepo.yumbased.CreateRepoPackage(ts, package=pkgpath, external_data=external_data) pri.write(pkg.xml_dump_primary_metadata()) fl.write(pkg.xml_dump_filelists_metadata()) - other.write(pkg.xml_dump_other_metadata()) + other.write(pkg.xml_dump_other_metadata(clog_limit= + globalopts.get('clog_limit', None))) except yum.Errors.YumBaseError, e: print >> sys.stderr, "Error: %s" % e continue @@ -70,7 +94,6 @@ def main(args): pri.close() fl.close() other.close() - print external_data['_packagenumber'] if __name__ == "__main__": main(sys.argv[1:]) -- 2.34.1