summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEd Bartosh <eduard.bartosh@intel.com>2014-06-12 14:26:34 +0300
committerEd Bartosh <eduard.bartosh@intel.com>2014-06-12 14:26:34 +0300
commit352a3a0a298102c8566afdc6f028801b620f6362 (patch)
tree018e81117d5830da2917211620e2a4ec99364ac8
parentbb846affc10167c2af2a34eddf7e00e0761fc91b (diff)
downloadrepa-352a3a0a298102c8566afdc6f028801b620f6362.tar.gz
repa-352a3a0a298102c8566afdc6f028801b620f6362.tar.bz2
repa-352a3a0a298102c8566afdc6f028801b620f6362.zip
group: Implemented parallel package aggregating
This change should speed up group creation. Currently it's too slow, especially for submissions with a lot of rebuilt packages. Fixes: #1955 Change-Id: Id271e1bb30897bbb3807ac4221acef32d4d3eed1 Signed-off-by: Ed Bartosh <eduard.bartosh@intel.com>
-rwxr-xr-xrepa/group.py36
-rw-r--r--repa/obs.py2
2 files changed, 31 insertions, 7 deletions
diff --git a/repa/group.py b/repa/group.py
index 2778491..bc61f4c 100755
--- a/repa/group.py
+++ b/repa/group.py
@@ -35,6 +35,8 @@ import json
from collections import defaultdict
from StringIO import StringIO
+from multiprocessing.pool import ThreadPool
+from functools import partial
from repa.main import sub_main
from repa.obs import OBS
@@ -117,24 +119,41 @@ def create_group_project(obs, submissions, meta, comment):
return name, str(project)
-def aggregate(obs, bresults, gproject):
+def aggregate(obs, bresults, gproject, processes):
"""Aggregate packages into group project."""
+ def notify(out, submission, result):
+ """Callback, called by apply_async."""
+ pkg = result[1]
+ out.write('aggregated: %s/%s\n' % (submission, pkg))
aggregated = set()
obs.set_global_flag('publish', 'disable', gproject)
+ if processes > 1:
+ pool = ThreadPool(processes=processes)
for subm, prj, results in bresults:
for res in results.itervalues():
for pkg, state in res['packages']:
if state == 'succeeded' and pkg not in aggregated:
- print 'aggregating %s/%s' % (subm, pkg)
- obs.aggregate_package(prj, pkg, gproject, pkg)
+ if processes > 1:
+ callback = partial(notify, sys.stdout, subm)
+ pool.apply_async(obs.aggregate_package,
+ [prj, pkg, gproject, pkg],
+ callback=callback)
+ else:
+ obs.aggregate_package(prj, pkg, gproject, pkg)
+ print 'aggregated %s/%s' % (subm, pkg)
aggregated.add(pkg)
+ if processes > 1:
+ pool.close()
+ pool.join()
+
obs.set_global_flag('publish', 'enable', gproject)
return aggregated
-def group_submissions(obs, submissions, target, comment, force=False):
+def group_submissions(obs, submissions, target, comment,
+ force=False, processes=0):
"""Group multiple submissions into one group."""
# find correspondent prerelease projects
info = {}
@@ -162,7 +181,7 @@ def group_submissions(obs, submissions, target, comment, force=False):
comment)
print 'Created submit group %s\n' % name
- aggregated = aggregate(obs, bresults, gproject)
+ aggregated = aggregate(obs, bresults, gproject, processes)
print '\n%d submissions (%d packages) have been merged into %s' % \
(len(info), len(aggregated), name)
@@ -175,10 +194,13 @@ class Group(object):
help = description
@staticmethod
- def add_arguments(parser, _config):
+ def add_arguments(parser, config):
"""Adds arguments to the parser. Called from [sub_]main."""
parser.add_argument('submission', nargs='+',
help='space separated list of submissions')
+ parser.add_argument('--processes', type=int,
+ help='amount of parallel processes to use',
+ default=config.get('processes'))
parser.add_argument('-c', '--comment', help='comment', default='')
parser.add_argument('-f', '--force', action='store_true',
help='force group creation')
@@ -188,7 +210,7 @@ class Group(object):
"""Command line entry point. Called from [sub_]main."""
obs = OBS(argv.apiurl, argv.apiuser, argv.apipasswd)
return group_submissions(obs, argv.submission, argv.project,
- argv.comment, argv.force)
+ argv.comment, argv.force, argv.processes)
if __name__ == '__main__':
diff --git a/repa/obs.py b/repa/obs.py
index 3cf7663..de055a1 100644
--- a/repa/obs.py
+++ b/repa/obs.py
@@ -155,6 +155,7 @@ class OBS(OSC):
repo.arch)
@staticmethod
+ @retry((OSCError, HTTPError))
def aggregate_package(src_project, src_package, dst_project,
dst_package):
"""Aggregate package. Wraps core.aggregate_pack."""
@@ -166,6 +167,7 @@ class OBS(OSC):
finally:
sys.stdout = saved
+ return src_project, src_package, dst_project, dst_package
def create_sr(self, src_project, packages, tgt_project, message=''):
"""Create submit request for the project."""