4
0

+ Intro to C test suite.

This commit is contained in:
SAVELIY BAKTURIN
2026-02-10 13:40:02 +00:00
commit ca1c008cce
8 changed files with 802 additions and 0 deletions

28
testsuites/__init__.py Normal file
View File

@@ -0,0 +1,28 @@
from .suite import *
# Prefix of the environment-variable names that hold per-category coefficients.
PREFIX_ENVIRONMENT_NAME = "AI360_CPP"
# The operation completed successfully.
ERROR_SUCCESS = 0
# A file could not be opened.
ERROR_CANNOT_OPEN_FILE = 1
# Not enough memory: a memory allocation failed.
ERROR_NOT_ENOUGH_MEMORY = 2
# The data is invalid.
ERROR_DATA_INVALID = 3
# The command-line arguments (argv) or their number are incorrect.
ERROR_ARGUMENTS_INVALID = 4
# The file format is incorrect.
ERROR_FORMAT_INVALID = 5
# The requested functionality is not supported.
ERROR_UNSUPPORTED = 20
# Any other error.
ERROR_UNKNOWN = 250

68
testsuites/intro.py Normal file
View File

@@ -0,0 +1,68 @@
from testsuites import *
from typing import List
class __Intro(Testsuite):
    """Test suite "intro": the program gets ``a b`` on stdin and must print ``a + b``.

    Every test feeds two integers separated by a space to the tested
    executable, requires a zero return code, and checks that stdout is exactly
    one newline-terminated line holding the sum.
    """

    __SUITE_NAME = "intro"
    # Timeout of a single run, in seconds (before the timeout factor is applied).
    __TIMEOUT = 1
    # Category name -> suffix of the environment variable holding its coefficient.
    __CATEGORIES_TO_ENVNAMES = { "a + b": "A_PLUS_B" }

    def __init__(self):
        super().__init__(self.__SUITE_NAME, PREFIX_ENVIRONMENT_NAME, self.__CATEGORIES_TO_ENVNAMES)

    def get_tester(self) -> Tester:
        """Build the tester with one test per (a, b) pair, a in [-5, 10), b in [6, 9)."""
        tester = Tester(self.name())

        class _SumExpected(Expected):
            """Checks that stdout holds the single integer ``a + b``."""

            def __init__(self, a: int, b: int):
                super().__init__()
                self.__a = a
                self.__b = b

            def test(self, run: Run, runned: Runned) -> Verdict:
                output = runned.get_stdout()
                if not output.endswith("\n"):
                    return Verdict(VerdictErrno.ERROR_INVALID_FORMAT, "newline at stdout's end expected")
                lines = output.splitlines()
                if len(lines) != 1:
                    return Verdict(VerdictErrno.ERROR_INVALID_FORMAT, "single line expected")
                line = lines[0]
                if line != line.strip():
                    return Verdict(VerdictErrno.ERROR_INVALID_FORMAT, "found unexpected space characters in stdout")
                # Only the conversion itself may legitimately fail; any other
                # exception is a bug in the checker and must not be reported
                # to the user as a conversion problem.
                try:
                    actual = int(line)
                except ValueError:
                    return Verdict(VerdictErrno.ERROR_TYPE_ERROR, f"can't convert \"{escape(output)}\" to integer")
                expected = self.__a + self.__b
                if actual != expected:
                    return Verdict(VerdictErrno.ERROR_ASSERTION, f"{self.__a} + {self.__b} = {expected}, (actual: {actual})", "check math", True)
                return ok()

        def make_test(a: int, b: int) -> Test:
            """Create the one-run test that feeds ``a b`` and expects ``a + b``."""
            run = Run(c_timeout = self.__TIMEOUT, c_stdin = f"{a} {b}", c_args = None, t_returncode_policy = ReturnCodePolicy.ShouldBeZero)
            return Test(name = f"{a} + {b}", categories = ["a + b"], sequence = [(run, _SumExpected(a, b))])

        for a in range(-5, 10):
            for b in range(6, 9):
                tester.add(make_test(a, b))
        return tester


# The single instance of this suite.
instance = __Intro()

538
testsuites/suite.py Normal file
View File

@@ -0,0 +1,538 @@
import os
import subprocess
import time
import json
import enum
import signal
from typing import List, Optional, Tuple, Union, Callable, TypeVar, Any, Dict, Set, Iterable
from abc import abstractmethod, ABC
# Generic result type of the action passed to Log.scope.
T = TypeVar('T')
# Translation table used by escape(): each control character maps to its
# two-character backslash spelling.
_ESCAPE_TABLE = str.maketrans({"\n": "\\n", "\r": "\\r", "\t": "\\t"})


def escape(s: str) -> str:
    """Return *s* with newline, carriage return and tab made visible.

    Each of the three characters is replaced with its two-character backslash
    spelling (``\\n``, ``\\r``, ``\\t``); every other character, including an
    existing backslash, is copied unchanged.
    """
    # One pass over the string instead of character-by-character concatenation.
    return s.translate(_ESCAPE_TABLE)
class Log:
    """Line printer that prefixes every line with the current indentation.

    The indentation grows by ``indent_factor`` characters for the duration of
    each ``scope`` call and shrinks back afterwards.
    """

    def __init__(self, indent_factor: int, indent_n: Optional[int] = None, indent_char: str = " "):
        """Start at ``indent_n`` characters, or at ``indent_factor`` when it is omitted."""
        self.__indent_factor = indent_factor
        self.__indent_n = indent_n if indent_n is not None else indent_factor
        self.__indent_char = indent_char

    def scope(self, name: str, action: Callable[[], T]) -> T:
        """Print *name*, then run *action* one indentation level deeper and return its result."""
        self.println(name)
        step = self.__indent_factor
        self.__indent_n += step
        try:
            result = action()
        finally:
            # Restore the previous level even when the action raises.
            self.__indent_n -= step
        return result

    def println(self, line: str):
        """Print *line* behind the current indentation."""
        print(self.__indent(), line, sep = "")

    def __indent(self) -> str:
        """Return the current indentation prefix."""
        return self.__indent_n * self.__indent_char
class VerdictErrno(enum.Enum):
    """Kinds of verdict; each value is the human-readable verdict message."""

    # The only kind that counts as a pass.
    ERROR_SUCCESS = "success"

    # Process-level checks.
    ERROR_RETURNCODE = "program returns wrong returncode"
    ERROR_ASSERTION = "assertion"
    ERROR_TIMEOUT = "timeout expired"

    # Standard error output checks.
    ERROR_STDERR_EMPTY = "standard error output is empty"
    ERROR_STDERR_IS_NOT_EMPTY = "standard error output is not empty"

    # Output content checks.
    ERROR_TYPE_ERROR = "type error"
    ERROR_INVALID_FORMAT = "invalid format"

    # Problems found in the log of the dynamic wrapper.
    ERROR_VALGRIND_MEMCHECK = "valgrind error"
    ERROR_GDB_ERROR = "GDB error"
class Verdict:
    """Outcome of a check: a verdict kind plus optional explanatory text."""

    def __init__(self, verdict_errno: "VerdictErrno", what: Optional[str] = None, extended_what: Optional[Union[List[str], str]] = None, extended_what_is_hint: bool = False):
        """Create a verdict.

        Args:
            verdict_errno: The verdict kind.
            what: Short explanation; ``what()`` substitutes a placeholder when omitted.
            extended_what: Extra lines, either as a list or as one string that
                is split into lines. Omitted means no extra lines.
            extended_what_is_hint: Whether the extra text is a hint.
        """
        self.__verdict_errno = verdict_errno
        self.__what = what
        if extended_what is None:
            # A fresh list per instance: a shared default list would leak
            # lines between unrelated verdicts once a caller mutates it.
            self.__extended_what: List[str] = []
        elif isinstance(extended_what, str):
            self.__extended_what = extended_what.splitlines()
        else:
            # Copy so later changes to the caller's list do not alter the verdict.
            self.__extended_what = list(extended_what)
        self.__extended_what_is_hint = extended_what_is_hint

    def errno(self) -> "VerdictErrno":
        """Return the verdict kind."""
        return self.__verdict_errno

    def is_success(self) -> bool:
        """Return True when the verdict kind is ERROR_SUCCESS."""
        return self.__verdict_errno == VerdictErrno.ERROR_SUCCESS

    def is_failed(self) -> bool:
        """Return True for every verdict kind other than ERROR_SUCCESS."""
        return not self.is_success()

    def verdict_message(self) -> str:
        """Return the message attached to the verdict kind."""
        return self.__verdict_errno.value

    def extended_what(self) -> List[str]:
        """Return the extra lines (possibly empty)."""
        return self.__extended_what

    def extended_what_is_hint(self) -> bool:
        """Return whether the extra text is a hint."""
        return self.__extended_what_is_hint

    def what(self) -> str:
        """Return the short explanation, or a placeholder when none was given."""
        if self.__what is None:
            return "no additional information"
        return self.__what
def ok() -> Verdict:
    """Create a fresh success verdict without any explanatory text."""
    success = Verdict(VerdictErrno.ERROR_SUCCESS)
    return success
class DynamicWrapper(enum.Enum):
    """Tool, if any, under which the tested executable is launched."""

    # Launch the executable directly.
    NO_WRAPPER = "no"
    # Launch under valgrind.
    VALGRIND_ANALYZER = "valgrind"
    # Launch under GDB.
    GDB_DEBUGGER = "gdb"
class Runned:
    """Read-only record of one finished execution of the tested executable."""

    def __init__(self, c_returncode: int, c_stdout: str, c_stderr: str, c_start: int, c_end: int, dynamic_analyzer: DynamicWrapper):
        """Store the return code, captured streams, start/end timestamps and the wrapper used."""
        self.__returncode, self.__stdout, self.__stderr = c_returncode, c_stdout, c_stderr
        self.__started_at, self.__finished_at = c_start, c_end
        self.__wrapper = dynamic_analyzer

    def get_returncode(self) -> int:
        """Return the return code of the child process."""
        return self.__returncode

    def get_stdout(self) -> str:
        """Return everything the child wrote to stdout."""
        return self.__stdout

    def get_stderr(self) -> str:
        """Return everything the child wrote to stderr."""
        return self.__stderr

    def start(self) -> int:
        """Return the timestamp taken before communicating with the child."""
        return self.__started_at

    def end(self) -> int:
        """Return the timestamp taken after the child finished."""
        return self.__finished_at

    def dynamic_analyzer(self) -> DynamicWrapper:
        """Return the wrapper the child was launched under."""
        return self.__wrapper
def now() -> int:
    """Return the current ``time.time_ns()`` reading converted to whole milliseconds."""
    nanoseconds = time.time_ns()
    return nanoseconds // 1_000_000
class ReturnCodePolicy(enum.Enum):
    """How the return code of a run is judged."""

    # Compare with the expected return code, but only when one was given.
    MatchIfPresented = 0
    # The return code must be zero.
    ShouldBeZero = 1
    # The return code must be anything but zero.
    ShouldNotBeZero = 2
class Run:
    """Description of a single execution of the tested executable.

    Holds what is given to the child process (timeout, stdin text,
    command-line arguments) together with the expectations checked afterwards
    (return-code policy, optional exact return code, optional stdout, whether
    stderr must be empty), and launches the process, optionally under
    valgrind or GDB.
    """

    # File valgrind is told to write its log to.
    VALGRIND_LOG_FILENAME = "valgrind.log"
    # Marker valgrind is told to print around every error it reports.
    VALGRIND_ERROR_MARKER = "VALGRINDERRORMARKERFORPRETESTING"
    # File GDB is told to write its log to.
    GDB_LOG_FILENAME = "gdb.log"
    # Text looked for in the GDB log to decide that the run had no error.
    GDB_NO_ERROR_MARKER = "exited normally"

    def __init__(self, c_timeout: Union[float, int], c_stdin: Optional[str], c_args: Optional[List[str]], t_returncode_policy: ReturnCodePolicy, t_returncode: Optional[int] = None, t_stdout: Optional[str] = None, t_stderr_empty: bool = True):
        """Store the child inputs (``c_*``) and the expectations (``t_*``).

        Args:
            c_timeout: Timeout in seconds, before the timeout factor is applied.
            c_stdin: Text sent to the child's stdin, or None.
            c_args: Command-line arguments for the child, or None.
            t_returncode_policy: How the return code is judged.
            t_returncode: Exact expected return code, or None.
            t_stdout: Expected stdout, or None.
            t_stderr_empty: Whether stderr is expected to be empty.
        """
        self.__c_timeout = float(c_timeout)
        self.__c_stdin = c_stdin
        self.__c_args = c_args
        self.__t_returncode_policy = t_returncode_policy
        self.__t_returncode = t_returncode
        self.__t_stdout = t_stdout
        self.__t_stderr_empty = t_stderr_empty

    def get_timeout(self) -> float:
        """Return the timeout in seconds (without the timeout factor)."""
        return self.__c_timeout

    def stdin_presented(self) -> bool:
        """Return whether stdin text was given."""
        return self.__c_stdin is not None

    def get_stdin(self) -> str:
        """Return the stdin text; must only be called when it was given."""
        assert self.__c_stdin is not None
        return self.__c_stdin

    def args_presented(self) -> bool:
        """Return whether command-line arguments were given."""
        return self.__c_args is not None

    def get_args(self) -> List[str]:
        """Return the command-line arguments; must only be called when they were given."""
        assert self.__c_args is not None
        return self.__c_args

    def expected_returncode_policy(self) -> ReturnCodePolicy:
        """Return the policy used to judge the return code."""
        return self.__t_returncode_policy

    def expected_returncode_presented(self) -> bool:
        """Return whether an exact expected return code was given."""
        return self.__t_returncode is not None

    def get_expected_returncode(self) -> int:
        """Return the exact expected return code; must only be called when it was given."""
        assert self.__t_returncode is not None
        return self.__t_returncode

    def expected_stdout_presented(self) -> bool:
        """Return whether an expected stdout was given."""
        return self.__t_stdout is not None

    def get_expected_stdout(self) -> str:
        """Return the expected stdout; must only be called when it was given."""
        assert self.__t_stdout is not None
        return self.__t_stdout

    def is_stderr_should_be_empty(self) -> bool:
        """Return whether stderr is expected to be empty."""
        return self.__t_stderr_empty

    @staticmethod
    def __wrapper_prefix(dynamic_wrapper: DynamicWrapper) -> List[str]:
        """Return the argv prefix that launches the executable under *dynamic_wrapper*."""
        if dynamic_wrapper == DynamicWrapper.VALGRIND_ANALYZER:
            return [
                "valgrind",
                "--tool=memcheck",
                "--leak-check=full",
                "--show-leak-kinds=all",
                "--track-origins=yes",
                "--vgdb=no",
                f"--log-file={Run.VALGRIND_LOG_FILENAME}",
                f"--error-markers={Run.VALGRIND_ERROR_MARKER},{Run.VALGRIND_ERROR_MARKER}",
            ]
        if dynamic_wrapper == DynamicWrapper.GDB_DEBUGGER:
            # Every option is one argv element, so no shell quoting is needed.
            return [
                "gdb",
                "-q",
                "-return-child-result",
                "--batch-silent",
                "--eval-command=set debuginfod enabled off",
                f"--eval-command=set logging file {Run.GDB_LOG_FILENAME}",
                "--eval-command=set logging redirect on",
                "--eval-command=set logging overwrite on",
                "--eval-command=set logging debugredirect on",
                "--eval-command=set logging enabled on",
                "--eval-command=set print frame-arguments all",
                "--eval-command=run",
                "--eval-command=thread apply all bt -frame-info source-and-location -full",
                "--args",
            ]
        return []

    def run(self, executable_path: str, timeout_factor: float, dynamic_wrapper: DynamicWrapper) -> Optional[Runned]:
        """Execute the program once and collect its result.

        Args:
            executable_path: Path to the tested executable (made absolute).
            timeout_factor: Multiplier applied to this run's timeout.
            dynamic_wrapper: Tool to launch the executable under, if any.

        Returns:
            The collected result, or None when the timeout expired.
        """
        cmd = self.__wrapper_prefix(dynamic_wrapper)
        cmd.append(os.path.abspath(executable_path))
        if self.args_presented():
            cmd += self.get_args()
        under_gdb = dynamic_wrapper == DynamicWrapper.GDB_DEBUGGER
        # The argument list is passed without a shell in every mode, so
        # arguments containing spaces or shell metacharacters reach the
        # program verbatim. GDB gets its own session so that the debugger and
        # the debugged program can be killed together on timeout.
        # The context manager closes the pipes and reaps the child on every path.
        with subprocess.Popen(cmd, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, universal_newlines = True, start_new_session = under_gdb) as child:
            start = now()
            try:
                stdout, stderr = child.communicate(input = self.__c_stdin, timeout = self.__c_timeout * timeout_factor)
            except subprocess.TimeoutExpired:
                if under_gdb:
                    os.killpg(os.getpgid(child.pid), signal.SIGKILL)
                else:
                    child.kill()
                # communicate() does not kill on timeout; reap the child here
                # so that it does not linger as a zombie.
                child.wait()
                return None
            end = now()
            return Runned(child.returncode, stdout, stderr, start, end, dynamic_wrapper)
class Expected(ABC):
    """Interface of a custom check applied to the result of one run."""

    def __init__(self):
        """Nothing to initialise; exists so that subclasses can chain to it."""

    @abstractmethod
    def test(self, run: Run, runned: Runned) -> Verdict:
        """Judge *runned*, the result of executing *run*, and return the verdict."""
        raise NotImplementedError("Expected::test is not implemented")


# One step of a test: the run description and, optionally, its custom check.
SingleTest = Tuple[Run, Optional[Expected]]
class Test:
    """A named, categorised sequence of runs executed against one executable.

    Each element of the sequence pairs a run description with an optional
    custom check. The sequence stops at the first failed run.
    """

    def __init__(self, name: str, categories: Iterable[str] = (), sequence: Optional[List[SingleTest]] = None):
        """Create a test.

        Args:
            name: Display name of the test.
            categories: Categories the test belongs to (duplicates are dropped).
            sequence: Initial runs. The list is copied; omitted means empty.
        """
        self.__name = name
        self.__categories = set(categories)
        # Always a private list: ``add`` appends to it, so sharing a default
        # list (or the caller's list) would leak runs between tests.
        self.__sequence: List[SingleTest] = [] if sequence is None else list(sequence)

    def runs(self) -> List[SingleTest]:
        """Return the sequence of runs."""
        return self.__sequence

    def name(self) -> str:
        """Return the display name."""
        return self.__name

    def add(self, run: Run, expected: Optional[Expected] = None):
        """Append a run with an optional custom check."""
        self.add_single_test((run, expected))

    def add_single_test(self, single_test: SingleTest):
        """Append an already paired (run, check) step."""
        self.__sequence.append(single_test)

    def categories(self) -> Set[str]:
        """Return the set of categories."""
        return self.__categories

    @staticmethod
    def __read_log(path: str) -> List[str]:
        """Return the lines of the log at *path* with trailing whitespace removed."""
        with open(path, "r") as log_file:
            return [line.rstrip() for line in log_file]

    def __valgrind_errors(self) -> List[str]:
        """Return the valgrind log lines that lie between pairs of error markers."""
        reported: List[str] = []
        inside_error = False
        for line in self.__read_log(Run.VALGRIND_LOG_FILENAME):
            if Run.VALGRIND_ERROR_MARKER in line:
                # A marker line opens or closes an error section and is not reported itself.
                inside_error = not inside_error
            elif inside_error:
                reported.append(line)
        return reported

    def __pretest(self, run: Run, runned: Runned) -> Verdict:
        """Apply the built-in checks: wrapper log, stderr emptiness, return code."""
        analyzer = runned.dynamic_analyzer()
        if analyzer == DynamicWrapper.VALGRIND_ANALYZER:
            reported = self.__valgrind_errors()
            if reported:
                return Verdict(VerdictErrno.ERROR_VALGRIND_MEMCHECK, "below is what was in the valgrind log", reported)
        elif analyzer == DynamicWrapper.GDB_DEBUGGER:
            log_lines = self.__read_log(Run.GDB_LOG_FILENAME)
            if not any(Run.GDB_NO_ERROR_MARKER in line for line in log_lines):
                return Verdict(VerdictErrno.ERROR_GDB_ERROR, "below is what was in the GDB log", log_lines)

        c_stderr = runned.get_stderr()
        stderr_is_empty = c_stderr == ""
        if run.is_stderr_should_be_empty():
            if not stderr_is_empty:
                return Verdict(VerdictErrno.ERROR_STDERR_IS_NOT_EMPTY, "below is what was in the stderr", c_stderr)
        elif stderr_is_empty:
            return Verdict(VerdictErrno.ERROR_STDERR_EMPTY)

        policy = run.expected_returncode_policy()
        actual_returncode = runned.get_returncode()
        if policy == ReturnCodePolicy.ShouldBeZero:
            if actual_returncode != 0:
                return Verdict(VerdictErrno.ERROR_RETURNCODE, f"expected {0}, but actual is {actual_returncode}")
        elif policy == ReturnCodePolicy.ShouldNotBeZero:
            if actual_returncode == 0:
                return Verdict(VerdictErrno.ERROR_RETURNCODE, f"expected non-zero returncode, but actual is {actual_returncode}")
        elif policy == ReturnCodePolicy.MatchIfPresented and run.expected_returncode_presented():
            expected_returncode = run.get_expected_returncode()
            if actual_returncode != expected_returncode:
                return Verdict(VerdictErrno.ERROR_RETURNCODE, f"expected {expected_returncode}, but actual is {actual_returncode}")
        return ok()

    def __base_message(self, i: int) -> str:
        """Return the progress prefix for the run with zero-based index *i*."""
        return f"Run #{i + 1}/{len(self.__sequence)}..."

    def __print_verdict(self, i: int, verdict: Verdict, log: Log):
        """Log a failed verdict of run *i* together with its hint or extra lines."""
        log.println(f"{self.__base_message(i)} FAILED.")
        log.println(f"{verdict.verdict_message().capitalize()}: {verdict.what()}.")
        lines = verdict.extended_what()
        if verdict.extended_what_is_hint():
            # A hint is expected to be exactly one line.
            assert len(lines) == 1
            log.println(f"Hint: {lines[0]}.")
        else:
            for line in lines:
                log.println(line)

    def __invoke(self, executable_path: str, timeout_factor: float, dynamic_wrapper: DynamicWrapper, log: Log) -> Verdict:
        """Execute the runs in order; return the first failed verdict or success."""
        for i, (run, expected) in enumerate(self.__sequence):
            log.println(self.__base_message(i))
            runned = run.run(executable_path, timeout_factor, dynamic_wrapper)
            if runned is None:
                # ``Run.run`` reports an expired timeout as None.
                verdict = Verdict(VerdictErrno.ERROR_TIMEOUT, f"executed in more than {run.get_timeout() * timeout_factor}s")
            else:
                verdict = self.__pretest(run, runned)
                if verdict.is_success() and expected is not None:
                    verdict = expected.test(run, runned)
            if verdict.is_failed():
                self.__print_verdict(i, verdict, log)
                return verdict
            log.println(f"{self.__base_message(i)} ok: passed in {runned.end() - runned.start()}ms.")
        return ok()

    def invoke(self, executable_path: str, timeout_factor: float, dynamic_wrapper: DynamicWrapper, log: Log) -> Verdict:
        """Execute the whole sequence inside its own log scope and return the verdict."""
        return log.scope(f"Running sequence of {len(self.__sequence)} runs:", lambda : self.__invoke(executable_path, timeout_factor, dynamic_wrapper, log))

    def warm(self, executable_path: str, timeout_factor: float):
        """Execute every run once without a wrapper, discarding the results."""
        for run, _ in self.__sequence:
            run.run(executable_path, timeout_factor, DynamicWrapper.NO_WRAPPER)
class Result:
    """Collects the verdicts of a test list and exports them as a JSON report.

    Verdicts are matched to tests by position: the n-th added verdict belongs
    to the n-th test.
    """

    def __init__(self, tests: List["Test"]):
        """Remember *tests*; verdicts are added later, in the same order."""
        self.__tests = tests
        self.__verdicts: List["Verdict"] = []
        self.__passed = 0

    def add(self, verdict: "Verdict"):
        """Record the verdict of the next test."""
        if verdict.is_success():
            self.__passed += 1
        self.__verdicts.append(verdict)

    def n(self) -> int:
        """Return the number of recorded verdicts."""
        return len(self.__verdicts)

    def passed(self) -> int:
        """Return the number of successful verdicts."""
        return self.__passed

    def exitcode(self) -> int:
        """Return 0 when every recorded verdict is a success, otherwise 1."""
        return 0 if self.n() == self.passed() else 1

    def __get_result_by_category(self, category: str) -> float:
        """Return the fraction of passed tests among the judged tests of *category*.

        A category without any judged test scores 0.0 instead of dividing by zero.
        """
        total = 0
        passed = 0
        for test, verdict in zip(self.__tests, self.__verdicts):
            if category not in test.categories():
                continue
            total += 1
            if verdict.is_success():
                passed += 1
        if total == 0:
            return 0.0
        return passed / total

    def __calculate_total(self, coefficients: Dict[str, float]) -> float:
        """Return the sum of the category results weighted by their coefficients."""
        return sum((self.__get_result_by_category(category) * coefficient for category, coefficient in coefficients.items()), 0.0)

    def __get_results_by_categories(self, categories: Iterable[str]) -> Dict[str, float]:
        """Return the result of every category in *categories*."""
        return {category: self.__get_result_by_category(category) for category in categories}

    @staticmethod
    def __describe_run(r: Any) -> Dict[str, Any]:
        """Return the report entry of one run description; absent fields become ""."""
        return {
            "timeout": r.get_timeout(),
            "stdin": escape(r.get_stdin()) if r.stdin_presented() else "",
            "args": [escape(arg) for arg in r.get_args()] if r.args_presented() else "",
            "expected_returncode": r.get_expected_returncode() if r.expected_returncode_presented() else "",
            "expected_stdout": r.get_expected_stdout() if r.expected_stdout_presented() else "",
            "stderr_should_be_empty": r.is_stderr_should_be_empty(),
        }

    def __get_results(self) -> List[Dict[str, Any]]:
        """Return one report entry per judged test."""
        results: List[Dict[str, Any]] = []
        for i, (test, verdict) in enumerate(zip(self.__tests, self.__verdicts)):
            results.append({
                "id": i,
                "name": test.name(),
                # Sorted so that the report does not depend on set ordering.
                "categories": sorted(test.categories()),
                "passed": verdict.is_success(),
                "verdict": verdict.verdict_message(),
                "what": verdict.what() if verdict.is_failed() else "",
                "runs": [self.__describe_run(r) for r, _ in test.runs()],
            })
        return results

    def export_report(self, output_path: str, coefficients: Dict[str, float]):
        """Write the JSON report to *output_path*.

        The report holds the weighted total (``result``), the result of every
        category named in *coefficients* (``categories``) and the per-test
        details (``tests``).
        """
        report: Dict[str, Any] = {
            "result": self.__calculate_total(coefficients),
            "categories": self.__get_results_by_categories(coefficients.keys()),
            "tests": self.__get_results(),
        }
        serialized = json.dumps(report, indent = 4)
        with open(output_path, "w") as file:
            file.write(f"{serialized}\n")
class Tester:
    """Holds the tests of one test suite and runs them against an executable."""

    def __init__(self, testsuite_name: str):
        """Create an empty tester for the suite called *testsuite_name*."""
        self.__tests: List[Test] = []
        self.__testsuite_name = testsuite_name
        self.__log = Log(indent_factor = 4)

    def add(self, test: Test):
        """Append *test* to the list of tests."""
        self.__tests.append(test)

    def warm(self, executable_path: str, timeout_factor: float):
        """Execute every test once without a wrapper and without judging it."""
        print(f"=== Warming `{self.__testsuite_name}`...")
        for test in self.__tests:
            test.warm(executable_path, timeout_factor)

    def run(self, executable_path: str, timeout_factor: float, dynamic_wrapper: DynamicWrapper) -> Result:
        """Run every test, print a summary and return the collected verdicts.

        Args:
            executable_path: Path to the tested executable.
            timeout_factor: Multiplier applied to the timeout of every run.
            dynamic_wrapper: Tool to launch the executable under, if any.
        """
        print(f"=== Testing `{self.__testsuite_name}`...")
        result = Result(self.__tests)
        start = now()
        for test in self.__tests:
            # The test is bound as a default so the callback never depends on
            # the loop variable's later value.
            verdict = self.__log.scope(f"Test '{test.name()}' starts...", lambda test = test: test.invoke(executable_path, timeout_factor, dynamic_wrapper, self.__log))
            result.add(verdict)
        end = now()
        # Plain string: reusing the same quote character inside an f-string
        # is a syntax error before Python 3.12.
        print("=" * 30)
        print(f"{result.passed()}/{result.n()} tests passed in {end - start}ms")
        return result
class Testsuite(ABC):
    """Base class of a test suite: a name, its categories, and a tester factory.

    The coefficient of every category is read from an environment variable
    named ``<PREFIX>_<SUITE NAME>_<category env name>``, with prefix and suite
    name upper-cased.
    """

    def __init__(self, name: str, prefenvname: str, catenvnames: Dict[str, str]):
        """Create a suite.

        Args:
            name: Name of the suite.
            prefenvname: Prefix of the environment-variable names.
            catenvnames: Category name -> environment-variable name suffix.
        """
        self.__name = name
        self.__prefenvname = prefenvname
        self.__catenvnames = catenvnames

    def name(self) -> str:
        """Return the name of the suite."""
        return self.__name

    def get_coefficients(self) -> Dict[str, float]:
        """Return the coefficient of every category, read from the environment.

        A variable that is unset or does not hold a number yields 0.0.
        """
        coefficients: Dict[str, float] = {}
        for category, envname in self.__catenvnames.items():
            key = f"{self.__prefenvname.upper()}_{self.__name.upper()}_{envname}"
            raw = os.getenv(key)
            try:
                coefficients[category] = 0.0 if raw is None else float(raw)
            except ValueError:
                # float() signals a malformed string with ValueError.
                coefficients[category] = 0.0
        return coefficients

    @abstractmethod
    def get_tester(self) -> "Tester":
        """Build and return the tester holding the tests of this suite."""
        raise NotImplementedError("Testsuite::get_tester is not implemented")