From db20f3f1bb8595633a7e16c8900fd401a453a6b5 Mon Sep 17 00:00:00 2001 From: Jiyoung Yun Date: Tue, 27 Dec 2016 16:46:08 +0900 Subject: Imported Upstream version 1.0.0.9127 --- src/ToolBox/SOS/tests/test_libsosplugin.py | 338 +++++++++++++++++++++++------ 1 file changed, 269 insertions(+), 69 deletions(-) (limited to 'src/ToolBox/SOS/tests/test_libsosplugin.py') diff --git a/src/ToolBox/SOS/tests/test_libsosplugin.py b/src/ToolBox/SOS/tests/test_libsosplugin.py index e4f59ebbcf..e5a5906264 100644 --- a/src/ToolBox/SOS/tests/test_libsosplugin.py +++ b/src/ToolBox/SOS/tests/test_libsosplugin.py @@ -1,3 +1,4 @@ +from __future__ import print_function import unittest import argparse import re @@ -5,80 +6,279 @@ import tempfile import subprocess import threading import os -import os.path import sys +import inspect + +lldb = '' +clrdir = '' +workdir = '' +corerun = '' +sosplugin = '' +assembly = '' +fail_flag = '' +fail_flag_lldb = '' +summary_file = '' +timeout = 0 +regex = '' +repeat = 0 + + +def runWithTimeout(cmd): + p = None + + def run(): + global p + p = subprocess.Popen(cmd, shell=True) + p.communicate() + + thread = threading.Thread(target=run) + thread.start() + + thread.join(timeout) + if thread.is_alive(): + with open(summary_file, 'a+') as summary: + print('Timeout!', file=summary) + p.kill() + thread.join() + -assemblyName='' -clrArgs='' -fail_flag='/tmp/fail_flag' - -# helper functions - -def prepareScenarioFile(moduleName): - global assemblyName - #create a temporary scenario file - fd, scenarioFileName = tempfile.mkstemp() - scenarioFile = open(scenarioFileName, 'w') - scenarioFile.write('script from runprocess import run\n') - scenarioFile.write('script run("'+assemblyName+'", "'+moduleName+'")\n') - scenarioFile.write('quit\n') - scenarioFile.close() - os.close(fd) - return scenarioFileName - -def runWithTimeout(cmd, timeout): - d = {'process': None} - def run(): - d['process'] = subprocess.Popen(cmd, shell=True) - d['process'].communicate() - - thread = threading.Thread(target=run) - thread.start() - - thread.join(timeout) - if thread.is_alive(): - d['process'].terminate() - thread.join() - -# Test class class TestSosCommands(unittest.TestCase): - def do_test(self, command): - global clrArgs - global fail_flag - filename = prepareScenarioFile(command) - cmd = "lldb --source "+filename+" -b -K \"OnCrash.do\" -- "+clrArgs+" > "+command+".log 2>"+command+".log.2" - runWithTimeout(cmd, 120) - self.assertFalse(os.path.isfile(fail_flag)) - os.unlink(filename) + def do_test(self, command): + open(fail_flag, 'a').close() + try: + os.unlink(fail_flag_lldb) + except: + pass + + cmd = (('%s -b ' % lldb) + + ("-k \"script open('%s', 'a').close()\" " % fail_flag_lldb) + + ("-k 'quit' ") + + ("--no-lldbinit ") + + ("-O \"plugin load %s \" " % sosplugin) + + ("-o \"script import testutils as test\" ") + + ("-o \"script test.fail_flag = '%s'\" " % fail_flag) + + ("-o \"script test.summary_file = '%s'\" " % summary_file) + + ("-o \"script test.run('%s', '%s')\" " % (assembly, command)) + + ("-o \"quit\" ") + + (" -- %s %s > %s.log 2> %s.log.2" % (corerun, assembly, + command, command))) + + runWithTimeout(cmd) + self.assertFalse(os.path.isfile(fail_flag)) + self.assertFalse(os.path.isfile(fail_flag_lldb)) + + try: + os.unlink(fail_flag) + except: + pass + try: + os.unlink(fail_flag_lldb) + except: + pass + + def t_cmd_bpmd_nofuturemodule_module_function(self): + self.do_test('t_cmd_bpmd_nofuturemodule_module_function') + + def t_cmd_bpmd_module_function(self): + self.do_test('t_cmd_bpmd_module_function') + + def t_cmd_bpmd_module_function_iloffset(self): + self.do_test('t_cmd_bpmd_module_function_iloffset') + + def t_cmd_bpmd_methoddesc(self): + self.do_test('t_cmd_bpmd_methoddesc') + + def t_cmd_bpmd_clearall(self): + self.do_test('t_cmd_bpmd_clearall') + + def t_cmd_clrstack(self): + self.do_test('t_cmd_clrstack') + + def t_cmd_clrthreads(self): + self.do_test('t_cmd_clrthreads') + + def t_cmd_clru(self): + self.do_test('t_cmd_clru') + + def t_cmd_dumpclass(self): + self.do_test('t_cmd_dumpclass') + + def t_cmd_dumpheap(self): + self.do_test('t_cmd_dumpheap') + + def t_cmd_dumpil(self): + self.do_test('t_cmd_dumpil') + + def t_cmd_dumplog(self): + self.do_test('t_cmd_dumplog') + + def t_cmd_dumpmd(self): + self.do_test('t_cmd_dumpmd') + + def t_cmd_dumpmodule(self): + self.do_test('t_cmd_dumpmodule') + + def t_cmd_dumpmt(self): + self.do_test('t_cmd_dumpmt') + + def t_cmd_dumpobj(self): + self.do_test('t_cmd_dumpobj') + + def t_cmd_dumpstack(self): + self.do_test('t_cmd_dumpstack') + + def t_cmd_dso(self): + self.do_test('t_cmd_dso') + + def t_cmd_eeheap(self): + self.do_test('t_cmd_eeheap') - def test_dumpmodule(self): - self.do_test("dumpmodule") + def t_cmd_eestack(self): + self.do_test('t_cmd_eestack') + + def t_cmd_gcroot(self): + self.do_test('t_cmd_gcroot') + + def t_cmd_ip2md(self): + self.do_test('t_cmd_ip2md') + + def t_cmd_name2ee(self): + self.do_test('t_cmd_name2ee') + + def t_cmd_pe(self): + self.do_test('t_cmd_pe') + + def t_cmd_histclear(self): + self.do_test('t_cmd_histclear') + + def t_cmd_histinit(self): + self.do_test('t_cmd_histinit') + + def t_cmd_histobj(self): + self.do_test('t_cmd_histobj') + + def t_cmd_histobjfind(self): + self.do_test('t_cmd_histobjfind') + + def t_cmd_histroot(self): + self.do_test('t_cmd_histroot') + + def t_cmd_sos(self): + self.do_test('t_cmd_sos') + + def t_cmd_soshelp(self): + self.do_test('t_cmd_soshelp') + + +def generate_report(): + report = [{'name': 'TOTAL', True: 0, False: 0, 'completed': True}] + fail_messages = [] + + if not os.path.isfile(summary_file): + print('No summary file to process!') + return + + with open(summary_file, 'r') as summary: + for line in summary: + if line.startswith('new_suite: '): + report.append({'name': line.split()[-1], True: 0, False: 0, + 'completed': False, 'timeout': False}) + elif line.startswith('True'): + report[-1][True] += 1 + elif line.startswith('False'): + report[-1][False] += 1 + elif line.startswith('Completed!'): + report[-1]['completed'] = True + elif line.startswith('Timeout!'): + report[-1]['timeout'] = True + elif line.startswith('!!! '): + fail_messages.append(line.rstrip('\n')) + + for suite in report[1:]: + report[0][True] += suite[True] + report[0][False] += suite[False] + report[0]['completed'] &= suite['completed'] + + for line in fail_messages: + print(line) + + print() + print('=' * 79) + print('{:72} {:6}'.format('Test suite', 'Result')) + print('-' * 79) + for suite in report[1:]: + if suite['timeout']: + result = 'Timeout' + elif suite[False]: + result = 'Fail' + elif not suite['completed']: + result = 'Crash' + elif suite[True]: + result = 'Success' + else: + result = 'Please, report' + print('{:68} {:>10}'.format(suite['name'], result)) + print('=' * 79) - def test_dumpil(self): - self.do_test("dumpil") - if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('--clr-args', default='') - parser.add_argument('unittest_args', nargs='*') - - args = parser.parse_args() - - clrArgs = args.clr_args - print("ClrArgs: " + clrArgs) - # find assembly name among lldb arguments - assembly_regexp = re.compile("([^\s]+\.exe)") - assemblyMatch = assembly_regexp.search(clrArgs) - if assemblyMatch is not None: - assemblyName = assemblyMatch.group(1) - else: - print("Assembly not recognized") - exit(1) - - print("Assembly name: "+assemblyName) - sys.argv[1:] = args.unittest_args - suite = unittest.TestLoader().loadTestsFromTestCase(TestSosCommands) - unittest.TextTestRunner(verbosity=2).run(suite) - os.unlink(fail_flag) \ No newline at end of file + parser = argparse.ArgumentParser() + parser.add_argument('--lldb', default='lldb') + parser.add_argument('--clrdir', default='.') + parser.add_argument('--workdir', default='.') + parser.add_argument('--assembly', default='Test.exe') + parser.add_argument('--timeout', default=90) + parser.add_argument('--regex', default='t_cmd_') + parser.add_argument('--repeat', default=1) + parser.add_argument('unittest_args', nargs='*') + + args = parser.parse_args() + + lldb = args.lldb + clrdir = args.clrdir + workdir = args.workdir + assembly = args.assembly + timeout = int(args.timeout) + regex = args.regex + repeat = int(args.repeat) + print("lldb: %s" % lldb) + print("clrdir: %s" % clrdir) + print("workdir: %s" % workdir) + print("assembly: %s" % assembly) + print("timeout: %i" % timeout) + print("regex: %s" % regex) + print("repeat: %i" % repeat) + + corerun = os.path.join(clrdir, 'corerun') + sosplugin = os.path.join(clrdir, 'libsosplugin.so') + if os.name != 'posix': + print('Not implemented: corerun.exe, sosplugin.dll?') + exit(1) + + print("corerun: %s" % corerun) + print("sosplugin: %s" % sosplugin) + + fail_flag = os.path.join(workdir, 'fail_flag') + fail_flag_lldb = os.path.join(workdir, 'fail_flag.lldb') + + print("fail_flag: %s" % fail_flag) + print("fail_flag_lldb: %s" % fail_flag_lldb) + + summary_file = os.path.join(workdir, 'summary') + print("summary_file: %s" % summary_file) + + try: + os.unlink(summary_file) + except: + pass + + sys.argv[1:] = args.unittest_args + suite = unittest.TestSuite() + all_tests = inspect.getmembers(TestSosCommands, predicate=inspect.ismethod) + for (test_name, test_func) in all_tests: + if re.match(regex, test_name): + suite.addTest(TestSosCommands(test_name)) + unittest.TextTestRunner(verbosity=1).run(suite) + + generate_report() -- cgit v1.2.3