Mercurial > ~astiob > upreckon > hgweb
view 2.00/testcases.py @ 40:af9c45708987
Cemented a decision I had previously been unsure about
The mere presence of the tasknames configuration variable now always causes problem names to be printed.
This is not new, but the old behaviour (only printing names if we test more than one problem), previously commented out, has now been removed altogether.
author | Oleg Oshmyan <chortos@inbox.lv> |
---|---|
date | Sun, 05 Dec 2010 14:34:24 +0100 |
parents | 2b459f9743b4 |
children | 164395af969d |
line wrap: on
line source
#! /usr/bin/env python # Copyright (c) 2010 Chortos-2 <chortos@inbox.lv> from __future__ import division, with_statement try: from compat import * import files, problem, config except ImportError: import __main__ __main__.import_error(sys.exc_info()[1]) else: from __main__ import clock, options import glob, re, sys, tempfile, time from subprocess import Popen, PIPE, STDOUT import os devnull = open(os.path.devnull, 'w+') try: from signal import SIGTERM, SIGKILL except ImportError: SIGTERM = 15 SIGKILL = 9 try: from _subprocess import TerminateProcess except ImportError: # CPython 2.5 does define _subprocess.TerminateProcess even though it is # not used in the subprocess module, but maybe something else does not try: import ctypes TerminateProcess = ctypes.windll.kernel32.TerminateProcess except (ImportError, AttributeError): TerminateProcess = None # Do the hacky-wacky dark magic needed to catch presses of the Escape button. # If only Python supported forcible termination of threads... if not sys.stdin.isatty(): canceled = init_canceled = lambda: False pause = None else: try: # Windows has select() too, but it is not the select() we want import msvcrt except ImportError: try: import select, termios, tty, atexit except ImportError: # It cannot be helped! 
# Silently disable support for killing the program being tested canceled = init_canceled = lambda: False pause = None else: def cleanup(old=termios.tcgetattr(sys.stdin.fileno())): termios.tcsetattr(sys.stdin.fileno(), termios.TCSAFLUSH, old) atexit.register(cleanup) del cleanup tty.setcbreak(sys.stdin.fileno()) def canceled(select=select.select, stdin=sys.stdin, read=sys.stdin.read): while select((stdin,), (), (), 0)[0]: if read(1) == '\33': return True return False def init_canceled(): while select.select((sys.stdin,), (), (), 0)[0]: sys.stdin.read(1) def pause(): sys.stdin.read(1) else: def canceled(kbhit=msvcrt.kbhit, getch=msvcrt.getch): while kbhit(): c = getch() if c == '\33': return True elif c == '\0': # Let's hope no-one is fiddling with this getch() return False def init_canceled(): while msvcrt.kbhit(): msvcrt.getch() def pause(): msvcrt.getch() __all__ = ('TestCase', 'load_problem', 'TestCaseNotPassed', 'TimeLimitExceeded', 'CanceledByUser', 'WrongAnswer', 'NonZeroExitCode', 'CannotStartTestee', 'CannotStartValidator', 'CannotReadOutputFile', 'CannotReadInputFile', 'CannotReadAnswerFile') # Exceptions class TestCaseNotPassed(Exception): __slots__ = () class TimeLimitExceeded(TestCaseNotPassed): __slots__ = () class CanceledByUser(TestCaseNotPassed): __slots__ = () class WrongAnswer(TestCaseNotPassed): __slots__ = 'comment' def __init__(self, comment=''): self.comment = comment class NonZeroExitCode(TestCaseNotPassed): __slots__ = 'exitcode' def __init__(self, exitcode): self.exitcode = exitcode class ExceptionWrapper(TestCaseNotPassed): __slots__ = 'upstream' def __init__(self, upstream): self.upstream = upstream class CannotStartTestee(ExceptionWrapper): __slots__ = () class CannotStartValidator(ExceptionWrapper): __slots__ = () class CannotReadOutputFile(ExceptionWrapper): __slots__ = () class CannotReadInputFile(ExceptionWrapper): __slots__ = () class CannotReadAnswerFile(ExceptionWrapper): __slots__ = () # Helper context managers class 
CopyDeleting(object): __slots__ = 'case', 'file', 'name' def __init__(self, case, file, name): self.case = case self.file = file self.name = name def __enter__(self): if self.name: try: self.file.copy(self.name) except: try: self.__exit__(None, None, None) except: pass raise def __exit__(self, exc_type, exc_val, exc_tb): if self.name: self.case.files_to_delete.append(self.name) class Copying(object): __slots__ = 'file', 'name' def __init__(self, file, name): self.file = file self.name = name def __enter__(self): if self.name: self.file.copy(self.name) def __exit__(self, exc_type, exc_val, exc_tb): pass # Test case types class TestCase(object): __slots__ = ('problem', 'id', 'isdummy', 'infile', 'outfile', 'points', 'process', 'time_started', 'time_stopped', 'time_limit_string', 'realinname', 'realoutname', 'maxtime', 'maxmemory', 'has_called_back', 'files_to_delete') if ABCMeta: __metaclass__ = ABCMeta def __init__(case, prob, id, isdummy, points): case.problem = prob case.id = id case.isdummy = isdummy case.points = points case.maxtime = case.problem.config.maxtime case.maxmemory = case.problem.config.maxmemory if case.maxtime: case.time_limit_string = '/%.3f' % case.maxtime else: case.time_limit_string = '' if not isdummy: case.realinname = case.problem.config.testcaseinname case.realoutname = case.problem.config.testcaseoutname else: case.realinname = case.problem.config.dummyinname case.realoutname = case.problem.config.dummyoutname @abstractmethod def test(case): raise NotImplementedError def __call__(case, callback): case.has_called_back = False case.files_to_delete = [] try: return case.test(callback) finally: now = clock() if not getattr(case, 'time_started', None): case.time_started = case.time_stopped = now elif not getattr(case, 'time_stopped', None): case.time_stopped = now if not case.has_called_back: callback() case.cleanup() def cleanup(case): #if getattr(case, 'infile', None): # case.infile.close() #if getattr(case, 'outfile', None): # 
case.outfile.close() if getattr(case, 'process', None): # Try killing after three unsuccessful TERM attempts in a row # (except on Windows, where TERMing is killing) for i in range(3): try: try: case.process.terminate() except AttributeError: # Python 2.5 if TerminateProcess and hasattr(proc, '_handle'): # Windows API TerminateProcess(proc._handle, 1) else: # POSIX os.kill(proc.pid, SIGTERM) except Exception: time.sleep(0) case.process.poll() else: case.process.wait() break else: # If killing the process is unsuccessful three times in a row, # just silently stop trying for i in range(3): try: try: case.process.kill() except AttributeError: # Python 2.5 if TerminateProcess and hasattr(proc, '_handle'): # Windows API TerminateProcess(proc._handle, 1) else: # POSIX os.kill(proc.pid, SIGKILL) except Exception: time.sleep(0) case.process.poll() else: case.process.wait() break if case.files_to_delete: for name in case.files_to_delete: try: os.remove(name) except Exception: # It can't be helped pass def open_infile(case): try: case.infile = files.File('/'.join((case.problem.name, case.realinname.replace('$', case.id)))) except IOError: e = sys.exc_info()[1] raise CannotReadInputFile(e) def open_outfile(case): try: case.outfile = files.File('/'.join((case.problem.name, case.realoutname.replace('$', case.id)))) except IOError: e = sys.exc_info()[1] raise CannotReadAnswerFile(e) class ValidatedTestCase(TestCase): __slots__ = 'validator' def __init__(case, *args): TestCase.__init__(case, *args) if not case.problem.config.tester: case.validator = None else: case.validator = case.problem.config.tester def validate(case, output): if not case.validator: # Compare the output with the reference output case.open_outfile() with case.outfile.open() as refoutput: for line, refline in zip_longest(output, refoutput): if refline is not None and not isinstance(refline, basestring): line = bytes(line, sys.getdefaultencoding()) if line != refline: raise WrongAnswer return 1 elif 
callable(case.validator): return case.validator(output) else: # Call the validator program output.close() if case.problem.config.ansname: case.open_outfile() case.outfile.copy(case.problem.config.ansname) try: case.process = Popen(case.validator, stdin=devnull, stdout=PIPE, stderr=STDOUT, universal_newlines=True, bufsize=-1) except OSError: raise CannotStartValidator(sys.exc_info()[1]) comment = case.process.communicate()[0].strip() match = re.match(r'(?i)(ok|(?:correct|wrong)(?:(?:\s|_)*answer)?)(?:$|\s+|[.,!:]+\s*)', comment) if match: comment = comment[match.end():] if not case.problem.config.maxexitcode: if case.process.returncode: raise WrongAnswer(comment) else: return 1, comment else: return case.process.returncode / case.problem.config.maxexitcode, comment class BatchTestCase(ValidatedTestCase): __slots__ = () def test(case, callback): init_canceled() if sys.platform == 'win32' or not case.maxmemory: preexec_fn = None else: def preexec_fn(): try: import resource maxmemory = int(case.maxmemory * 1048576) resource.setrlimit(resource.RLIMIT_AS, (maxmemory, maxmemory)) # I would also set a CPU time limit but I do not want the time # that passes between the calls to fork and exec to be counted in except MemoryError: # We do not have enough memory for ourselves; # let the parent know about this raise except Exception: # Well, at least we tried pass case.open_infile() case.time_started = None if case.problem.config.stdio: if options.erase and not case.validator: # TODO: re-use the same file name if possible # FIXME: 2.5 lacks the delete parameter with tempfile.NamedTemporaryFile(delete=False) as f: inputdatafname = f.name contextmgr = CopyDeleting(case, case.infile, inputdatafname) else: inputdatafname = case.problem.config.inname contextmgr = Copying(case.infile, inputdatafname) with contextmgr: # FIXME: this U doesn't do anything good for the child process, does it? 
with open(inputdatafname, 'rU') as infile: with tempfile.TemporaryFile('w+') if options.erase and not case.validator else open(case.problem.config.outname, 'w+') as outfile: # TODO: make sure outfile.file is passed to Popen if needed try: try: case.process = Popen(case.problem.config.path, stdin=infile, stdout=outfile, stderr=devnull, universal_newlines=True, bufsize=-1, preexec_fn=preexec_fn) except MemoryError: # If there is not enough memory for the forked test.py, # opt for silent dropping of the limit # TODO: show a warning somewhere case.process = Popen(case.problem.config.path, stdin=infile, stdout=outfile, stderr=devnull, universal_newlines=True, bufsize=-1) except OSError: raise CannotStartTestee(sys.exc_info()[1]) case.time_started = clock() time_next_check = case.time_started + .15 if not case.maxtime: while True: exitcode, now = case.process.poll(), clock() if exitcode is not None: case.time_stopped = now break # For some reason (probably Microsoft's fault), # msvcrt.kbhit() is slow as hell else: if now >= time_next_check: if canceled(): raise CanceledByUser else: time_next_check = now + .15 time.sleep(0) else: time_end = case.time_started + case.maxtime while True: exitcode, now = case.process.poll(), clock() if exitcode is not None: case.time_stopped = now break elif now >= time_end: raise TimeLimitExceeded else: if now >= time_next_check: if canceled(): raise CanceledByUser else: time_next_check = now + .15 time.sleep(0) if config.globalconf.force_zero_exitcode and case.process.returncode: raise NonZeroExitCode(case.process.returncode) callback() case.has_called_back = True outfile.seek(0) return case.validate(outfile) else: case.infile.copy(case.problem.config.inname) try: try: case.process = Popen(case.problem.config.path, stdin=devnull, stdout=devnull, stderr=STDOUT, preexec_fn=preexec_fn) except MemoryError: # If there is not enough memory for the forked test.py, # opt for silent dropping of the limit # TODO: show a warning somewhere case.process 
= Popen(case.problem.config.path, stdin=devnull, stdout=devnull, stderr=STDOUT) except OSError: raise CannotStartTestee(sys.exc_info()[1]) case.time_started = clock() time_next_check = case.time_started + .15 if not case.maxtime: while True: exitcode, now = case.process.poll(), clock() if exitcode is not None: case.time_stopped = now break else: if now >= time_next_check: if canceled(): raise CanceledByUser else: time_next_check = now + .15 time.sleep(0) else: time_end = case.time_started + case.maxtime while True: exitcode, now = case.process.poll(), clock() if exitcode is not None: case.time_stopped = now break elif now >= time_end: raise TimeLimitExceeded else: if now >= time_next_check: if canceled(): raise CanceledByUser else: time_next_check = now + .15 time.sleep(0) if config.globalconf.force_zero_exitcode and case.process.returncode: raise NonZeroExitCode(case.process.returncode) callback() case.has_called_back = True with open(case.problem.config.outname, 'rU') as output: return case.validate(output) # This is the only test case type not executing any programs to be tested class OutputOnlyTestCase(ValidatedTestCase): __slots__ = () def cleanup(case): pass class BestOutputTestCase(ValidatedTestCase): __slots__ = () # This is the only test case type executing two programs simultaneously class ReactiveTestCase(TestCase): __slots__ = () # The basic idea is to launch the program to be tested and the grader # and to pipe their standard I/O from and to each other, # and then to capture the grader's exit code and use it # like the exit code of an output validator is used. 
def load_problem(prob, _types={'batch'   : BatchTestCase,
                               'outonly' : OutputOnlyTestCase,
                               'bestout' : BestOutputTestCase,
                               'reactive': ReactiveTestCase}):
    """Generate the test cases of problem `prob` in the order they run.

    This is a generator.  Before the first item is produced,
    prob.cache.padoutput is raised to the length of the longest label
    ('sample ' plus the zero-padded id for dummies, the zero-padded id
    for real tests).  Then one test case object per dummy is yielded,
    followed by the real tests.  When prob.config.usegroups is true, each
    group is preceded by a problem.TestGroup() and followed by
    problem.test_context_end.

    prob   -- problem object; its `config` and `cache` are read, and
              config.dummies, config.tests and (in legacy mode)
              config.usegroups may be replaced
    _types -- maps prob.config.kind to a test case class; callers are
              not expected to pass it

    The points of a test come from prob.config.pointmap, looked up by
    test id, then by None, then defaulting to config.maxexitcode or 1.
    """
    # We will need to iterate over these configuration variables twice
    try:
        len(prob.config.dummies)
    except Exception:
        prob.config.dummies = tuple(prob.config.dummies)
    try:
        len(prob.config.tests)
    except Exception:
        prob.config.tests = tuple(prob.config.tests)

    if options.legacy:
        # Groups are in use as soon as any entry holds more than one test.
        # Every entry is normalised to something with a length so that, if
        # groups are detected, the loops below can iterate over each entry
        # twice, including entries that were plain scalars or one-shot
        # iterables.
        prob.config.usegroups = False
        normalised = []
        for name in prob.config.tests:
            try:
                size = len(name)
            except Exception:
                try:
                    name = tuple(name)
                except TypeError:
                    name = (name,)
                size = len(name)
            if size > 1:
                prob.config.usegroups = True
            normalised.append(name)
        if prob.config.usegroups:
            prob.config.tests = normalised
        else:
            prob.config.tests = list(prob.config.tests)

    usegroups = prob.config.usegroups

    def new_test(i):
        # Build the (non-dummy) test case object for test id `i`
        s = str(i).zfill(prob.config.padtests)
        default = prob.config.pointmap.get(
            None,
            prob.config.maxexitcode if prob.config.maxexitcode else 1)
        return _types[prob.config.kind](
            prob, s, False, prob.config.pointmap.get(i, default))

    # First get prob.cache.padoutput right,
    # then yield the actual test cases
    for i in prob.config.dummies:
        s = 'sample ' + str(i).zfill(prob.config.paddummies)
        prob.cache.padoutput = max(prob.cache.padoutput, len(s))
    if usegroups:
        for group in prob.config.tests:
            for i in group:
                s = str(i).zfill(prob.config.padtests)
                prob.cache.padoutput = max(prob.cache.padoutput, len(s))
    else:
        for i in prob.config.tests:
            s = str(i).zfill(prob.config.padtests)
            prob.cache.padoutput = max(prob.cache.padoutput, len(s))

    for i in prob.config.dummies:
        s = str(i).zfill(prob.config.paddummies)
        yield _types[prob.config.kind](prob, s, True, 0)

    if usegroups:
        for group in prob.config.tests:
            yield problem.TestGroup()
            for i in group:
                yield new_test(i)
            yield problem.test_context_end
    else:
        for i in prob.config.tests:
            yield new_test(i)