From ca1c008ccee119a95de97371f1ee08034de7945e Mon Sep 17 00:00:00 2001 From: SAVELIY BAKTURIN <96750568+zavierwaffle@users.noreply.github.com> Date: Tue, 10 Feb 2026 13:40:02 +0000 Subject: [PATCH] + Intro to C test suite. --- .gitattributes | 2 + .gitignore | 11 + README.md | 93 +++++++ main.py | 62 +++++ requirements.txt | 0 testsuites/__init__.py | 28 +++ testsuites/intro.py | 68 ++++++ testsuites/suite.py | 538 +++++++++++++++++++++++++++++++++++++++++ 8 files changed, 802 insertions(+) create mode 100644 .gitattributes create mode 100644 .gitignore create mode 100644 README.md create mode 100755 main.py create mode 100644 requirements.txt create mode 100644 testsuites/__init__.py create mode 100644 testsuites/intro.py create mode 100644 testsuites/suite.py diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..69ea885 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,2 @@ +# Detect text files and perform LF normalization. +* text=auto eol=lf diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cfa7910 --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ +# Temporary files. +*.c +*.exe +*.json +*.out +*.log +*.sh + +# Python cache. +__pycache__/ +*.pyc diff --git a/README.md b/README.md new file mode 100644 index 0000000..af4f4a2 --- /dev/null +++ b/README.md @@ -0,0 +1,93 @@ +# Тесты к курсу «Язык программирования C++» + +Условия лабораторных работ вы можете найти в таблице курса. + +Порядок запуска локального тестирования: + +1. Склонировать этот репозиторий + + ```bash + git clone https://gitea.mkorn.me/ai360-cpp/public-tests-2026.git + ``` + +2. Инициализировать [виртуальную среду Python](#виртуальная-среда-python) +3. Скомпилировать разработанную программу +4. Выполнить команду: + + ```bash + python main.py --executable-path <путь к исполняемому файлу> --suite <набор тестов> + ``` + + *Примечание*. В *nix-подобных системах команда `python` может быть недоступна, вместо неё используйте команду `python3`. + +5. Для ускорения отладки рекомендуется создать скрипт, выполняющий шаги 3-4. +6. Для обновления тестов выполнить команду: + + ```bash + git pull + ``` + +## Лабораторные работы + +Ниже представлена таблица доступных для тестирования (как локально, так и автоматически в репозиториях) лабораторных работ с названием набора тестов для тестера (`suite`). + +| № | Лабораторная работа | Набор тестов | +|:--:|:--------------------|:-------------| +| 0 | Интро | `intro` | + +## Скрипт запуска + +> [!NOTE] +> Вы можете запустить команду `python main.py --help` для получения справочной информации. + +Тестер выполняет тестирование программы методом "чёрного ящика", то есть на вход *input* программа выдаёт какой-то выход *output*, который должен быть правильным с точки зрения текущего теста. Необходимыми и достаточными параметрами [`main.py`](main.py) являются: + +* `--executable-path ` - путь к исполняемому файлу; +* `--suite ` - выбор задания (набор тестов). + +Поскольку отсутствие оптимизации при сборке и санитайзеры могут замедлить вашу программу, то имеет смысл установить множитель максимального времени исполнения процесса на все тесты: + +* `--timeout-factor ` - множитель максимального времени исполнения порождённого процесса программы (по умолчанию: `1.0`). + +Для генерации полного отчёта (общая информация и информация по каждому тесту) в формате JSON: + +* `--report-output-path ` - генерация JSON отчёта с заданным названием файла (по умолчанию: нет генерации). + +Имеется возможность проверить программу с поддержанными динамическими анализаторами и дебаггерами: + +* `--dynamic-wrapper ` - запуск приложения в обёртке динамического анализатора или дебаггера (по умолчанию: нет обёртки). + + *Примечание*. Требуемая операционная система: **Linux**. Обёртки: дебаггер [`gdb`](https://www.sourceware.org/gdb/), динамический анализатор [`valgrind`](https://valgrind.org/). + +## Виртуальная среда Python + +Для тестирования рекомендуется создать [виртуальную среду `venv`](https://docs.python.org/3/library/venv.html) и тестироваться через неё. Таким образом, можно поднять уровень изоляции от всей системы и избежать установки конфликтующих библиотек: + +1. создание новой среды: + + ```bash + python -m venv venv + ``` + +2. активация текущей командной строки как `venv`: + + ```bash + source ./venv/bin/activate + # на Linux/MacOS (Bash) + ``` + + ```powershell + .\venv\Scripts\Activate.ps1 + # на Windows (Powershell) + ``` + + ```bat + .\venv\Scripts\activate.bat + REM на Windows (CMD) + ``` + +3. установка всех необходимых для тестирования библиотек: + + ```bash + pip install -r requirements.txt + ``` diff --git a/main.py b/main.py new file mode 100755 index 0000000..d5beca2 --- /dev/null +++ b/main.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 + +import argparse +import platform + +import testsuites.intro as intro + +from testsuites import Testsuite, DynamicWrapper +from typing import Dict, Any, List + +__TESTSUITES: List[Testsuite] = [ + intro.instance +] + +__SUITENAMES: List[str] = [t.name() for t in __TESTSUITES] + +__SELECTOR: Dict[str, Testsuite] = dict(zip(__SUITENAMES, __TESTSUITES)) + +def __args() -> Dict[str, Any]: + parser = argparse.ArgumentParser() + + parser.add_argument("--executable-path", help = "path to the executable file to be tested", type = str, required = True) + parser.add_argument("--suite", help = "test set selection", type = str, required = True, choices = __SELECTOR) + parser.add_argument("--timeout-factor", help = "maximum program execution time multiplier (default: 1.0)", type = float, default = 1.0) + + available_dynamic_analyzers: List[DynamicWrapper] = [DynamicWrapper.NO_WRAPPER] + + if platform.system() == "Linux": + available_dynamic_analyzers.append(DynamicWrapper.VALGRIND_ANALYZER) + available_dynamic_analyzers.append(DynamicWrapper.GDB_DEBUGGER) + + parser.add_argument("--dynamic-wrapper", help = "", type = str, default = DynamicWrapper.NO_WRAPPER.value, choices = [ada.value for ada in available_dynamic_analyzers]) + + parser.add_argument("--report-output-path", help = "path to the generated JSON report with the specified file name (default: no generation)", type = str, default = None) + + return vars(parser.parse_args()) + +def __main(): + args = __args() + + executable_path = str(args["executable_path"]) + suite = str(args["suite"]) + timeout_factor = float(args["timeout_factor"]) + + dynamic_wrapper = DynamicWrapper(args["dynamic_wrapper"]) + + report_output_path = args["report_output_path"] + + testsuite = __SELECTOR[suite] + tester = testsuite.get_tester() + + tester.warm(executable_path, timeout_factor) + result = tester.run(executable_path, timeout_factor, dynamic_wrapper) + + if report_output_path is not None: + coefficients = testsuite.get_coefficients() + result.export_report(str(report_output_path), coefficients) + + exit(result.exitcode()) + +if __name__ == "__main__": + __main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e69de29 diff --git a/testsuites/__init__.py b/testsuites/__init__.py new file mode 100644 index 0000000..744497e --- /dev/null +++ b/testsuites/__init__.py @@ -0,0 +1,28 @@ +from .suite import * + +# Categories prefix environment name. +PREFIX_ENVIRONMENT_NAME = "AI360_CPP" + +# The operation completed successfully. +ERROR_SUCCESS = 0 + +# File can't be opened. +ERROR_CANNOT_OPEN_FILE = 1 + +# Not enough memory, memory allocation failed. +ERROR_NOT_ENOUGH_MEMORY = 2 + +# The data is invalid. +ERROR_DATA_INVALID = 3 + +# The cmd line's args or number of parameters (argv) is incorrect. +ERROR_ARGUMENTS_INVALID = 4 + +# Incorrect file format. +ERROR_FORMAT_INVALID = 5 + +# Unsupported functionality. +ERROR_UNSUPPORTED = 20 + +# Other errors. +ERROR_UNKNOWN = 250 diff --git a/testsuites/intro.py b/testsuites/intro.py new file mode 100644 index 0000000..e3c9f0a --- /dev/null +++ b/testsuites/intro.py @@ -0,0 +1,68 @@ +from testsuites import * +from typing import List + +class __Intro(Testsuite): + __SUITE_NAME = "intro" + __TIMEOUT = 1 + __CATEGORIES_TO_ENVNAMES = { "a + b": "A_PLUS_B" } + + def __init__(self): + super().__init__(self.__SUITE_NAME, PREFIX_ENVIRONMENT_NAME, self.__CATEGORIES_TO_ENVNAMES) + + def get_tester(self) -> Tester: + tester = Tester(self.name()) + + class __Expected(Expected): + def __init__(self, a: int, b: int): + super().__init__() + + self.__a = a + self.__b = b + + def test(self, run: Run, runned: Runned) -> Verdict: + try: + output = runned.get_stdout() + if not output.endswith("\n"): + return Verdict(VerdictErrno.ERROR_INVALID_FORMAT, f"newline at stdout's end expected") + + lines = output.splitlines() + if len(lines) != 1: + return Verdict(VerdictErrno.ERROR_INVALID_FORMAT, f"single line expected") + + line = lines[0] + if line != line.lstrip() or line != line.rstrip(): + return Verdict(VerdictErrno.ERROR_INVALID_FORMAT, f"found unexpected space characters in stdout") + + actual = int(line) + expected = self.__a + self.__b + + if actual != expected: + return Verdict(VerdictErrno.ERROR_ASSERTION, f"{self.__a} + {self.__b} = {expected}, (actual: {actual})", "check math", True) + + return ok() + except Exception as _: + return Verdict(VerdictErrno.ERROR_TYPE_ERROR, f"can't convert \"{escape(runned.get_stdout())}\" to integer") + + def __single_test(a: int, b: int) -> SingleTest: + run = Run(c_timeout = self.__TIMEOUT, c_stdin = f"{a} {b}", c_args = None, t_returncode_policy = ReturnCodePolicy.ShouldBeZero) + expected = __Expected(a, b) + return (run, expected) + + def __sequence(a: int, b: int) -> List[SingleTest]: + return [__single_test(a, b)] + + def __test(a: int, b: int) -> Test: + name = f"{a} + {b}" + test = Test(name = name, categories = ["a + b"], sequence = __sequence(a, b)) + return test + + def t(a: int, b: int): + tester.add(__test(a, b)) + + for a in range(-5, 10): + for b in range(6, 9): + t(a, b) + + return tester + +instance = __Intro() diff --git a/testsuites/suite.py b/testsuites/suite.py new file mode 100644 index 0000000..eaae7cc --- /dev/null +++ b/testsuites/suite.py @@ -0,0 +1,538 @@ +import os +import subprocess +import time +import json +import enum +import signal + +from typing import List, Optional, Tuple, Union, Callable, TypeVar, Any, Dict, Set, Iterable +from abc import abstractmethod, ABC + +T = TypeVar('T') + +def escape(s: str) -> str: + escaped = "" + for c in s: + if c == "\n": + escaped += "\\n" + elif c == "\r": + escaped += "\\r" + elif c == "\t": + escaped += "\\t" + else: + escaped += c + return escaped + +class Log: + def __init__(self, indent_factor: int, indent_n: Optional[int] = None, indent_char: str = " "): + self.__indent_factor = indent_factor + self.__indent_n = indent_factor if indent_n is None else indent_n + self.__indent_char = indent_char + + def scope(self, name: str, action: Callable[[], T]) -> T: + self.println(name) + self.__indent_n += self.__indent_factor + try: + return action() + finally: + self.__indent_n -= self.__indent_factor + + def println(self, line: str): + print(f"{self.__indent()}{line}") + + def __indent(self) -> str: + return self.__indent_char * self.__indent_n + +class VerdictErrno(enum.Enum): + ERROR_SUCCESS = "success" + ERROR_RETURNCODE = "program returns wrong returncode" + ERROR_ASSERTION = "assertion" + ERROR_TIMEOUT = "timeout expired" + ERROR_STDERR_EMPTY = "standard error output is empty" + ERROR_STDERR_IS_NOT_EMPTY = "standard error output is not empty" + ERROR_TYPE_ERROR = "type error" + ERROR_INVALID_FORMAT = "invalid format" + ERROR_VALGRIND_MEMCHECK = "valgrind error" + ERROR_GDB_ERROR = "GDB error" + +class Verdict: + def __init__(self, verdict_errno: VerdictErrno, what: Optional[str] = None, extended_what: Union[List[str], str] = [], extended_what_is_hint: bool = False): + self.__verdict_errno = verdict_errno + self.__what = what + self.__extended_what = extended_what.splitlines() if isinstance(extended_what, str) else extended_what + self.__extended_what_is_hint = extended_what_is_hint + + def errno(self) -> VerdictErrno: + return self.__verdict_errno + + def is_success(self) -> bool: + return self.__verdict_errno == VerdictErrno.ERROR_SUCCESS + + def is_failed(self) -> bool: + return not self.is_success() + + def verdict_message(self) -> str: + return self.__verdict_errno.value + + def extended_what(self) -> List[str]: + return self.__extended_what + + def extended_what_is_hint(self) -> bool: + return self.__extended_what_is_hint + + def what(self) -> str: + if self.__what is None: + return "no additional information" + return self.__what + +def ok() -> Verdict: + return Verdict(VerdictErrno.ERROR_SUCCESS) + +class DynamicWrapper(enum.Enum): + NO_WRAPPER = "no" + VALGRIND_ANALYZER = "valgrind" + GDB_DEBUGGER = "gdb" + +class Runned: + def __init__(self, c_returncode: int, c_stdout: str, c_stderr: str, c_start: int, c_end: int, dynamic_analyzer: DynamicWrapper): + self.__c_returncode = c_returncode + self.__c_stdout = c_stdout + self.__c_stderr = c_stderr + self.__c_start = c_start + self.__c_end = c_end + self.__dynamic_analyzer = dynamic_analyzer + + def get_returncode(self) -> int: + return self.__c_returncode + + def get_stdout(self) -> str: + return self.__c_stdout + + def get_stderr(self) -> str: + return self.__c_stderr + + def start(self) -> int: + return self.__c_start + + def end(self) -> int: + return self.__c_end + + def dynamic_analyzer(self) -> DynamicWrapper: + return self.__dynamic_analyzer + +def now() -> int: + return time.time_ns() // 1_000_000 + +class ReturnCodePolicy(enum.Enum): + MatchIfPresented = 0 + ShouldBeZero = 1 + ShouldNotBeZero = 2 + +class Run: + VALGRIND_LOG_FILENAME = "valgrind.log" + VALGRIND_ERROR_MARKER = "VALGRINDERRORMARKERFORPRETESTING" + + GDB_LOG_FILENAME = "gdb.log" + GDB_NO_ERROR_MARKER = "exited normally" + + def __init__(self, c_timeout: Union[float, int], c_stdin: Optional[str], c_args: Optional[List[str]], t_returncode_policy: ReturnCodePolicy, t_returncode: Optional[int] = None, t_stdout: Optional[str] = None, t_stderr_empty: bool = True): + self.__c_timeout = float(c_timeout) + self.__c_stdin = c_stdin + self.__c_args = c_args + self.__t_returncode_policy = t_returncode_policy + self.__t_returncode = t_returncode + self.__t_stdout = t_stdout + self.__t_stderr_empty = t_stderr_empty + + def get_timeout(self) -> float: + return self.__c_timeout + + def stdin_presented(self) -> bool: + return self.__c_stdin is not None + + def get_stdin(self) -> str: + assert self.__c_stdin is not None + return self.__c_stdin + + def args_presented(self) -> bool: + return self.__c_args is not None + + def get_args(self) -> List[str]: + assert self.__c_args is not None + return self.__c_args + + def expected_returncode_policy(self) -> ReturnCodePolicy: + return self.__t_returncode_policy + + def expected_returncode_presented(self) -> bool: + return self.__t_returncode is not None + + def get_expected_returncode(self) -> int: + assert self.__t_returncode is not None + return self.__t_returncode + + def expected_stdout_presented(self) -> bool: + return self.__t_stdout is not None + + def get_expected_stdout(self) -> str: + assert self.__t_stdout is not None + return self.__t_stdout + + def is_stderr_should_be_empty(self) -> bool: + return self.__t_stderr_empty + + def run(self, executable_path: str, timeout_factor: float, dynamic_wrapper: DynamicWrapper) -> Optional[Runned]: + cmd: List[str] = [] + + if dynamic_wrapper == DynamicWrapper.VALGRIND_ANALYZER: + cmd.append("valgrind") + cmd.append("--tool=memcheck") + cmd.append("--leak-check=full") + cmd.append("--show-leak-kinds=all") + cmd.append("--track-origins=yes") + cmd.append("--vgdb=no") + cmd.append(f"--log-file={Run.VALGRIND_LOG_FILENAME}") + cmd.append(f"--error-markers={Run.VALGRIND_ERROR_MARKER},{Run.VALGRIND_ERROR_MARKER}") + elif dynamic_wrapper == DynamicWrapper.GDB_DEBUGGER: + cmd.append("gdb") + cmd.append("-q") + cmd.append("-return-child-result") + cmd.append("--batch-silent") + cmd.append("--eval-command=\"set debuginfod enabled off\"") + cmd.append(f"--eval-command=\"set logging file {Run.GDB_LOG_FILENAME}\"") + cmd.append("--eval-command=\"set logging redirect on\"") + cmd.append("--eval-command=\"set logging overwrite on\"") + cmd.append("--eval-command=\"set logging debugredirect on\"") + cmd.append("--eval-command=\"set logging enabled on\"") + cmd.append("--eval-command=\"set print frame-arguments all\"") + cmd.append("--eval-command=\"run\"") + cmd.append("--eval-command=\"thread apply all bt -frame-info source-and-location -full\"") + cmd.append("--args") + + cmd.append(os.path.abspath(executable_path)) + + if self.args_presented(): + cmd += self.get_args() + + if dynamic_wrapper == DynamicWrapper.GDB_DEBUGGER: + child = subprocess.Popen(" ".join(cmd), stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, universal_newlines = True, shell = True, preexec_fn = os.setsid) + else: + child = subprocess.Popen(cmd, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE, universal_newlines = True) + + try: + start = now() + stdout, stderr = child.communicate(input = self.__c_stdin, timeout = self.__c_timeout * timeout_factor) + end = now() + return Runned(child.returncode, stdout, stderr, start, end, dynamic_wrapper) + except subprocess.TimeoutExpired: + if dynamic_wrapper == DynamicWrapper.GDB_DEBUGGER: + os.killpg(os.getpgid(child.pid), signal.SIGKILL) + else: + child.kill() + + return None + +class Expected(ABC): + def __init__(self): + pass + + @abstractmethod + def test(self, run: Run, runned: Runned) -> Verdict: + raise NotImplementedError("Expected::test is not implemented") + +SingleTest = Tuple[Run, Optional[Expected]] + +class Test: + def __init__(self, name: str, categories: Iterable[str] = [], sequence: List[SingleTest] = []): + self.__name = name + self.__categories = set(categories) + self.__sequence = sequence + + def runs(self) -> List[SingleTest]: + return self.__sequence + + def name(self) -> str: + return self.__name + + def add(self, run: Run, expected: Optional[Expected] = None): + self.add_single_test((run, expected)) + + def add_single_test(self, single_test: SingleTest): + self.__sequence.append(single_test) + + def categories(self) -> Set[str]: + return self.__categories + + def __pretest(self, run: Run, runned: Runned) -> Verdict: + stderr_should_be_empty = run.is_stderr_should_be_empty() + c_stderr = runned.get_stderr() + is_empty_stderr = c_stderr == "" + + if runned.dynamic_analyzer() == DynamicWrapper.VALGRIND_ANALYZER: + extended_what: List[str] = [] + lines = open(Run.VALGRIND_LOG_FILENAME, "r").readlines() + start = False + + for line in lines: + error_marker_appears = Run.VALGRIND_ERROR_MARKER in line + + if error_marker_appears: + start = not start + continue + + if start: + extended_what.append(line.rstrip()) + + if len(extended_what) != 0: + return Verdict(VerdictErrno.ERROR_VALGRIND_MEMCHECK, f"below is what was in the valgrind log", extended_what) + elif runned.dynamic_analyzer() == DynamicWrapper.GDB_DEBUGGER: + extended_what: List[str] = [] + + lines = open(Run.GDB_LOG_FILENAME, "r").readlines() + no_error_marker_appears = any(Run.GDB_NO_ERROR_MARKER in line for line in lines) + + if not no_error_marker_appears: + for line in lines: + extended_what.append(line.rstrip()) + return Verdict(VerdictErrno.ERROR_GDB_ERROR, f"below is what was in the GDB log", extended_what) + + if stderr_should_be_empty and not is_empty_stderr: + return Verdict(VerdictErrno.ERROR_STDERR_IS_NOT_EMPTY, f"below is what was in the stderr", c_stderr) + + if not stderr_should_be_empty and is_empty_stderr: + return Verdict(VerdictErrno.ERROR_STDERR_EMPTY) + + policy = run.expected_returncode_policy() + actual_returncode = runned.get_returncode() + + if policy == ReturnCodePolicy.ShouldBeZero: + if actual_returncode != 0: + return Verdict(VerdictErrno.ERROR_RETURNCODE, f"expected {0}, but actual is {actual_returncode}") + elif policy == ReturnCodePolicy.ShouldNotBeZero: + if actual_returncode == 0: + return Verdict(VerdictErrno.ERROR_RETURNCODE, f"expected non-zero returncode, but actual is {actual_returncode}") + elif policy == ReturnCodePolicy.MatchIfPresented and run.expected_returncode_presented(): + expected_returncode = run.get_expected_returncode() + if actual_returncode != expected_returncode: + return Verdict(VerdictErrno.ERROR_RETURNCODE, f"expected {expected_returncode}, but actual is {actual_returncode}") + + return ok() + + def __base_message(self, i: int) -> str: + return f"Run #{i + 1}/{len(self.__sequence)}..." + + def __print_verdict(self, i: int, verdict: Verdict, log: Log): + log.println(f"{self.__base_message(i)} FAILED.") + log.println(f"{verdict.verdict_message().capitalize()}: {verdict.what()}.") + lines = verdict.extended_what() + if verdict.extended_what_is_hint(): + assert len(lines) == 1 + log.println(f"Hint: {lines[0]}.") + elif len(lines) >= 1: + for line in lines: + log.println(line) + + def __invoke(self, executable_path: str, timeout_factor: float, dynamic_wrapper: DynamicWrapper, log: Log) -> Verdict: + for i, (run, expected) in enumerate(self.__sequence): + log.println(self.__base_message(i)) + + runned = run.run(executable_path, timeout_factor, dynamic_wrapper) + + if runned is None: + verdict = Verdict(VerdictErrno.ERROR_TIMEOUT, f"executed in more than {run.get_timeout() * timeout_factor}s") + self.__print_verdict(i, verdict, log) + return verdict + + verdict = self.__pretest(run, runned) + + if verdict.is_failed(): + self.__print_verdict(i, verdict, log) + return verdict + + if expected is not None: + verdict = expected.test(run, runned) + if verdict.is_failed(): + self.__print_verdict(i, verdict, log) + return verdict + + log.println(f"{self.__base_message(i)} ok: passed in {runned.end() - runned.start()}ms.") + + return ok() + + def invoke(self, executable_path: str, timeout_factor: float, dynamic_wrapper: DynamicWrapper, log: Log) -> Verdict: + return log.scope(f"Running sequence of {len(self.__sequence)} runs:", lambda : self.__invoke(executable_path, timeout_factor, dynamic_wrapper, log)) + + def warm(self, executable_path: str, timeout_factor: float): + for run, _ in self.__sequence: + _ = run.run(executable_path, timeout_factor, DynamicWrapper.NO_WRAPPER) + +class Result: + def __init__(self, tests: List[Test]): + self.__tests = tests + self.__verdicts: List[Verdict] = [] + self.__passed = 0 + + def add(self, verdict: Verdict): + if verdict.is_success(): + self.__passed += 1 + self.__verdicts.append(verdict) + + def n(self) -> int: + return len(self.__verdicts) + + def passed(self) -> int: + return self.__passed + + def exitcode(self) -> int: + return 0 if self.n() == self.passed() else 1 + + def __get_passed_by_category(self, category: str) -> int: + passed = 0 + + for test, verdict in zip(self.__tests, self.__verdicts): + if verdict.is_success() and category in test.categories(): + passed += 1 + + return passed + + def __get_total_by_category(self, category: str) -> int: + total = 0 + + for test, verdict in zip(self.__tests, self.__verdicts): + if category in test.categories(): + total += 1 + + return total + + def __get_result_by_category(self, category: str) -> float: + total = self.__get_total_by_category(category) + passed = self.__get_passed_by_category(category) + return passed / total + + def __calculate_total(self, coefficients: Dict[str, float]) -> float: + total = 0.0 + + for category, coefficient in coefficients.items(): + result = self.__get_result_by_category(category) + total += result * coefficient + + return total + + def __get_results_by_categories(self, categories: Iterable[str]) -> Dict[str, float]: + results: Dict[str, float] = {} + + for category in categories: + results[category] = self.__get_result_by_category(category) + + return results + + def __get_results(self) -> List[Dict[str, Any]]: + results: List[Dict[str, Any]] = [] + + for i, (test, verdict) in enumerate(zip(self.__tests, self.__verdicts)): + result: Dict[str, Any] = {} + + result["id"] = i + result["name"] = test.name() + result["categories"] = list(test.categories()) + result["passed"] = verdict.is_success() + result["verdict"] = verdict.verdict_message() + result["what"] = verdict.what() if verdict.is_failed() else "" + + runs: List[Dict[str, Any]] = [] + + for r, _ in test.runs(): + run: Dict[str, Any] = {} + + run["timeout"] = r.get_timeout() + run["stdin"] = escape(r.get_stdin()) if r.stdin_presented() else "" + run["args"] = [escape(arg) for arg in r.get_args()] if r.args_presented() else "" + run["expected_returncode"] = r.get_expected_returncode() if r.expected_returncode_presented() else "" + run["expected_stdout"] = r.get_expected_stdout() if r.expected_stdout_presented() else "" + run["stderr_should_be_empty"] = r.is_stderr_should_be_empty() + + runs.append(run) + + result["runs"] = runs + + results.append(result) + + return results + + def export_report(self, output_path: str, coefficients: Dict[str, float]): + categories = coefficients.keys() + + j: Dict[str, Any] = {} + + j["result"] = self.__calculate_total(coefficients) + j["categories"] = self.__get_results_by_categories(categories) + j["tests"] = self.__get_results() + + o = json.dumps(j, indent = 4) + + with open(output_path, "w") as file: + file.write(f"{o}\n") + +class Tester: + def __init__(self, testsuite_name: str): + self.__tests: List[Test] = [] + self.__testsuite_name = testsuite_name + self.__log = Log(indent_factor = 4) + + def add(self, test: Test): + self.__tests.append(test) + + def warm(self, executable_path: str, timeout_factor: float): + print(f"=== Warming `{self.__testsuite_name}`...") + for test in self.__tests: + test.warm(executable_path, timeout_factor) + + def run(self, executable_path: str, timeout_factor: float, dynamic_wrapper: DynamicWrapper) -> Result: + print(f"=== Testing `{self.__testsuite_name}`...") + + result = Result(self.__tests) + start = now() + + for test in self.__tests: + verdict = self.__log.scope(f"Test '{test.name()}' starts...", lambda : test.invoke(executable_path, timeout_factor, dynamic_wrapper, self.__log)) + result.add(verdict) + + end = now() + + print(f"{"=" * 30}") + print(f"{result.passed()}/{result.n()} tests passed in {end - start}ms") + + return result + +class Testsuite(ABC): + def __init__(self, name: str, prefenvname: str, catenvnames: Dict[str, str]): + self.__name = name + self.__prefenvname = prefenvname + self.__catenvnames = catenvnames + + def name(self) -> str: + return self.__name + + def get_coefficients(self) -> Dict[str, float]: + coefficients: Dict[str, float] = {} + categories = list(self.__catenvnames.keys()) + + for category in categories: + key = f"{self.__prefenvname.upper()}_{self.__name.upper()}_{self.__catenvnames[category]}" + value = os.getenv(key) + + if value is None: + value = 0.0 + else: + try: + value = float(value) + except TypeError as _: + value = 0.0 + + coefficients[category] = value + + return coefficients + + @abstractmethod + def get_tester(self) -> Tester: + raise NotImplementedError("Testsuite::get_tester is not implemented")