diff options
Diffstat (limited to 'tests/common.py')
-rw-r--r-- | tests/common.py | 206 |
1 files changed, 206 insertions, 0 deletions
diff --git a/tests/common.py b/tests/common.py new file mode 100644 index 0000000..f0fee8b --- /dev/null +++ b/tests/common.py @@ -0,0 +1,206 @@ +import unittest +import osc.core +import shutil +import tempfile +import os +import sys +from xml.etree import cElementTree as ET +EXPECTED_REQUESTS = [] + +if sys.version_info[0:2] in ((2, 6), (2, 7)): + bytes = lambda x, *args: x + +try: + #python 2.x + from cStringIO import StringIO + from urllib2 import HTTPHandler, addinfourl, build_opener + from urlparse import urlparse, parse_qs +except ImportError: + from io import StringIO + from urllib.request import HTTPHandler, addinfourl, build_opener + from urllib.parse import urlparse, parse_qs + +def urlcompare(url, *args): + """compare all components of url except query string - it is converted to + dict, therefor different ordering does not makes url's different, as well + as quoting of a query string""" + + components = urlparse(url) + query_args = parse_qs(components.query) + components = components._replace(query=None) + + if not args: + return False + + for url in args: + components2 = urlparse(url) + query_args2 = parse_qs(components2.query) + components2 = components2._replace(query=None) + + if components != components2 or \ + query_args != query_args2: + return False + + return True + +class RequestWrongOrder(Exception): + """raised if an unexpected request is issued to urllib2""" + def __init__(self, url, exp_url, method, exp_method): + Exception.__init__(self) + self.url = url + self.exp_url = exp_url + self.method = method + self.exp_method = exp_method + + def __str__(self): + return '%s, %s, %s, %s' % (self.url, self.exp_url, self.method, self.exp_method) + +class RequestDataMismatch(Exception): + """raised if POSTed or PUTed data doesn't match with the expected data""" + def __init__(self, url, got, exp): + self.url = url + self.got = got + self.exp = exp + + def __str__(self): + return '%s, %s, %s' % (self.url, self.got, self.exp) + +class MyHTTPHandler(HTTPHandler): + def __init__(self, exp_requests, fixtures_dir): + HTTPHandler.__init__(self) + self.__exp_requests = exp_requests + self.__fixtures_dir = fixtures_dir + + def http_open(self, req): + r = self.__exp_requests.pop(0) + if not urlcompare(req.get_full_url(), r[1]) or req.get_method() != r[0]: + raise RequestWrongOrder(req.get_full_url(), r[1], req.get_method(), r[0]) + if req.get_method() in ('GET', 'DELETE'): + return self.__mock_GET(r[1], **r[2]) + elif req.get_method() in ('PUT', 'POST'): + return self.__mock_PUT(req, **r[2]) + + def __mock_GET(self, fullurl, **kwargs): + return self.__get_response(fullurl, **kwargs) + + def __mock_PUT(self, req, **kwargs): + exp = kwargs.get('exp', None) + if exp is not None and 'expfile' in kwargs: + raise RuntimeError('either specify exp or expfile') + elif 'expfile' in kwargs: + exp = open(os.path.join(self.__fixtures_dir, kwargs['expfile']), 'r').read() + elif exp is None: + raise RuntimeError('exp or expfile required') + if exp is not None: + if req.get_data() != bytes(exp, "utf-8"): + raise RequestDataMismatch(req.get_full_url(), repr(req.get_data()), repr(exp)) + return self.__get_response(req.get_full_url(), **kwargs) + + def __get_response(self, url, **kwargs): + f = None + if 'exception' in kwargs: + raise kwargs['exception'] + if 'text' not in kwargs and 'file' in kwargs: + f = StringIO(open(os.path.join(self.__fixtures_dir, kwargs['file']), 'r').read()) + elif 'text' in kwargs and 'file' not in kwargs: + f = StringIO(kwargs['text']) + else: + raise RuntimeError('either specify text or file') + resp = addinfourl(f, {}, url) + resp.code = kwargs.get('code', 200) + resp.msg = '' + return resp + +def urldecorator(method, fullurl, **kwargs): + def decorate(test_method): + def wrapped_test_method(*args): + addExpectedRequest(method, fullurl, **kwargs) + test_method(*args) + # "rename" method otherwise we cannot specify a TestCaseClass.testName + # cmdline arg when using unittest.main() + wrapped_test_method.__name__ = test_method.__name__ + return wrapped_test_method + return decorate + +def GET(fullurl, **kwargs): + return urldecorator('GET', fullurl, **kwargs) + +def PUT(fullurl, **kwargs): + return urldecorator('PUT', fullurl, **kwargs) + +def POST(fullurl, **kwargs): + return urldecorator('POST', fullurl, **kwargs) + +def DELETE(fullurl, **kwargs): + return urldecorator('DELETE', fullurl, **kwargs) + +def addExpectedRequest(method, url, **kwargs): + global EXPECTED_REQUESTS + EXPECTED_REQUESTS.append((method, url, kwargs)) + +class OscTestCase(unittest.TestCase): + def setUp(self, copytree=True): + oscrc = os.path.join(self._get_fixtures_dir(), 'oscrc') + osc.core.conf.get_config(override_conffile=oscrc, + override_no_keyring=True, override_no_gnome_keyring=True) + os.environ['OSC_CONFIG'] = oscrc + + self.tmpdir = tempfile.mkdtemp(prefix='osc_test') + if copytree: + shutil.copytree(os.path.join(self._get_fixtures_dir(), 'osctest'), os.path.join(self.tmpdir, 'osctest')) + global EXPECTED_REQUESTS + EXPECTED_REQUESTS = [] + osc.core.conf._build_opener = lambda u: build_opener(MyHTTPHandler(EXPECTED_REQUESTS, self._get_fixtures_dir())) + self.stdout = sys.stdout + sys.stdout = StringIO() + + def tearDown(self): + self.assertTrue(len(EXPECTED_REQUESTS) == 0) + sys.stdout = self.stdout + try: + shutil.rmtree(self.tmpdir) + except: + pass + + def _get_fixtures_dir(self): + raise NotImplementedError('subclasses should implement this method') + + def _change_to_pkg(self, name): + os.chdir(os.path.join(self.tmpdir, 'osctest', name)) + + def _check_list(self, fname, exp): + fname = os.path.join('.osc', fname) + self.assertTrue(os.path.exists(fname)) + self.assertEqual(open(fname, 'r').read(), exp) + + def _check_addlist(self, exp): + self._check_list('_to_be_added', exp) + + def _check_deletelist(self, exp): + self._check_list('_to_be_deleted', exp) + + def _check_conflictlist(self, exp): + self._check_list('_in_conflict', exp) + + def _check_status(self, p, fname, exp): + self.assertEqual(p.status(fname), exp) + + def _check_digests(self, fname, *skipfiles): + fname = os.path.join(self._get_fixtures_dir(), fname) + self.assertEqual(open(os.path.join('.osc', '_files'), 'r').read(), open(fname, 'r').read()) + root = ET.parse(fname).getroot() + for i in root.findall('entry'): + if i.get('name') in skipfiles: + continue + self.assertTrue(os.path.exists(os.path.join('.osc', i.get('name')))) + self.assertEqual(osc.core.dgst(os.path.join('.osc', i.get('name'))), i.get('md5')) + + def assertEqualMultiline(self, got, exp): + if (got + exp).find('\n') == -1: + self.assertEqual(got, exp) + else: + start_delim = "\n" + (" 8< ".join(["-----"] * 8)) + "\n" + end_delim = "\n" + (" >8 ".join(["-----"] * 8)) + "\n\n" + self.assertEqual(got, exp, + "got:" + start_delim + got + end_delim + + "expected:" + start_delim + exp + end_delim) |