Mercurial > ~astiob > upreckon > hgweb
view upreckon/testcases.py @ 191:4f69e30abbd5
Made sure the wakeup FD is always valid
author | Oleg Oshmyan <chortos@inbox.lv> |
---|---|
date | Thu, 11 Aug 2011 19:33:34 +0300 |
parents | 760d38ee86d6 |
children | fa81289ee407 |
line wrap: on
line source
# Copyright (c) 2010-2011 Chortos-2 <chortos@inbox.lv>

# TODO: copy the ansfile if not options.erase even if no validator is used

from __future__ import division, with_statement

from .compat import *
from .exceptions import *
from . import files, config
from __main__ import options

import re, sys, tempfile
from subprocess import Popen, PIPE, STDOUT

import os

# Shared null device handle, used as stdin/stdout/stderr of child processes
# whenever a stream has to be discarded.
devnull = open(os.path.devnull, 'w+b')


class DummySignalIgnorer(object):
    """Context manager that does nothing on entry and on exit."""

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_value, traceback):
        pass


# Default used by ValidatedTestCase.validate around communicate().
# NOTE(review): presumably the platform star-import below may rebind this
# name to a real implementation -- confirm against win32/unix modules.
signal_ignorer = DummySignalIgnorer()

# Platform-specific helpers (call, kill, clock, ... are used below and are
# not defined in this module); the Unix module is the fallback when the
# Windows one cannot be imported.
try:
    from .win32 import *
except Exception:
    from .unix import *

__all__ = ('TestCase', 'SkippedTestCase', 'ValidatedTestCase', 'BatchTestCase',
           'OutputOnlyTestCase')


# Helper context managers

class CopyDeleting(object):
    """Copy `file` to `name` on entry and schedule `name` for deletion.

    On exit (and also when the copy itself fails) the name is appended to
    `case.files_to_delete`; the actual removal is done by TestCase.cleanup.
    Nothing happens at all when `name` is false.
    """

    __slots__ = 'case', 'file', 'name'

    def __init__(self, case, file, name):
        self.case = case
        self.file = file
        self.name = name

    def __enter__(self):
        if self.name:
            try:
                self.file.copy(self.name)
            except:
                # __exit__ is not called when __enter__ raises, so register
                # the (possibly partially written) file for deletion by hand
                # before letting the exception propagate.
                try:
                    self.__exit__(None, None, None)
                except:
                    pass
                raise

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.name:
            self.case.files_to_delete.append(self.name)


class Copying(object):
    """Copy `file` to `name` on entry; leave the copy in place on exit.

    Nothing happens when `name` is false.
    """

    __slots__ = 'file', 'name'

    def __init__(self, file, name):
        self.file = file
        self.name = name

    def __enter__(self):
        if self.name:
            self.file.copy(self.name)

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass


# Test case types

class TestCase(object):
    """Base class of all test case types.

    An instance is called with a callback; the call runs the subclass's
    test() method, fills in the timing attributes if test() did not,
    guarantees that the callback has been invoked and finally cleans up.
    """

    __slots__ = ('problem', 'id', 'isdummy', 'infile', 'outfile', 'points',
                 'process', 'time_started', 'time_stopped',
                 'realinname', 'realoutname', 'maxcputime', 'maxwalltime',
                 'maxmemory', 'has_called_back', 'files_to_delete',
                 'cpu_time_limit_string', 'wall_time_limit_string',
                 'time_limit_string')
    has_ansfile = has_iofiles = False
    # Subclasses that never read an input file set this to False so that
    # __init__ does not look up the input file name pattern.
    needs_realinname = True

    if ABCMeta:
        __metaclass__ = ABCMeta

    def __init__(case, prob, id, isdummy, points):
        """Store the identity of the test case and cache the problem limits.

        prob     -- the problem object; its `config` supplies the limits
                    and the file name patterns
        id       -- the test case identifier substituted for '$' in names
        isdummy  -- true to use the dummy file name patterns instead of
                    the regular test case ones
        points   -- stored as-is in `case.points`
        """
        case.problem = prob
        case.id = id
        case.isdummy = isdummy
        case.points = points
        case.maxcputime = case.problem.config.maxcputime
        case.maxwalltime = case.problem.config.maxwalltime
        case.maxmemory = case.problem.config.maxmemory
        if case.maxcputime:
            case.cpu_time_limit_string = '/%.3f' % case.maxcputime
        else:
            case.cpu_time_limit_string = ''
        if case.maxwalltime:
            case.wall_time_limit_string = '/%.3f' % case.maxwalltime
        else:
            case.wall_time_limit_string = ''
        if not isdummy:
            if case.needs_realinname:
                case.realinname = case.problem.config.testcaseinname
            case.realoutname = case.problem.config.testcaseoutname
        else:
            if case.needs_realinname:
                case.realinname = case.problem.config.dummyinname
            case.realoutname = case.problem.config.dummyoutname

    @abstractmethod
    def test(case, callback):
        """Run the test case; must be overridden by subclasses.

        callback -- a callable taking no arguments.  Implementations that
                    invoke it themselves set `case.has_called_back` to
                    True first; otherwise __call__ invokes it afterwards.
        """
        # The signature includes `callback` because __call__ always passes
        # it and every concrete implementation accepts it.
        raise NotImplementedError

    def __call__(case, callback):
        """Run test(callback) and return its result.

        Whether test() returns or raises, afterwards the missing
        time_started/time_stopped attributes are filled in with the current
        clock() value, callback() is invoked if test() has not reported
        doing so, and cleanup() is run.
        """
        case.has_called_back = False
        case.files_to_delete = []
        case.time_limit_string = case.wall_time_limit_string
        try:
            return case.test(callback)
        finally:
            # The inner try/finally makes sure cleanup() still runs (killing
            # a leftover process and removing temporary files) even if the
            # bookkeeping or the callback itself raises.
            try:
                now = clock()
                if getattr(case, 'time_started', None) is None:
                    case.time_started = case.time_stopped = now
                elif getattr(case, 'time_stopped', None) is None:
                    case.time_stopped = now
                if not case.has_called_back:
                    callback()
            finally:
                case.cleanup()

    def cleanup(case):
        """Kill a still-running process and delete the scheduled files."""
        # Note that native extensions clean up on their own
        # and never let this condition be satisfied
        if getattr(case, 'process', None) and case.process.returncode is None:
            kill(case.process)
        for name in case.files_to_delete:
            try:
                os.remove(name)
            except OSError:
                # It can't be helped
                pass

    def open_infile(case):
        """Set `case.infile` to the input file of this test case.

        The path is '<problem name>/<realinname>' with '$' replaced by the
        test case id.  Raises CannotReadInputFile on IOError.
        """
        try:
            case.infile = files.File('/'.join((
                case.problem.name, case.realinname.replace('$', case.id))))
        except IOError:
            e = sys.exc_info()[1]
            raise CannotReadInputFile(e)

    def open_outfile(case):
        """Set `case.outfile` to the reference answer file of this test case.

        The path is '<problem name>/<realoutname>' with '$' replaced by the
        test case id.  Raises CannotReadAnswerFile on IOError.
        """
        try:
            case.outfile = files.File('/'.join((
                case.problem.name, case.realoutname.replace('$', case.id))))
        except IOError:
            e = sys.exc_info()[1]
            raise CannotReadAnswerFile(e)


class SkippedTestCase(TestCase):
    """Test case that is never run: test() always raises TestCaseSkipped."""

    __slots__ = ()

    def test(case, callback):
        raise TestCaseSkipped


class ValidatedTestCase(TestCase):
    """Test case whose output is checked by validate().

    `case.validator` is None (compare with the reference answer), a
    callable, or something passed to Popen as the validator command.
    """

    __slots__ = 'validator'

    def __init__(case, *args):
        TestCase.__init__(case, *args)
        # Normalize any false `tester` configuration value to None.
        if not case.problem.config.tester:
            case.validator = None
        else:
            case.validator = case.problem.config.tester

    def validate(case, output):
        """Check the output and return the verdict.

        output -- an open binary file object positioned at the start of
                  the output to check

        Without a validator the output is compared with the reference
        answer and 1 is returned on a match (WrongAnswer is raised
        otherwise).  A callable validator is called with `output` and its
        result returned unchanged.  Any other validator is run as a
        program, and a (score, comment) pair is returned, where the score
        is 1 or exit code / maxexitcode; if maxexitcode is not configured,
        a non-zero exit code raises WrongAnswer(comment).  Raises
        CannotStartValidator if the program cannot be started.
        """
        if not case.validator:
            # Compare the output with the reference output
            buffer = refbuffer = crlfhalf = refcrlfhalf = ''.encode()
            crlf = '\r\n'.encode('ascii')
            case.open_outfile()
            with case.outfile.open() as refoutput:
                while True:
                    data = output.read(4096 - len(buffer))
                    refdata = refoutput.read(4096 - len(refbuffer))
                    if not case.problem.config.binary:
                        # Text mode: normalize CR LF and lone CR to LF.
                        # A CR at the very end of a chunk is held back in
                        # (ref)crlfhalf until the next read shows whether
                        # it is followed by an LF.
                        data, refdata = crlfhalf + data, refcrlfhalf + refdata
                        size, refsize = len(data), len(refdata)
                        if data and data != crlfhalf and data[-1] == crlf[0]:
                            size -= 1
                            crlfhalf = data[-1:]
                        else:
                            crlfhalf = ''.encode()
                        if (refdata and refdata != refcrlfhalf and
                                refdata[-1] == crlf[0]):
                            refsize -= 1
                            refcrlfhalf = refdata[-1:]
                        else:
                            refcrlfhalf = ''.encode()
                        data = data[:size].replace(crlf, crlf[1:])
                        data = data.replace(crlf[:1], crlf[1:])
                        refdata = refdata[:refsize].replace(crlf, crlf[1:])
                        refdata = refdata.replace(crlf[:1], crlf[1:])
                    buffer += data
                    refbuffer += refdata
                    if not (buffer or refbuffer or crlfhalf or refcrlfhalf):
                        # Both streams are exhausted with nothing pending
                        break
                    elif (not buffer and not crlfhalf or
                          not refbuffer and not refcrlfhalf):
                        # Exactly one of the streams has run out of data
                        raise WrongAnswer
                    # Compare the common prefix and keep the remainders
                    size = min(len(buffer), len(refbuffer))
                    if buffer[:size] != refbuffer[:size]:
                        raise WrongAnswer
                    buffer, refbuffer = buffer[size:], refbuffer[size:]
            return 1
        elif callable(case.validator):
            return case.validator(output)
        else:
            # Call the validator program
            output.close()
            if case.problem.config.ansname:
                case.open_outfile()
                case.outfile.copy(case.problem.config.ansname)
            try:
                case.process = Popen(case.validator, stdin=devnull,
                                     stdout=PIPE, stderr=STDOUT,
                                     universal_newlines=True, bufsize=-1)
            except OSError:
                raise CannotStartValidator(sys.exc_info()[1])
            with signal_ignorer:
                comment = case.process.communicate()[0].strip()
            # Strip a leading verdict word (OK / correct answer / wrong
            # answer and the like) from the validator's message; the verdict
            # itself is taken from the exit code.
            match = re.match(r'(?i)(ok|(?:correct|wrong)(?:(?:\s|_)*answer)?)(?:$|\s+|[.,!:]+\s*)', comment)
            if match:
                comment = comment[match.end():]
            if not case.problem.config.maxexitcode:
                if case.process.returncode:
                    raise WrongAnswer(comment)
                else:
                    return 1, comment
            else:
                return (case.process.returncode /
                        case.problem.config.maxexitcode), comment


class BatchTestCase(ValidatedTestCase):
    """Test case that runs the tested program once on one input file."""

    __slots__ = ()

    @property
    def has_iofiles(case):
        """True value if named input/output files are used: either the
        problem does not use standard I/O or the validator is a program."""
        return (not case.problem.config.stdio or
                case.validator and not callable(case.validator))

    @property
    def has_ansfile(case):
        """True value if the validator is an external program."""
        return case.validator and not callable(case.validator)

    def test(case, callback):
        """Run the tested program and validate its output.

        Returns the result of validate().  Raises NonZeroExitCode if the
        exit code is negative, or non-zero while force_zero_exitcode is
        set, and CannotReadOutputFile if the program was expected to write
        an output file that cannot be opened.
        """
        case.open_infile()
        if case.problem.config.stdio:
            if (options.erase and not case.validator or
                    not case.problem.config.inname):
                # TODO: re-use the same file name if possible
                # FIXME: 2.5 lacks the delete parameter
                with tempfile.NamedTemporaryFile(delete=False) as f:
                    inputdatafname = f.name
                contextmgr = CopyDeleting(case, case.infile, inputdatafname)
            else:
                inputdatafname = case.problem.config.inname
                contextmgr = Copying(case.infile, inputdatafname)
            with contextmgr:
                # The output goes to an anonymous temporary file unless it
                # must be kept or an external validator has to read it.
                with (tempfile.TemporaryFile('w+b')
                      if options.erase and (not case.validator or
                                            callable(case.validator))
                      else open(case.problem.config.outname, 'w+b')) as outfile:
                    with open(inputdatafname) as infile:
                        call(case.problem.config.path, case=case,
                             stdin=infile, stdout=outfile, stderr=devnull)
                    if (config.globalconf.force_zero_exitcode and
                            case.process.returncode or
                            case.process.returncode < 0):
                        raise NonZeroExitCode(case.process.returncode)
                    case.has_called_back = True
                    callback()
                    outfile.seek(0)
                    return case.validate(outfile)
        else:
            # The tested program reads and writes named files on its own
            case.infile.copy(case.problem.config.inname)
            call(case.problem.config.path, case=case,
                 stdin=devnull, stdout=devnull, stderr=devnull)
            if (config.globalconf.force_zero_exitcode and
                    case.process.returncode or
                    case.process.returncode < 0):
                raise NonZeroExitCode(case.process.returncode)
            case.has_called_back = True
            callback()
            try:
                output = open(case.problem.config.outname, 'rb')
            except IOError:
                raise CannotReadOutputFile(sys.exc_info()[1])
            with output as output:
                return case.validate(output)


# This is the only test case type not executing any programs to be tested
class OutputOnlyTestCase(ValidatedTestCase):
    """Test case that validates an already existing output file."""

    __slots__ = ()
    needs_realinname = False

    def cleanup(case):
        # Nothing is run and no files are created, so nothing to clean up
        pass

    def test(case, callback):
        """Validate the file named by `outname` with '$' replaced by the id.

        Both time stamps are set to 0.  Returns the result of validate();
        raises CannotReadOutputFile if the file cannot be opened.
        """
        case.time_stopped = case.time_started = 0
        case.has_called_back = True
        callback()
        try:
            output = open(case.problem.config.outname.replace('$', case.id),
                          'rb')
        except IOError:
            raise CannotReadOutputFile(sys.exc_info()[1])
        with output as output:
            return case.validate(output)


class BestOutputTestCase(ValidatedTestCase):
    __slots__ = ()


# This is the only test case type executing two programs simultaneously
class ReactiveTestCase(TestCase):
    __slots__ = ()
    # The basic idea is to launch the program to be tested and the grader
    # and to pipe their standard I/O from and to each other,
    # and then to capture the grader's exit code and use it
    # like the exit code of an output validator is used.