#!/usr/bin/env @PYTHON@

import argparse
import errno
import locale
import logging
import multiprocessing
import os
import re
import shutil
import signal
import subprocess
import sys
import textwrap as _textwrap
from dataclasses import dataclass
from datetime import datetime
from functools import cmp_to_key
from pathlib import Path

# Will be replaced by the real path at the generation
# stage. Can be relative or absolute.
TOP_SRCDIR = '@top_srcdir@'

# This contains current root of the build directory
# and will be populated at the generation stage.
TOP_BUILDIR = '@top_builddir@'

# Prepare static patterns once at start.
RE2C_PATTERN = re.compile(r'.*re2c[ \t]*(.*)$')
TYPE_HEADER_PATTERN1 = re.compile(r'--header[ \t]*')
TYPE_HEADER_PATTERN2 = re.compile(r'.*--header[ \t]*([^ \t]*).*')
FILEPATH = b'^[a-z0-9_]+([\\\\/]+[a-z0-9_]+)+([.][a-z]+)+'
START_OF_LINEDIR = b'^(//|#)line '
FILEPATH_CONTEXT_PATTERN = re.compile(b'(' + START_OF_LINEDIR + b'|' + FILEPATH + b')')


@dataclass
class Context:
    """Per-process test settings.

    The single module-level instance `_ctx` is filled in by `init_context`
    and read by `run_test` and `clean_test_tree`.
    """
    # Programs (an empty string means "not used"; empty entries are
    # filtered out when a command line is assembled)
    re2c: str = ''
    diff: str = ''
    wine: str = ''
    valgrind: str = ''
    valgrind_opts: list = None
    diff_opts: list = None
    # Flags
    skeleton: bool = False
    keep_temp_files: bool = False
    verbose: bool = False
    # Include paths (a flat list of '-I', '<dir>' pairs)
    incpaths: list = None
    # Internal state
    base_path: str = ''
    reporter: logging.Logger = None


class LineBreaksFormatter(argparse.HelpFormatter):
    """
    ArgParse help formatter that allows explicit line breaks in free text.

    Normally, to include newlines in the help output of argparse, you have
    to use argparse.RawDescriptionHelpFormatter. However, this means raw text
    is enabled everywhere, and not just for the specific entries where we may
    need it.

    This formatter instead splits the text it is asked to fill on the marker
    '|n ' (a pipe, the letter 'n' and a space) and wraps every piece as a
    separate paragraph, so a line break appears exactly where the marker was.
    """

    def _fill_text(self, text, width, indent) -> str:
        """Wrap each '|n '-separated paragraph of `text` to `width` columns.

        Every paragraph is prefixed with `indent` and terminated by a
        newline; the wrapped paragraphs are returned as one string.
        """
        return ''.join(
            _textwrap.fill(
                paragraph,
                width,
                initial_indent=indent,
                subsequent_indent=indent,
            ) + '\n'
            for paragraph in text.split('|n ')
        )


def parser_add_options(parser):
    """Add options group to a parser."""
    ogroup = parser.add_argument_group('Available options')

    ogroup.add_argument(
        '--help',
        action='help',
        default=argparse.SUPPRESS,
        help='Print this help message and quit'
    )

    ogroup.add_argument(
        '--valgrind',
        dest='valgrind',
        action='store_true',
        help=f'Run .{os.path.sep}re2c under valgrind'
    )

    ogroup.add_argument(
        '--verbose',
        dest='verbose',
        action='store_true',
        help='Report successful test passes.'
    )

    ogroup.add_argument(
        '--skeleton',
        dest='skeleton',
        action='store_true',
        help='Run skeleton validation for sources that support it'
    )

    ogroup.add_argument(
        '--keep-temp-files',
        dest='keep_temp_files',
        action='store_true',
        help="Don't delete temporary files after test run"
    )

    ogroup.add_argument(
        '--wine',
        dest='wine',
        action='store_true',
        help=f'Run .{os.path.sep}re2c.exe under wine'
    )

    # With nargs='?' a bare '-j' (no value) yields None; main() turns that
    # into a single job.
    ogroup.add_argument(
        '-j',
        dest='jobs',
        default=multiprocessing.cpu_count(),
        type=int,
        action='store',
        nargs='?',
        help='Override CPU autodetection and use specified value'
    )


def parser_add_positionals(parser):
    """Add positional parameters group to a parser."""
    pgroup = parser.add_argument_group('Positional parameters')

    pgroup.add_argument(
        'tests',
        default=[],
        type=str,
        nargs='*',
        help='A space-separated list of tests'
    )


def parse_args():
    """Initialize command line arguments parser and parse CLI arguments."""
    parser = argparse.ArgumentParser(
        # The argparse module does not provide any option to add a "prolog".
        # When the help is displayed it always starts with "usage:".
        # A dirty workaround that might work is to start the usage message
        # with a "\r".
        usage=('\rSynopsis: re2c test script\n\n'
               'Usage: %(prog)s [options] [[--] tests]'),
        formatter_class=LineBreaksFormatter,
        add_help=False,
        # This is where the LineBreaksFormatter is used.
        epilog=('Usage examples:'
                '|n '
                ' - run tests in parallel'
                '|n '
                f' $ .{os.path.sep}%(prog)s'
                ),
    )

    parser_add_options(parser)
    parser_add_positionals(parser)

    return parser.parse_args()


def remove(*args, **kwargs):
    """
    Delete files which may not exist.

    Every positional argument is a file name. The keyword argument
    'ignore_errors' defaults to True; in that mode empty names are skipped
    and file-not-found errors are swallowed. If 'ignore_errors' is set to
    False, file-not-found errors are raised. Any other OSError is always
    raised.
    """
    ignore_errors = kwargs.get('ignore_errors', True)

    for filename in args:
        # Special case for empty filename
        if not filename and ignore_errors:
            continue
        try:
            os.remove(filename)
        except OSError as e:
            # Raise all errors other than "the file was not found"
            if e.errno != errno.ENOENT or not ignore_errors:
                raise


def here(*paths):
    """Join the given path parts and return the result relative to the
    current working directory."""
    return os.path.relpath(os.path.join(*paths))


def abshere(*paths):
    """Join the given path parts and return the result as an absolute path."""
    return os.path.abspath(here(*paths))


def patch_newlines_and_slashes(file_path, stem):
    """Convert CRLF to LF. Reverse slashes in path separators.

    The file at `file_path` is rewritten in place. Escaped backslashes are
    turned into forward slashes on lines that start with a line directive or
    with something that looks like a file path; on the remaining lines only
    occurrences of `stem` (the test path without extension) are patched.
    """
    crlf = b'\r\n'
    lf = b'\n'

    # Escape backslashes in stem before replacing, as the generated file
    # contains paths with escaped backslashes.
    stem = stem.replace('\\', '\\\\')
    stem1 = stem.encode('utf-8')
    stem2 = stem1.replace(b'\\\\', b'/')

    # Collect the patched lines and join them once at the end instead of
    # growing a bytes object line by line.
    patched = []
    with open(file_path, 'rb') as file:
        for line in file:
            # Replace backslashes in filenames with forward slashes.
            if FILEPATH_CONTEXT_PATTERN.match(line):
                line = line.replace(b'\\\\', b'/')
            elif stem1 != stem2:
                # Stems may be equal for tests in the toplevel directory
                # that don't have subdirectories in the file path.
                line = line.replace(stem1, stem2)
            # Replace CR LF with LF.
            patched.append(line.replace(crlf, lf))

    with open(file_path, 'wb') as file:
        file.write(b''.join(patched))


def create_tests_tree():
    """Create an empty, timestamp-named test directory and return its
    path (relative to the current working directory)."""
    test_blddir = here(datetime.now().strftime("test_%y%m%d%H%M%S"))
    shutil.rmtree(test_blddir, ignore_errors=True)
    os.makedirs(test_blddir)
    return test_blddir


# We cannot use `shutil.copytree` because we need to copy multiple directories
# into one, and that is problematic for two reasons:
#   1) it requires `dirs_exist_ok` parameter which was added only in python-3.8
#   2) `make distcheck` fails when trying to copy the second directory because
#      it makes source files read-only
def copy_dir(src, dst):
    """Copy the contents of directory `src` to directory `dst`, preserving
    directory structure.

    `src` must not end with a path separator: the part of every walked path
    after `src` and one separator is what gets appended to `dst`.
    """
    for path, _dirs, files in os.walk(src, topdown=False):
        dstpath = os.path.join(dst, path[len(src)+1:])
        os.makedirs(dstpath, exist_ok=True)
        for f in files:
            shutil.copy(os.path.join(path, f), dstpath)


def copy_tests(tests, dst):
    # type: (list, str) -> None
    """Copy the test structure to 'dst'.

    Without explicit `tests` the whole 'test' and 'examples' directories of
    the source tree are copied. Otherwise every listed file is copied flat
    into `dst`; for a '.re' file all files below the current directory that
    share its base name (any extension) are copied along with it.

    Raises RuntimeError if a listed test file does not exist.
    """
    # preserve directory structure unless given explicit args
    if not tests:
        copy_dir(here(TOP_SRCDIR, 'test'), dst)
        copy_dir(here(TOP_SRCDIR, 'examples'), dst)
        return

    files = []
    for f in tests:
        if not os.path.isfile(f):
            raise RuntimeError(f'Test file "{f}" not found')

        if f.endswith('.re'):
            file_mask = os.path.basename(f'{os.path.splitext(f)[0]}.*')
            files.extend(str(t) for t in Path('.').rglob(file_mask))
        else:
            files.append(f)

    for f in files:
        shutil.copy(f, dst)


def clean_test_tree(path):
    # type: (str) -> None
    """Clean the test tree from the not needed files.

    Only files with the extensions listed below are kept. In addition, all
    'debug' subdirectories are removed unless the output of 're2c --version'
    mentions 'debug'.
    """
    files = {str(f) for f in Path(path).rglob('*.*')}
    for ext in ['re', 'c', 'h', 'go', 'rs', 'inc']:
        files -= {str(f) for f in Path(path).rglob(f'*.{ext}')}
    remove(*[f for f in files if os.path.isfile(f)])

    # stderr is captured only to keep it off the console.
    process = subprocess.run(
        list(filter(None, [_ctx.wine, _ctx.re2c, '--version'])),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    out = process.stdout.decode('utf-8').rstrip()

    # If not a debug build, remove all debug subdirs
    if 'debug' not in out:
        for d in [str(d) for d in Path(path).rglob('debug')]:
            if os.path.isdir(d):
                shutil.rmtree(d, ignore_errors=True)


def filter_tests(path):
    # type: (str) -> list
    """Collect all '.re' files below `path`, sorted in ascending order."""
    return sorted(str(f) for f in Path(path).rglob('*.re'))


def _run_re2c(switches, output_path):
    """Run re2c (optionally under valgrind and/or wine) with `switches`,
    writing its stdout and stderr to `output_path`; return the exit code."""
    args = [_ctx.valgrind] + _ctx.valgrind_opts
    args += [_ctx.wine, _ctx.re2c] + _ctx.incpaths + switches.split()
    with open(output_path, mode='wb') as process_out:
        # subprocess.call blocks until the process terminates
        return subprocess.call(
            list(filter(None, args)),
            stdout=process_out,
            stderr=process_out,
        )


def run_test(test):
    """Run a single test.

    `test` is the path of a '.re' file that starts with the name of the test
    directory. The re2c command line is taken from the first line of that
    file. The function changes the working directory to the test directory.

    Returns a tuple (ran_tests, soft_errors, hard_errors) of counters.
    """
    # counters
    ran_tests = 0
    hard_errors = 0
    soft_errors = 0

    # Set locale to use in sort functions below
    locale.setlocale(locale.LC_ALL, 'C')

    # remove prefix
    outx = test[len(os.path.basename(_ctx.base_path))+1:]

    os.chdir(_ctx.base_path)

    with open(outx, mode='rb') as f:
        head = f.readline().decode('utf-8').strip()

    # generate file extension (.c for C/C++, .go for Go, .rs for Rust)
    if head.startswith("// re2c"):
        ext = "c"
    elif head.startswith("//go:generate re2go"):
        ext = "go"
    elif head.startswith("// re2rust"):
        ext = "rs"
    else:
        ext = "??"

    stem = os.path.splitext(outx)[0]
    outy = f'{stem}.{ext}'

    # Prepare re2c flags
    switches = head
    switches = switches.replace('re2go', 're2c --lang go')
    switches = switches.replace('re2rust', 're2c --lang rust')
    switches = RE2C_PATTERN.sub(r'\1', switches)
    switches = switches.replace('$INPUT', outx)
    switches = switches.replace('$OUTPUT', outy)
    switches = switches.replace('$DEPFILE', f'{outy}.d')
    switches = TYPE_HEADER_PATTERN1.sub(
        # If outx contains Windows directory separator '\'
        # it will be interpreted as a regex escape command
        # and we'll get an error like
        #   re.error: bad escape \c at position 16
        # Therefore, we need to tilt the slashes in the opposite
        # direction :)
        f'--header {Path(outx).parent.as_posix()}/',
        switches
    )

    # enable warnings globally
    switches = f'{switches} -W --no-version --no-generation-date'

    # normal tests
    if not _ctx.skeleton:
        # copy expected result
        orig = f'{stem}.orig.{ext}'
        try:
            shutil.move(outy, orig)
        except FileNotFoundError:
            # Do not fail immediately, run tests to the end and produce results.
            # This helps when adding new tests: one can add .re files, run the
            # tests, manually verify the generated results and copy them.
            pass

        # if options contain a header, create directory structure for it
        header = ''
        if '--header' in switches:
            header = TYPE_HEADER_PATTERN2.sub(r'\1', switches)
        if header:
            os.makedirs(os.path.dirname(header), exist_ok=True)

        # Disable Wine warnings, or else they are dumped into .stderr files for
        # random tests, so the test results don't match and the tests fail.
        if _ctx.wine:
            os.environ['WINEDEBUG'] = '-all'

        # run re2c (the exit code is not used here: the outcome is decided
        # by comparing the output with the expected result)
        _run_re2c(switches, f'{outy}.stderr')

        # paste all files created by re2c into output file
        # (this includes stderr and optionally header and skeleton files)
        files = [header] if header else []
        files += [str(f) for f in Path('.').glob(f'{outy}.*')]

        # An ugly hack to put '*.stderr' files at the end of list
        # but keep the list sorted using 'locale.strcoll' collate
        files = sorted(files, key=cmp_to_key(locale.strcoll))
        f1 = [i for i in files if not str(i).endswith('.stderr')]
        f2 = [i for i in files if str(i).endswith('.stderr')]
        files = f1 + f2

        with open(outy, mode='ab') as fo:
            for f in files:
                try:
                    with open(f, mode='rb') as fi:
                        fo.write(fi.read())
                except OSError:
                    pass  # if there is an error, some files may be missing

        # On windows we need to patch output to match expected test results.
        if os.name == 'nt' or _ctx.wine:
            patch_newlines_and_slashes(outy, stem)

        # compare results
        status = '' if os.path.isfile(orig) else 'MISSING'
        if not status:
            with open(f'{outy}.diff', mode='wb') as process_out:
                args = [_ctx.diff] + _ctx.diff_opts + [orig, outy]
                # subprocess.call blocks until the process terminates
                returncode = subprocess.call(
                    list(filter(None, args)),
                    stdout=process_out,
                    stderr=process_out
                )
            status = 'FAIL' if returncode != 0 else 'OK'

        if status != 'OK' or _ctx.verbose:
            print('{:<10s} {:s}'.format(status, outx))

        ran_tests += 1

        # cleanup
        if status == 'OK':
            if not _ctx.keep_temp_files:
                all_files = [header, outy, orig, outx]
                all_files += [str(f) for f in Path('.').glob(f'{outy}.*')]
                remove(*all_files)
        else:
            hard_errors += 1

    # skeleton tests (exclude non-C tests and tests that output help message)
    elif ext == 'c' and '--help' not in switches:
        # cleanup temporary files
        remove(outy)

        switches = f'{switches} --skeleton -Werror-undefined-control-flow'

        # run re2c
        status = _run_re2c(switches, f'{outy}.stderr')

        # Use plain 'cc' instead of @-substitution, because CMake and
        # Autoconf use different variables (CMAKE_C_COMPILER and CC)
        if status == 0:
            with open(f'{outy}.stderr', mode='ab') as process_out:
                returncode = subprocess.call(
                    ['cc', '-Wall', '-Wextra', '-o', f'{outy}.out', outy],
                    stdout=subprocess.DEVNULL,
                    stderr=process_out,
                )
            if returncode != 0:
                status = 2

        # run the compiled skeleton program
        if status == 0:
            with open(f'{outy}.stderr', mode='ab') as process_out:
                returncode = subprocess.call(
                    [f'./{outy}.out'],
                    stdout=subprocess.DEVNULL,
                    stderr=process_out,
                )
            if returncode != 0:
                status = 3

        status_map = {
            0: 'OK',
            1: 'OK (expected re2c error)',
            2: 'FAIL (compilation error)',
            3: 'FAIL (runtime error)',
        }
        msg = status_map.get(status, 'FAIL (unknown error)')

        if status not in [0, 1] or _ctx.verbose:
            print('{:<25s} {:s}'.format(msg, outx))

        if status in [0, 1] and not _ctx.keep_temp_files:
            files = [outx] + [str(f) for f in Path('.').glob(f'{outy}*')]
            remove(*files)

        ran_tests += 1
        if status == 1:
            soft_errors += 1
        elif status != 0:
            hard_errors += 1

    else:
        # skeleton tests for unsupported language;
        # cleanup temporary files
        if not _ctx.keep_temp_files:
            remove(outx, outy)

    return ran_tests, soft_errors, hard_errors


def init_context(base_path, skeleton=False, keep_temp_files=False,
                 valgrind=False, verbose=False, wine=False):
    """Call when new processes start.

    This function is used as an initializer on a per-process basis due to
    'spawn' process strategy (at least on Windows and macOs). It fills in
    the module-level `_ctx` object.

    Raises RuntimeError if the re2c executable is not found in TOP_BUILDIR.
    """
    _ctx.base_path = abshere(base_path)
    _ctx.skeleton = skeleton
    _ctx.keep_temp_files = keep_temp_files
    _ctx.verbose = verbose

    # Find 're2c.exe' on Windows or 're2c' for Linux/UNIX in TOP_BUILDIR
    # (shutil.which cannot find re2c.exe when running tests on Wine).
    exe_sfx = '.exe' if wine else ''
    re2c = shutil.which('re2c' + exe_sfx, path=TOP_BUILDIR)
    # shutil.which returns None when nothing is found, so check before
    # resolving the path: joining None would fail with a TypeError.
    if not re2c:
        raise RuntimeError('Cannot find re2c executable')
    _ctx.re2c = abshere(re2c)

    # Set include paths, relative to build directory
    cwd = os.getcwd()
    os.chdir(_ctx.base_path)
    try:
        incpaths = []
        for d in Path('.').rglob('*'):
            if os.path.isdir(d):
                incpaths += ['-I', str(d)]
    finally:
        # Go back even if scanning the tree fails.
        os.chdir(cwd)
    _ctx.incpaths = incpaths

    top_dir = os.path.normpath(TOP_SRCDIR)
    if os.path.isabs(TOP_SRCDIR):
        _ctx.incpaths += ['-I', os.path.join(top_dir, 'include')]
    else:
        _ctx.incpaths += ['-I', os.path.join('..', top_dir, 'include')]

    # Set tools paths
    _ctx.valgrind_opts = []
    if valgrind:
        _ctx.valgrind = shutil.which('valgrind')
        _ctx.valgrind_opts = [
            '-q',
            '--track-origins=yes',
            '--num-callers=50',
            '--leak-check=full',
            '--show-reachable=yes',
            '--malloc-fill=0xa1',
            '--free-fill=0xa1',
        ]

    _ctx.diff = shutil.which('diff')
    _ctx.diff_opts = []
    if wine or os.name == 'nt':
        # Ignore whitespace at the end of line
        _ctx.diff_opts = ['-b']

    if wine:
        _ctx.wine = shutil.which('wine')


def environment_check(valgrind=False, wine=False):
    """Check environment and requirements.

    Raises RuntimeError if 'diff' or a requested optional tool ('valgrind',
    'wine') is not found in PATH.
    """
    if not shutil.which('diff'):
        raise RuntimeError("The 'diff' utility is missing from PATH")

    if valgrind and not shutil.which('valgrind'):
        raise RuntimeError("The 'valgrind' tool is missing from the PATH")

    if wine and not shutil.which('wine'):
        raise RuntimeError("The 'wine' layer is missing from the PATH")


def main():
    """The main function to run tests from the command line.

    Returns the process exit status: 1 if any test ended with a hard error,
    0 otherwise.
    """
    args = parse_args()

    # Check environment
    environment_check(args.valgrind, args.wine)

    # A missing or zero job count falls back to a single job.
    jobs = abs(args.jobs or 1)
    print(f'Running in {jobs} job(s)')

    test_blddir = create_tests_tree()
    copy_tests(args.tests, test_blddir)

    # The copied files may be read-only (see the comment on copy_dir).
    if os.name == 'posix':
        subprocess.call(['chmod', '-R', 'u+w', test_blddir])

    # The parent process needs the context as well: clean_test_tree runs
    # re2c to find out whether this is a debug build.
    init_context(
        test_blddir,
        skeleton=args.skeleton,
        keep_temp_files=args.keep_temp_files,
        valgrind=args.valgrind,
        verbose=args.verbose,
        wine=args.wine)

    clean_test_tree(test_blddir)
    tests = filter_tests(test_blddir)

    # collect results from all worker processes
    total = len(tests)
    total_ran_tests = 0
    total_hard_errors = 0
    total_soft_errors = 0

    with multiprocessing.Pool(
        processes=jobs,
        initializer=init_context,
        # Note: the arguments order is important here
        initargs=(test_blddir, args.skeleton, args.keep_temp_files,
                  args.valgrind, args.verbose, args.wine,)
    ) as pool:
        for (ran, soft_err, hard_err) in pool.imap_unordered(run_test, tests):
            total_ran_tests += ran
            total_soft_errors += soft_err
            total_hard_errors += hard_err

    # Recursively remove subdirectories that do not contain .re files.
    # Note that the removed subdirectories are still listed when iterating the
    # parent directory on bottom-up recursive return, so we have to record all
    # removed paths in a set and use it to filter subdirectories.
    removed_subdirs = set()
    for path, subdirs, files in os.walk(test_blddir, topdown=False):
        re_files = [f for f in files if str(f).endswith('.re')]
        subdirs = [d for d in subdirs
                   if os.path.join(path, d) not in removed_subdirs]
        if not re_files and not subdirs:
            shutil.rmtree(path)
            removed_subdirs.add(path)

    # report results
    print('-----------------')
    print(f'All:         {total}')
    print(f'Ran:         {total_ran_tests}')
    print(f'Passed:      {total_ran_tests - total_hard_errors - total_soft_errors}')
    print(f'Soft errors: {total_soft_errors}')
    print(f'Hard errors: {total_hard_errors}')
    print('-----------------')

    if total_hard_errors != 0:
        print('FAILED')
        return 1
    else:
        print('PASSED')
        return 0


# This should be in the global scope
_ctx = Context()


# Run main() when current file is executed by an interpreter.
if __name__ == '__main__':
    try:
        sys.exit(main())
    except KeyboardInterrupt:  # The user hit Control-C
        sys.stderr.write('\nReceived keyboard interrupt, terminating.\n\n')
        sys.stderr.flush()
        # Control-C is fatal error signal 2, for more see
        # https://tldp.org/LDP/abs/html/exitcodes.html
        sys.exit(128 + signal.SIGINT)
    except RuntimeError as exc:
        sys.stderr.write(f'\n{exc}\n\n')
        sys.stderr.flush()
        sys.exit(1)