Mercurial > ~astiob > upreckon > hgweb
view testcases.py @ 76:0e5ae28e0b2b
Points are now weighted on a per-test-context basis
In particular, this has allowed for simple extensions to the format
of testconf to award points to whole test groups without at the same time
compromising the future ability of giving partial score for correct
but slow solutions. Specifically, the groupweight configuration variable
has been added and normally has the format {groupindex: points} where
groupindex is the group's index in the tests configuration variable.
The backwards incompatible change is that test contexts are no longer
guaranteed to learn the score awarded or the maximum possible score
for every test case and may instead be notified about them in batches.
In other news, the pointmap and groupweight configuration variables can
(now) be given as sequences in addition to mappings. (Technically,
the distinction currently made is dict versus everything else.) Items
of a sequence pointmap/groupweight correspond directly to the test cases/
groups defined in the tests configuration variable; in particular,
when groups are used, tests=[1],[2,3];pointmap={1:1,2:2,3:3} can now be
written as pointmap=tests=[1],[2,3]. Missing items are handled in the same
way in which they are handled when the variable is a mapping. Note
that the items of groupweight correspond to whole test groups rather
than individual test cases.
In other news again, the wording of problem total lines has been changed
from '<unweighted> points; weighted score: <weighted>' to '<weighted>
points (<unweighted> before weighting)', and group total lines now
properly report fractional numbers of points (this is a bug fix).
author | Oleg Oshmyan <chortos@inbox.lv> |
---|---|
date | Sat, 08 Jan 2011 16:03:35 +0200 |
parents | 7520b6bb6636 |
children | 69eadc60f4e2 |
line wrap: on
line source
#! /usr/bin/env python
# Copyright (c) 2010 Chortos-2 <chortos@inbox.lv>
"""Test case types and the machinery used to run the program being tested.

The module picks, at import time, a platform-specific ``call`` function that
starts the testee, enforces the time limit and watches for the Escape key.
``load_problem`` turns a problem's configuration into a stream of test
contexts and test cases.
"""

# TODO: copy the ansfile if not options.erase even if no validator is used

from __future__ import division, with_statement

# sys must be available before the try block below: its ImportError handler
# calls sys.exc_info()
import sys

try:
    from compat import *
    import files, problem, config
except ImportError:
    import __main__
    __main__.import_error(sys.exc_info()[1])
else:
    from __main__ import clock, options

import glob, re, sys, tempfile, time
from subprocess import Popen, PIPE, STDOUT

import os
devnull = open(os.path.devnull, 'w+')

try:
    from signal import SIGTERM, SIGKILL
except ImportError:
    SIGTERM = 15
    SIGKILL = 9

try:
    from _subprocess import TerminateProcess
except ImportError:
    # CPython 2.5 does define _subprocess.TerminateProcess even though it is
    # not used in the subprocess module, but maybe something else does not
    try:
        import ctypes
        TerminateProcess = ctypes.windll.kernel32.TerminateProcess
    except (ImportError, AttributeError):
        TerminateProcess = None


# Do not show error messages due to errors in the program being tested
try:
    import ctypes
    try:
        errmode = ctypes.windll.kernel32.GetErrorMode()
    except AttributeError:
        # No GetErrorMode: SetErrorMode returns the previous mode instead
        errmode = ctypes.windll.kernel32.SetErrorMode(0)
    errmode |= 0x8003
    ctypes.windll.kernel32.SetErrorMode(errmode)
except Exception:
    # Best effort only (e.g. not on Windows or no ctypes)
    pass


# Do the hacky-wacky dark magic needed to catch presses of the Escape button.
# If only Python supported forcible termination of threads...
#
# Three names are defined by whichever branch is taken:
#   canceled()      -- True if Escape is found in the pending keyboard input
#   init_canceled() -- discard any pending keyboard input
#   pause           -- block until a key is pressed, or None if unsupported
if not sys.stdin.isatty():
    canceled = init_canceled = lambda: False
    pause = None
else:
    try:
        # Windows has select() too, but it is not the select() we want
        import msvcrt
    except ImportError:
        try:
            from select import select
            import termios, tty, atexit
        except ImportError:
            # It cannot be helped!
            # Silently disable support for killing the program being tested
            canceled = init_canceled = lambda: False
            pause = None
        else:
            # Restore the original terminal attributes at exit; they are
            # captured now, as the default argument, before setcbreak()
            def cleanup(old=termios.tcgetattr(sys.stdin.fileno())):
                termios.tcsetattr(sys.stdin.fileno(), termios.TCSAFLUSH, old)
            atexit.register(cleanup)
            del cleanup
            tty.setcbreak(sys.stdin.fileno())

            def canceled(select=select, stdin=sys.stdin, read=sys.stdin.read):
                """Consume pending input; return True if Escape was seen."""
                while select((stdin,), (), (), 0)[0]:
                    if read(1) == '\33':
                        return True
                return False

            def init_canceled():
                """Discard all pending keyboard input."""
                while select((sys.stdin,), (), (), 0)[0]:
                    sys.stdin.read(1)

            def pause():
                """Block until one character can be read from stdin."""
                sys.stdin.read(1)
    else:
        def canceled(kbhit=msvcrt.kbhit, getch=msvcrt.getch):
            """Consume pending input; return True if Escape was seen."""
            while kbhit():
                c = getch()
                if c == '\33':
                    return True
                elif c == '\0':
                    # Let's hope no-one is fiddling with this
                    getch()
            return False

        def init_canceled():
            """Discard all pending keyboard input."""
            while msvcrt.kbhit():
                msvcrt.getch()

        def pause():
            """Block until a key is pressed."""
            msvcrt.getch()


# Select the implementation of call(), which starts the testee and waits for
# it. If the POSIX-only imports fail, fall back to the Windows API; if that
# is unavailable too, call is set to None and BatchTestCase.test polls.
try:
    from signal import SIGCHLD, signal, SIG_DFL
    from select import select, error as select_error
    from errno import EINTR
    import fcntl
    try:
        import cPickle as pickle
    except ImportError:
        import pickle
except ImportError:
    try:
        from _subprocess import WAIT_OBJECT_0, STD_INPUT_HANDLE, INFINITE
    except ImportError:
        WAIT_OBJECT_0 = 0
        STD_INPUT_HANDLE = -10
        INFINITE = -1
    try:
        import ctypes
        SetConsoleMode = ctypes.windll.kernel32.SetConsoleMode
        FlushConsoleInputBuffer = ctypes.windll.kernel32.FlushConsoleInputBuffer
        WaitForMultipleObjects = ctypes.windll.kernel32.WaitForMultipleObjects
        ReadConsoleInputA = ctypes.windll.kernel32.ReadConsoleInputA
        try:
            from _subprocess import GetStdHandle
        except ImportError:
            GetStdHandle = ctypes.windll.kernel32.GetStdHandle
    except (ImportError, AttributeError):
        console_input = False
    else:
        hStdin = GetStdHandle(STD_INPUT_HANDLE)
        # console_input is True only if stdin accepted the console mode
        console_input = bool(SetConsoleMode(hStdin, 1))
        if console_input:
            FlushConsoleInputBuffer(hStdin)

            class KEY_EVENT_RECORD(ctypes.Structure):
                _fields_ = (("bKeyDown", ctypes.c_int),
                            ("wRepeatCount", ctypes.c_ushort),
                            ("wVirtualKeyCode", ctypes.c_ushort),
                            ("wVirtualScanCode", ctypes.c_ushort),
                            ("UnicodeChar", ctypes.c_wchar),
                            ("dwControlKeyState", ctypes.c_uint))

            class INPUT_RECORD(ctypes.Structure):
                _fields_ = (("EventType", ctypes.c_int),
                            ("KeyEvent", KEY_EVENT_RECORD))

    # Memory limits (currently) are not supported
    def call(*args, **kwargs):
        """Start the testee with Popen and wait for it (Windows API).

        The keyword argument ``case`` is removed before the remaining
        arguments are passed to Popen; the process and the start and stop
        times are stored on it.

        Raises CannotStartTestee if Popen raises OSError, TimeLimitExceeded
        if case.maxtime elapses first, and CanceledByUser if an Escape
        key-down event is read from the console.
        """
        case = kwargs.pop('case')
        try:
            case.process = Popen(*args, **kwargs)
        except OSError:
            raise CannotStartTestee(sys.exc_info()[1])
        case.time_started = clock()
        if not console_input:
            if case.maxtime:
                if WaitForSingleObject(case.process._handle, int(case.maxtime * 1000)) != WAIT_OBJECT_0:
                    raise TimeLimitExceeded
            else:
                case.process.wait()
        else:
            ir = INPUT_RECORD()
            n = ctypes.c_int()
            # Index 0 is the console, so WAIT_OBJECT_0 means console input
            lpHandles = (ctypes.c_int * 2)(hStdin, case.process._handle)
            if case.maxtime:
                time_end = clock() + case.maxtime
                while case.process.poll() is None:
                    remaining = time_end - clock()
                    if remaining > 0:
                        if WaitForMultipleObjects(2, lpHandles, False, int(remaining * 1000)) == WAIT_OBJECT_0:
                            ReadConsoleInputA(hStdin, ctypes.byref(ir), 1, ctypes.byref(n))
                            # Event type 1 with virtual key code 27: Escape
                            if ir.EventType == 1 and ir.KeyEvent.bKeyDown and ir.KeyEvent.wVirtualKeyCode == 27:
                                raise CanceledByUser
                    else:
                        raise TimeLimitExceeded
            else:
                while case.process.poll() is None:
                    if WaitForMultipleObjects(2, lpHandles, False, INFINITE) == WAIT_OBJECT_0:
                        ReadConsoleInputA(hStdin, ctypes.byref(ir), 1, ctypes.byref(n))
                        if ir.EventType == 1 and ir.KeyEvent.bKeyDown and ir.KeyEvent.wVirtualKeyCode == 27:
                            raise CanceledByUser
        case.time_stopped = clock()

    if not console_input:
        # call() needs WaitForSingleObject in this mode
        try:
            try:
                from _subprocess import WaitForSingleObject
            except ImportError:
                import ctypes
                WaitForSingleObject = ctypes.windll.kernel32.WaitForSingleObject
        except (ImportError, AttributeError):
            # TODO: move the default implementation here
            call = None
else:
    # Make SIGCHLD interrupt sleep() and select()
    def bury_child(signum, frame):
        """SIGCHLD handler: record the stop time of the current test case."""
        try:
            bury_child.case.time_stopped = clock()
        except Exception:
            # No test case is currently registered on the handler
            pass
    signal(SIGCHLD, bury_child)

    # If you want this to work, don't set any stdio argument to PIPE
    def call_real(*args, **kwargs):
        """Start the testee with Popen and wait for it (POSIX).

        The keyword argument ``case`` is removed before the remaining
        arguments are passed to Popen. The child reports its start time
        through a pipe just before exec; it is stored in case.time_started.

        Raises CannotStartTestee if Popen raises OSError, TimeLimitExceeded
        if case.maxtime elapses first, and CanceledByUser if Escape is read
        from stdin.
        """
        bury_child.case = case = kwargs.pop('case')
        preexec_fn_ = kwargs.get('preexec_fn', None)
        read, write = os.pipe()

        def preexec_fn():
            # Runs in the child between fork and exec
            os.close(read)
            if preexec_fn_:
                preexec_fn_()
            # Close the write end on exec so the testee does not inherit it
            fcntl.fcntl(write, fcntl.F_SETFD, fcntl.fcntl(write, fcntl.F_GETFD) | getattr(fcntl, 'FD_CLOEXEC', 1))
            fwrite = os.fdopen(write, 'wb')
            pickle.dump(clock(), fwrite, 1)
        kwargs['preexec_fn'] = preexec_fn
        try:
            case.process = Popen(*args, **kwargs)
        except OSError:
            os.close(read)
            raise CannotStartTestee(sys.exc_info()[1])
        finally:
            os.close(write)
        try:
            if pause is None:
                # No keyboard support: sleep() is interrupted by SIGCHLD
                if case.maxtime:
                    time.sleep(case.maxtime)
                    if case.process.poll() is None:
                        raise TimeLimitExceeded
                else:
                    case.process.wait()
            else:
                if not case.maxtime:
                    try:
                        while case.process.poll() is None:
                            if select((sys.stdin,), (), ())[0]:
                                if sys.stdin.read(1) == '\33':
                                    raise CanceledByUser
                    except select_error:
                        # EINTR is expected: SIGCHLD interrupted select()
                        if sys.exc_info()[1].args[0] != EINTR:
                            raise
                        else:
                            case.process.poll()
                else:
                    time_end = clock() + case.maxtime
                    try:
                        while case.process.poll() is None:
                            remaining = time_end - clock()
                            if remaining > 0:
                                if select((sys.stdin,), (), (), remaining)[0]:
                                    if sys.stdin.read(1) == '\33':
                                        raise CanceledByUser
                            else:
                                raise TimeLimitExceeded
                    except select_error:
                        # EINTR is expected: SIGCHLD interrupted select()
                        if sys.exc_info()[1].args[0] != EINTR:
                            raise
                        else:
                            case.process.poll()
        finally:
            # The data comes from our own forked child (see preexec_fn)
            case.time_started = pickle.loads(os.read(read, 512))
            os.close(read)
            del bury_child.case

    def call(*args, **kwargs):
        """Run call_real, retrying without preexec_fn on MemoryError."""
        if 'preexec_fn' in kwargs:
            try:
                return call_real(*args, **kwargs)
            except MemoryError:
                # If there is not enough memory for the forked test.py,
                # opt for silent dropping of the limit
                # TODO: show a warning somewhere
                del kwargs['preexec_fn']
                return call_real(*args, **kwargs)
        else:
            return call_real(*args, **kwargs)


__all__ = ('TestCase', 'load_problem', 'TestCaseNotPassed',
           'TimeLimitExceeded', 'CanceledByUser', 'WrongAnswer',
           'NonZeroExitCode', 'CannotStartTestee',
           'CannotStartValidator', 'CannotReadOutputFile',
           'CannotReadInputFile', 'CannotReadAnswerFile')


# Exceptions

class TestCaseNotPassed(Exception):
    """Base class of every exception raised by this module for a test."""
    __slots__ = ()


class TimeLimitExceeded(TestCaseNotPassed):
    """The testee was still running when the time limit elapsed."""
    __slots__ = ()


class CanceledByUser(TestCaseNotPassed):
    """The Escape key was pressed while the testee was running."""
    __slots__ = ()


class WrongAnswer(TestCaseNotPassed):
    """The output was rejected; ``comment`` holds the validator's remark."""
    __slots__ = 'comment'

    def __init__(self, comment=''):
        self.comment = comment


class NonZeroExitCode(TestCaseNotPassed):
    """The testee exited with the unacceptable code ``exitcode``."""
    __slots__ = 'exitcode'

    def __init__(self, exitcode):
        self.exitcode = exitcode


class ExceptionWrapper(TestCaseNotPassed):
    """Base for exceptions wrapping another one, kept in ``upstream``."""
    __slots__ = 'upstream'

    def __init__(self, upstream):
        self.upstream = upstream


class CannotStartTestee(ExceptionWrapper):
    __slots__ = ()


class CannotStartValidator(ExceptionWrapper):
    __slots__ = ()


class CannotReadOutputFile(ExceptionWrapper):
    __slots__ = ()


class CannotReadInputFile(ExceptionWrapper):
    __slots__ = ()


class CannotReadAnswerFile(ExceptionWrapper):
    __slots__ = ()


# Helper context managers

class CopyDeleting(object):
    """Copy ``file`` to ``name`` on entry and schedule ``name`` for deletion.

    The name is appended to ``case.files_to_delete`` on exit, and also when
    the copy itself fails. Nothing happens if ``name`` is false.
    """
    __slots__ = 'case', 'file', 'name'

    def __init__(self, case, file, name):
        self.case = case
        self.file = file
        self.name = name

    def __enter__(self):
        if self.name:
            try:
                self.file.copy(self.name)
            except:
                # __exit__ is not called when __enter__ raises, so schedule
                # the deletion by hand before re-raising
                try:
                    self.__exit__(None, None, None)
                except:
                    pass
                raise

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.name:
            self.case.files_to_delete.append(self.name)


class Copying(object):
    """Copy ``file`` to ``name`` on entry; do nothing on exit."""
    __slots__ = 'file', 'name'

    def __init__(self, file, name):
        self.file = file
        self.name = name

    def __enter__(self):
        if self.name:
            self.file.copy(self.name)

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass


# Test case types

class TestCase(object):
    """Abstract base class of all test case types.

    Instances are callable: calling one runs ``test`` and then always fixes
    up the timing attributes, invokes the callback if ``test`` has not done
    so, and cleans up.
    """
    __slots__ = ('problem', 'id', 'isdummy', 'infile', 'outfile', 'points',
                 'process', 'time_started', 'time_stopped', 'time_limit_string',
                 'realinname', 'realoutname', 'maxtime', 'maxmemory',
                 'has_called_back', 'files_to_delete')

    if ABCMeta:
        __metaclass__ = ABCMeta

    def __init__(case, prob, id, isdummy, points):
        """Store the arguments and copy the limits and file name patterns
        from the problem's configuration."""
        case.problem = prob
        case.id = id
        case.isdummy = isdummy
        case.points = points
        case.maxtime = case.problem.config.maxtime
        case.maxmemory = case.problem.config.maxmemory
        if case.maxtime:
            case.time_limit_string = '/%.3f' % case.maxtime
        else:
            case.time_limit_string = ''
        if not isdummy:
            case.realinname = case.problem.config.testcaseinname
            case.realoutname = case.problem.config.testcaseoutname
        else:
            case.realinname = case.problem.config.dummyinname
            case.realoutname = case.problem.config.dummyoutname

    @abstractmethod
    def test(case, callback):
        """Run the test; subclasses must override this.

        The signature matches how __call__ invokes it and how the
        subclasses in this module define it.
        """
        raise NotImplementedError

    def __call__(case, callback):
        """Run the test case and return whatever ``test`` returns.

        ``callback`` is called with no arguments exactly once: by ``test``
        (which then sets has_called_back) or otherwise here.
        """
        case.has_called_back = False
        case.files_to_delete = []
        try:
            return case.test(callback)
        finally:
            now = clock()
            # Fill in whichever timestamps were never set
            if not getattr(case, 'time_started', None):
                case.time_started = case.time_stopped = now
            elif not getattr(case, 'time_stopped', None):
                case.time_stopped = now
            if not case.has_called_back:
                callback()
            case.cleanup()

    def cleanup(case):
        """Terminate the process, if any, and remove the scheduled files."""
        #if getattr(case, 'infile', None):
        #    case.infile.close()
        #if getattr(case, 'outfile', None):
        #    case.outfile.close()
        if getattr(case, 'process', None):
            # Try killing after three unsuccessful TERM attempts in a row
            # (except on Windows, where TERMing is killing)
            for i in range(3):
                try:
                    try:
                        case.process.terminate()
                    except AttributeError:
                        # Python 2.5
                        if TerminateProcess and hasattr(case.process, '_handle'):
                            # Windows API
                            TerminateProcess(case.process._handle, 1)
                        else:
                            # POSIX
                            os.kill(case.process.pid, SIGTERM)
                except Exception:
                    time.sleep(0)
                    case.process.poll()
                else:
                    case.process.wait()
                    break
            else:
                # If killing the process is unsuccessful three times in a row,
                # just silently stop trying
                for i in range(3):
                    try:
                        try:
                            case.process.kill()
                        except AttributeError:
                            # Python 2.5
                            if TerminateProcess and hasattr(case.process, '_handle'):
                                # Windows API
                                TerminateProcess(case.process._handle, 1)
                            else:
                                # POSIX
                                os.kill(case.process.pid, SIGKILL)
                    except Exception:
                        time.sleep(0)
                        case.process.poll()
                    else:
                        case.process.wait()
                        break
        if case.files_to_delete:
            for name in case.files_to_delete:
                try:
                    os.remove(name)
                except Exception:
                    # It can't be helped
                    pass

    def open_infile(case):
        """Set case.infile; raise CannotReadInputFile on IOError.

        The file name is realinname with every '$' replaced by the id.
        """
        try:
            case.infile = files.File('/'.join((case.problem.name, case.realinname.replace('$', case.id))))
        except IOError:
            e = sys.exc_info()[1]
            raise CannotReadInputFile(e)

    def open_outfile(case):
        """Set case.outfile; raise CannotReadAnswerFile on IOError.

        The file name is realoutname with every '$' replaced by the id.
        """
        try:
            case.outfile = files.File('/'.join((case.problem.name, case.realoutname.replace('$', case.id))))
        except IOError:
            e = sys.exc_info()[1]
            raise CannotReadAnswerFile(e)


class ValidatedTestCase(TestCase):
    """Test case whose output is checked against a reference output or by
    the validator named in the ``tester`` configuration variable."""
    __slots__ = 'validator'

    def __init__(case, *args):
        TestCase.__init__(case, *args)
        if not case.problem.config.tester:
            case.validator = None
        else:
            case.validator = case.problem.config.tester

    def validate(case, output):
        """Check ``output``, an open file with the testee's output.

        Without a validator the output is compared line by line with the
        reference output and 1 is returned on a match. A callable validator
        is called with ``output`` and its result returned. Otherwise the
        validator is run as a program and a (score, comment) pair returned.

        Raises WrongAnswer on a mismatch or a rejecting validator, and
        CannotStartValidator if the validator program cannot be started.
        """
        if not case.validator:
            # Compare the output with the reference output
            case.open_outfile()
            with case.outfile.open() as refoutput:
                for line, refline in zip_longest(output, refoutput):
                    # line is None when the output has fewer lines than the
                    # reference; it must not be passed to bytes(), and the
                    # comparison below reports the mismatch
                    if (line is not None and refline is not None
                            and not isinstance(refline, basestring)):
                        line = bytes(line, sys.getdefaultencoding())
                    if line != refline:
                        raise WrongAnswer
            return 1
        elif callable(case.validator):
            return case.validator(output)
        else:
            # Call the validator program
            output.close()
            if case.problem.config.ansname:
                case.open_outfile()
                case.outfile.copy(case.problem.config.ansname)
            try:
                case.process = Popen(case.validator, stdin=devnull, stdout=PIPE, stderr=STDOUT, universal_newlines=True, bufsize=-1)
            except OSError:
                raise CannotStartValidator(sys.exc_info()[1])
            comment = case.process.communicate()[0].strip()
            # Strip a leading verdict word from the validator's comment
            match = re.match(r'(?i)(ok|(?:correct|wrong)(?:(?:\s|_)*answer)?)(?:$|\s+|[.,!:]+\s*)', comment)
            if match:
                comment = comment[match.end():]
            if not case.problem.config.maxexitcode:
                if case.process.returncode:
                    raise WrongAnswer(comment)
                else:
                    return 1, comment
            else:
                # True division (see the __future__ import)
                return case.process.returncode / case.problem.config.maxexitcode, comment


class BatchTestCase(ValidatedTestCase):
    """Test case that runs the testee once on the input data and validates
    the output it produces."""
    __slots__ = ()

    def test(case, callback):
        """Run the testee and return the result of validating its output.

        With the ``stdio`` configuration variable set, the input and output
        files are attached to the testee's standard streams; otherwise the
        input is copied to ``inname`` and the output read from ``outname``.
        ``callback`` is called once the testee has finished acceptably.

        Raises NonZeroExitCode for a negative return code, or a non-zero
        one if force_zero_exitcode is set, plus whatever ``call`` and
        ``validate`` raise.
        """
        init_canceled()
        if sys.platform == 'win32' or not case.maxmemory:
            preexec_fn = None
        else:
            def preexec_fn():
                try:
                    import resource
                    # case.maxmemory is multiplied by 2**20 to get bytes
                    maxmemory = int(case.maxmemory * 1048576)
                    resource.setrlimit(resource.RLIMIT_AS, (maxmemory, maxmemory))
                    # I would also set a CPU time limit but I do not want the time
                    # that passes between the calls to fork and exec to be counted in
                except MemoryError:
                    # We do not have enough memory for ourselves;
                    # let the parent know about this
                    raise
                except Exception:
                    # Well, at least we tried
                    pass
        case.open_infile()
        case.time_started = None
        if case.problem.config.stdio:
            if options.erase and not case.validator or not case.problem.config.inname:
                # TODO: re-use the same file name if possible
                # FIXME: 2.5 lacks the delete parameter
                with tempfile.NamedTemporaryFile(delete=False) as f:
                    inputdatafname = f.name
                contextmgr = CopyDeleting(case, case.infile, inputdatafname)
            else:
                inputdatafname = case.problem.config.inname
                contextmgr = Copying(case.infile, inputdatafname)
            with contextmgr:
                # FIXME: this U doesn't do anything good for the child process, does it?
                with open(inputdatafname, 'rU') as infile:
                    with tempfile.TemporaryFile('w+') if options.erase and not case.validator else open(case.problem.config.outname, 'w+') as outfile:
                        if call is not None:
                            call(case.problem.config.path, case=case, stdin=infile, stdout=outfile, stderr=devnull, universal_newlines=True, bufsize=-1, preexec_fn=preexec_fn)
                        else:
                            # No platform-specific call(): poll the process
                            try:
                                try:
                                    case.process = Popen(case.problem.config.path, stdin=infile, stdout=outfile, stderr=devnull, universal_newlines=True, bufsize=-1, preexec_fn=preexec_fn)
                                except MemoryError:
                                    # If there is not enough memory for the forked test.py,
                                    # opt for silent dropping of the limit
                                    # TODO: show a warning somewhere
                                    case.process = Popen(case.problem.config.path, stdin=infile, stdout=outfile, stderr=devnull, universal_newlines=True, bufsize=-1)
                            except OSError:
                                raise CannotStartTestee(sys.exc_info()[1])
                            case.time_started = clock()
                            time_next_check = case.time_started + .15
                            if not case.maxtime:
                                while True:
                                    exitcode, now = case.process.poll(), clock()
                                    if exitcode is not None:
                                        case.time_stopped = now
                                        break
                                    # For some reason (probably Microsoft's fault),
                                    # msvcrt.kbhit() is slow as hell
                                    else:
                                        if now >= time_next_check:
                                            if canceled():
                                                raise CanceledByUser
                                            else:
                                                time_next_check = now + .15
                                        time.sleep(.001)
                            else:
                                time_end = case.time_started + case.maxtime
                                while True:
                                    exitcode, now = case.process.poll(), clock()
                                    if exitcode is not None:
                                        case.time_stopped = now
                                        break
                                    elif now >= time_end:
                                        raise TimeLimitExceeded
                                    else:
                                        if now >= time_next_check:
                                            if canceled():
                                                raise CanceledByUser
                                            else:
                                                time_next_check = now + .15
                                        time.sleep(.001)
                        if config.globalconf.force_zero_exitcode and case.process.returncode or case.process.returncode < 0:
                            raise NonZeroExitCode(case.process.returncode)
                        callback()
                        case.has_called_back = True
                        outfile.seek(0)
                        return case.validate(outfile)
        else:
            case.infile.copy(case.problem.config.inname)
            if call is not None:
                call(case.problem.config.path, case=case, stdin=devnull, stdout=devnull, stderr=STDOUT, preexec_fn=preexec_fn)
            else:
                # No platform-specific call(): poll the process
                try:
                    try:
                        case.process = Popen(case.problem.config.path, stdin=devnull, stdout=devnull, stderr=STDOUT, preexec_fn=preexec_fn)
                    except MemoryError:
                        # If there is not enough memory for the forked test.py,
                        # opt for silent dropping of the limit
                        # TODO: show a warning somewhere
                        case.process = Popen(case.problem.config.path, stdin=devnull, stdout=devnull, stderr=STDOUT)
                except OSError:
                    raise CannotStartTestee(sys.exc_info()[1])
                case.time_started = clock()
                time_next_check = case.time_started + .15
                if not case.maxtime:
                    while True:
                        exitcode, now = case.process.poll(), clock()
                        if exitcode is not None:
                            case.time_stopped = now
                            break
                        else:
                            if now >= time_next_check:
                                if canceled():
                                    raise CanceledByUser
                                else:
                                    time_next_check = now + .15
                            time.sleep(.001)
                else:
                    time_end = case.time_started + case.maxtime
                    while True:
                        exitcode, now = case.process.poll(), clock()
                        if exitcode is not None:
                            case.time_stopped = now
                            break
                        elif now >= time_end:
                            raise TimeLimitExceeded
                        else:
                            if now >= time_next_check:
                                if canceled():
                                    raise CanceledByUser
                                else:
                                    time_next_check = now + .15
                            time.sleep(.001)
            if config.globalconf.force_zero_exitcode and case.process.returncode or case.process.returncode < 0:
                raise NonZeroExitCode(case.process.returncode)
            callback()
            case.has_called_back = True
            with open(case.problem.config.outname, 'rU') as output:
                return case.validate(output)


# This is the only test case type not executing any programs to be tested
class OutputOnlyTestCase(ValidatedTestCase):
    __slots__ = ()

    def cleanup(case):
        pass


class BestOutputTestCase(ValidatedTestCase):
    __slots__ = ()


# This is the only test case type executing two programs simultaneously
class ReactiveTestCase(TestCase):
    __slots__ = ()
    # The basic idea is to launch the program to be tested and the grader
    # and to pipe their standard I/O from and to each other,
    # and then to capture the grader's exit code and use it
    # like the exit code of an output validator is used.


class DummyTestContext(problem.TestGroup):
    """Test context for the sample tests; it contributes no points."""
    __slots__ = ()

    def end(self):
        """Print the sample total and return (0, 0, self.log)."""
        say('Sample total: %d/%d tests' % (self.ncorrect, self.ntotal))
        return 0, 0, self.log


def load_problem(prob, _types={'batch'   : BatchTestCase,
                               'outonly' : OutputOnlyTestCase,
                               'bestout' : BestOutputTestCase,
                               'reactive': ReactiveTestCase}):
    """Generate the test contexts and test cases of the problem ``prob``.

    prob.cache.padoutput is first raised to the length of the longest test
    name. With groups in use, the sample tests are yielded inside a
    DummyTestContext and each non-empty group inside a problem.TestGroup,
    every context being closed by problem.test_context_end. Without groups
    only the test cases are yielded, samples first.

    ``_types`` maps the values of the ``kind`` configuration variable to
    test case classes; it is only read.
    """
    # We will need to iterate over these configuration variables twice
    try:
        len(prob.config.dummies)
    except Exception:
        prob.config.dummies = tuple(prob.config.dummies)

    try:
        len(prob.config.tests)
    except Exception:
        prob.config.tests = tuple(prob.config.tests)

    if options.legacy:
        # Groups are in use only if some item of tests has several elements
        prob.config.usegroups = False
        newtests = []
        for i, name in enumerate(prob.config.tests):
            # Same here; we'll need to iterate over them twice
            try:
                l = len(name)
            except Exception:
                try:
                    name = tuple(name)
                except TypeError:
                    name = (name,)
                l = len(name)
            if l > 1:
                prob.config.usegroups = True
            newtests.append(name)
        if prob.config.usegroups:
            prob.config.tests = newtests
        del newtests

    # Even if they have duplicate test identifiers, we must honour sequence pointmaps
    #
    # getpoints(i, j[, k]): i is the test identifier, j its index within
    # the group (or within tests), k the index of the group
    if isinstance(prob.config.pointmap, dict):
        def getpoints(i, j, k=None):
            try:
                return prob.config.pointmap[i]
            except KeyError:
                try:
                    return prob.config.pointmap[None]
                except KeyError:
                    return prob.config.maxexitcode or 1
    elif prob.config.usegroups:
        def getpoints(i, j, k):
            try:
                return prob.config.pointmap[k][j]
            except LookupError:
                return prob.config.maxexitcode or 1
    else:
        def getpoints(i, j):
            try:
                return prob.config.pointmap[j]
            except LookupError:
                return prob.config.maxexitcode or 1

    # First get prob.cache.padoutput right,
    # then yield the actual test cases
    for i in prob.config.dummies:
        s = 'sample ' + str(i).zfill(prob.config.paddummies)
        prob.cache.padoutput = max(prob.cache.padoutput, len(s))
    if prob.config.usegroups:
        if not isinstance(prob.config.groupweight, dict):
            # A sequence groupweight is indexed by group position
            prob.config.groupweight = dict(enumerate(prob.config.groupweight))
        for group in prob.config.tests:
            for i in group:
                s = str(i).zfill(prob.config.padtests)
                prob.cache.padoutput = max(prob.cache.padoutput, len(s))
        yield DummyTestContext()
        for i in prob.config.dummies:
            s = str(i).zfill(prob.config.paddummies)
            yield _types[prob.config.kind](prob, s, True, 0)
        yield problem.test_context_end
        for k, group in enumerate(prob.config.tests):
            if not group:
                continue
            # The None key of groupweight, if present, is the default
            yield problem.TestGroup(prob.config.groupweight.get(k, prob.config.groupweight.get(None)))
            for j, i in enumerate(group):
                s = str(i).zfill(prob.config.padtests)
                yield _types[prob.config.kind](prob, s, False, getpoints(i, j, k))
            yield problem.test_context_end
    else:
        for i in prob.config.tests:
            s = str(i).zfill(prob.config.padtests)
            prob.cache.padoutput = max(prob.cache.padoutput, len(s))
        for i in prob.config.dummies:
            s = str(i).zfill(prob.config.paddummies)
            yield _types[prob.config.kind](prob, s, True, 0)
        for j, i in enumerate(prob.config.tests):
            s = str(i).zfill(prob.config.padtests)
            yield _types[prob.config.kind](prob, s, False, getpoints(i, j))