from collections import defaultdict
from StringIO import StringIO
+from multiprocessing.pool import ThreadPool
+from functools import partial
from repa.main import sub_main
from repa.obs import OBS
return name, str(project)
-def aggregate(obs, bresults, gproject):
+def aggregate(obs, bresults, gproject, processes):
"""Aggregate packages into group project."""
+ def notify(out, submission, result):
+ """Callback, called by apply_async."""
+ pkg = result[1]
+ out.write('aggregated: %s/%s\n' % (submission, pkg))
aggregated = set()
obs.set_global_flag('publish', 'disable', gproject)
+ if processes > 1:
+ pool = ThreadPool(processes=processes)
for subm, prj, results in bresults:
for res in results.itervalues():
for pkg, state in res['packages']:
if state == 'succeeded' and pkg not in aggregated:
- print 'aggregating %s/%s' % (subm, pkg)
- obs.aggregate_package(prj, pkg, gproject, pkg)
+ if processes > 1:
+ callback = partial(notify, sys.stdout, subm)
+ pool.apply_async(obs.aggregate_package,
+ [prj, pkg, gproject, pkg],
+ callback=callback)
+ else:
+ obs.aggregate_package(prj, pkg, gproject, pkg)
+ print 'aggregated %s/%s' % (subm, pkg)
aggregated.add(pkg)
+ if processes > 1:
+ pool.close()
+ pool.join()
+
obs.set_global_flag('publish', 'enable', gproject)
return aggregated
-def group_submissions(obs, submissions, target, comment, force=False):
+def group_submissions(obs, submissions, target, comment,
+ force=False, processes=0):
"""Group multiple submissions into one group."""
# find correspondent prerelease projects
info = {}
comment)
print 'Created submit group %s\n' % name
- aggregated = aggregate(obs, bresults, gproject)
+ aggregated = aggregate(obs, bresults, gproject, processes)
print '\n%d submissions (%d packages) have been merged into %s' % \
(len(info), len(aggregated), name)
help = description
@staticmethod
- def add_arguments(parser, _config):
+ def add_arguments(parser, config):
"""Adds arguments to the parser. Called from [sub_]main."""
parser.add_argument('submission', nargs='+',
help='space separated list of submissions')
+ parser.add_argument('--processes', type=int,
+ help='amount of parallel processes to use',
+ default=config.get('processes'))
parser.add_argument('-c', '--comment', help='comment', default='')
parser.add_argument('-f', '--force', action='store_true',
help='force group creation')
"""Command line entry point. Called from [sub_]main."""
obs = OBS(argv.apiurl, argv.apiuser, argv.apipasswd)
return group_submissions(obs, argv.submission, argv.project,
- argv.comment, argv.force)
+ argv.comment, argv.force, argv.processes)
if __name__ == '__main__':