diff options
author | Ed Bartosh <eduard.bartosh@intel.com> | 2014-06-12 14:26:34 +0300 |
---|---|---|
committer | Ed Bartosh <eduard.bartosh@intel.com> | 2014-06-12 14:26:34 +0300 |
commit | 352a3a0a298102c8566afdc6f028801b620f6362 (patch) | |
tree | 018e81117d5830da2917211620e2a4ec99364ac8 | |
parent | bb846affc10167c2af2a34eddf7e00e0761fc91b (diff) | |
download | repa-352a3a0a298102c8566afdc6f028801b620f6362.tar.gz repa-352a3a0a298102c8566afdc6f028801b620f6362.tar.bz2 repa-352a3a0a298102c8566afdc6f028801b620f6362.zip |
group: Implemented parallel package aggregating
This change should speed up group creation. Currently it's too slow,
especially for submissions with a lot of rebuilt packages.
Fixes: #1955
Change-Id: Id271e1bb30897bbb3807ac4221acef32d4d3eed1
Signed-off-by: Ed Bartosh <eduard.bartosh@intel.com>
-rwxr-xr-x | repa/group.py | 36 | ||||
-rw-r--r-- | repa/obs.py | 2 |
2 files changed, 31 insertions, 7 deletions
diff --git a/repa/group.py b/repa/group.py index 2778491..bc61f4c 100755 --- a/repa/group.py +++ b/repa/group.py @@ -35,6 +35,8 @@ import json from collections import defaultdict from StringIO import StringIO +from multiprocessing.pool import ThreadPool +from functools import partial from repa.main import sub_main from repa.obs import OBS @@ -117,24 +119,41 @@ def create_group_project(obs, submissions, meta, comment): return name, str(project) -def aggregate(obs, bresults, gproject): +def aggregate(obs, bresults, gproject, processes): """Aggregate packages into group project.""" + def notify(out, submission, result): + """Callback, called by apply_async.""" + pkg = result[1] + out.write('aggregated: %s/%s\n' % (submission, pkg)) aggregated = set() obs.set_global_flag('publish', 'disable', gproject) + if processes > 1: + pool = ThreadPool(processes=processes) for subm, prj, results in bresults: for res in results.itervalues(): for pkg, state in res['packages']: if state == 'succeeded' and pkg not in aggregated: - print 'aggregating %s/%s' % (subm, pkg) - obs.aggregate_package(prj, pkg, gproject, pkg) + if processes > 1: + callback = partial(notify, sys.stdout, subm) + pool.apply_async(obs.aggregate_package, + [prj, pkg, gproject, pkg], + callback=callback) + else: + obs.aggregate_package(prj, pkg, gproject, pkg) + print 'aggregated %s/%s' % (subm, pkg) aggregated.add(pkg) + if processes > 1: + pool.close() + pool.join() + obs.set_global_flag('publish', 'enable', gproject) return aggregated -def group_submissions(obs, submissions, target, comment, force=False): +def group_submissions(obs, submissions, target, comment, + force=False, processes=0): """Group multiple submissions into one group.""" # find correspondent prerelease projects info = {} @@ -162,7 +181,7 @@ def group_submissions(obs, submissions, target, comment, force=False): comment) print 'Created submit group %s\n' % name - aggregated = aggregate(obs, bresults, gproject) + aggregated = aggregate(obs, bresults, gproject, processes) print '\n%d submissions (%d packages) have been merged into %s' % \ (len(info), len(aggregated), name) @@ -175,10 +194,13 @@ class Group(object): help = description @staticmethod - def add_arguments(parser, _config): + def add_arguments(parser, config): """Adds arguments to the parser. Called from [sub_]main.""" parser.add_argument('submission', nargs='+', help='space separated list of submissions') + parser.add_argument('--processes', type=int, + help='amount of parallel processes to use', + default=config.get('processes')) parser.add_argument('-c', '--comment', help='comment', default='') parser.add_argument('-f', '--force', action='store_true', help='force group creation') @@ -188,7 +210,7 @@ class Group(object): """Command line entry point. Called from [sub_]main.""" obs = OBS(argv.apiurl, argv.apiuser, argv.apipasswd) return group_submissions(obs, argv.submission, argv.project, - argv.comment, argv.force) + argv.comment, argv.force, argv.processes) if __name__ == '__main__': diff --git a/repa/obs.py b/repa/obs.py index 3cf7663..de055a1 100644 --- a/repa/obs.py +++ b/repa/obs.py @@ -155,6 +155,7 @@ class OBS(OSC): repo.arch) @staticmethod + @retry((OSCError, HTTPError)) def aggregate_package(src_project, src_package, dst_project, dst_package): """Aggregate package. Wraps core.aggregate_pack.""" @@ -166,6 +167,7 @@ class OBS(OSC): finally: sys.stdout = saved + return src_project, src_package, dst_project, dst_package def create_sr(self, src_project, packages, tgt_project, message=''): """Create submit request for the project.""" |