group: Implemented parallel package aggregating
authorEd Bartosh <eduard.bartosh@intel.com>
Thu, 12 Jun 2014 11:26:34 +0000 (14:26 +0300)
committerEd Bartosh <eduard.bartosh@intel.com>
Thu, 12 Jun 2014 11:26:34 +0000 (14:26 +0300)
This change should speed up group creation. Currently it's too slow,
especially for submissions with a lot of rebuilt packages.

Fixes: #1955
Change-Id: Id271e1bb30897bbb3807ac4221acef32d4d3eed1
Signed-off-by: Ed Bartosh <eduard.bartosh@intel.com>
repa/group.py
repa/obs.py

index 2778491075948670fd7e4ff0a82ae7e4395a8e38..bc61f4c15becb6b6d9bb650accbf134e1f13700f 100755 (executable)
@@ -35,6 +35,8 @@ import json
 
 from collections import defaultdict
 from StringIO import StringIO
+from multiprocessing.pool import ThreadPool
+from functools import partial
 
 from repa.main import sub_main
 from repa.obs import OBS
@@ -117,24 +119,41 @@ def create_group_project(obs, submissions, meta, comment):
     return name, str(project)
 
 
-def aggregate(obs, bresults, gproject):
+def aggregate(obs, bresults, gproject, processes):
     """Aggregate packages into group project."""
+    def notify(out, submission, result):
+        """Callback, called by apply_async."""
+        pkg  = result[1]
+        out.write('aggregated: %s/%s\n' % (submission, pkg))
     aggregated = set()
     obs.set_global_flag('publish', 'disable', gproject)
+    if processes > 1:
+        pool = ThreadPool(processes=processes)
     for subm, prj, results in bresults:
         for res in results.itervalues():
             for pkg, state in res['packages']:
                 if state == 'succeeded' and pkg not in aggregated:
-                    print 'aggregating %s/%s' % (subm, pkg)
-                    obs.aggregate_package(prj, pkg, gproject, pkg)
+                    if processes > 1:
+                        callback = partial(notify, sys.stdout, subm)
+                        pool.apply_async(obs.aggregate_package,
+                                         [prj, pkg, gproject, pkg],
+                                         callback=callback)
+                    else:
+                        obs.aggregate_package(prj, pkg, gproject, pkg)
+                        print 'aggregated %s/%s' % (subm, pkg)
                     aggregated.add(pkg)
 
+    if processes > 1:
+        pool.close()
+        pool.join()
+
     obs.set_global_flag('publish', 'enable', gproject)
 
     return aggregated
 
 
-def group_submissions(obs, submissions, target, comment, force=False):
+def group_submissions(obs, submissions, target, comment,
+                      force=False, processes=0):
     """Group multiple submissions into one group."""
     # find correspondent prerelease projects
     info = {}
@@ -162,7 +181,7 @@ def group_submissions(obs, submissions, target, comment, force=False):
                                           comment)
     print 'Created submit group %s\n' % name
 
-    aggregated = aggregate(obs, bresults, gproject)
+    aggregated = aggregate(obs, bresults, gproject, processes)
 
     print '\n%d submissions (%d packages) have been merged into %s' % \
           (len(info), len(aggregated), name)
@@ -175,10 +194,13 @@ class Group(object):
     help = description
 
     @staticmethod
-    def add_arguments(parser, _config):
+    def add_arguments(parser, config):
         """Adds arguments to the parser. Called from [sub_]main."""
         parser.add_argument('submission', nargs='+',
                            help='space separated list of submissions')
+        parser.add_argument('--processes', type=int,
+                            help='amount of parallel processes to use',
+                            default=config.get('processes'))
         parser.add_argument('-c', '--comment', help='comment', default='')
         parser.add_argument('-f', '--force', action='store_true',
                             help='force group creation')
@@ -188,7 +210,7 @@ class Group(object):
         """Command line entry point. Called from [sub_]main."""
         obs = OBS(argv.apiurl, argv.apiuser, argv.apipasswd)
         return group_submissions(obs, argv.submission, argv.project,
-                                 argv.comment, argv.force)
+                                 argv.comment, argv.force, argv.processes)
 
 
 if __name__ == '__main__':
index 3cf766386192e0e443770681f185d5a0d76a71b3..de055a1fc6be74ca007324259e8f8d0beb906a51 100644 (file)
@@ -155,6 +155,7 @@ class OBS(OSC):
                                                               repo.arch)
 
     @staticmethod
+    @retry((OSCError, HTTPError))
     def aggregate_package(src_project, src_package, dst_project,
                           dst_package):
         """Aggregate package. Wraps core.aggregate_pack."""
@@ -166,6 +167,7 @@ class OBS(OSC):
         finally:
             sys.stdout = saved
 
+        return src_project, src_package, dst_project, dst_package
 
     def create_sr(self, src_project, packages, tgt_project, message=''):
         """Create submit request for the project."""