gh-109162: libregrtest: add single.py and result.py (#109243)
* Add single.py and result.py files. * Rename runtest.py to runtests.py. * Move run_single_test() function and its helper functions to single.py. * Move remove_testfn(), abs_module_name() and normalize_test_name() to utils.py. * Move setup_support() to setup.py. * Move type hints like TestName to utils.py. * Rename runtest.py to runtests.py.
This commit is contained in:
parent
a939b65aa6
commit
1ec45378e9
58
Lib/test/libregrtest/findtests.py
Normal file
58
Lib/test/libregrtest/findtests.py
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
|
from test.libregrtest.utils import StrPath, TestName, TestList
|
||||||
|
|
||||||
|
|
||||||
|
# If these test directories are encountered recurse into them and treat each
|
||||||
|
# "test_*.py" file or each sub-directory as a separate test module. This can
|
||||||
|
# increase parallelism.
|
||||||
|
#
|
||||||
|
# Beware this can't generally be done for any directory with sub-tests as the
|
||||||
|
# __init__.py may do things which alter what tests are to be run.
|
||||||
|
SPLITTESTDIRS: set[TestName] = {
|
||||||
|
"test_asyncio",
|
||||||
|
"test_concurrent_futures",
|
||||||
|
"test_multiprocessing_fork",
|
||||||
|
"test_multiprocessing_forkserver",
|
||||||
|
"test_multiprocessing_spawn",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def findtestdir(path=None):
|
||||||
|
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
|
||||||
|
|
||||||
|
|
||||||
|
def findtests(*, testdir: StrPath | None = None, exclude=(),
|
||||||
|
split_test_dirs: set[TestName] = SPLITTESTDIRS,
|
||||||
|
base_mod: str = "") -> TestList:
|
||||||
|
"""Return a list of all applicable test modules."""
|
||||||
|
testdir = findtestdir(testdir)
|
||||||
|
tests = []
|
||||||
|
for name in os.listdir(testdir):
|
||||||
|
mod, ext = os.path.splitext(name)
|
||||||
|
if (not mod.startswith("test_")) or (mod in exclude):
|
||||||
|
continue
|
||||||
|
if mod in split_test_dirs:
|
||||||
|
subdir = os.path.join(testdir, mod)
|
||||||
|
mod = f"{base_mod or 'test'}.{mod}"
|
||||||
|
tests.extend(findtests(testdir=subdir, exclude=exclude,
|
||||||
|
split_test_dirs=split_test_dirs,
|
||||||
|
base_mod=mod))
|
||||||
|
elif ext in (".py", ""):
|
||||||
|
tests.append(f"{base_mod}.{mod}" if base_mod else mod)
|
||||||
|
return sorted(tests)
|
||||||
|
|
||||||
|
|
||||||
|
def split_test_packages(tests, *, testdir: StrPath | None = None, exclude=(),
|
||||||
|
split_test_dirs=SPLITTESTDIRS):
|
||||||
|
testdir = findtestdir(testdir)
|
||||||
|
splitted = []
|
||||||
|
for name in tests:
|
||||||
|
if name in split_test_dirs:
|
||||||
|
subdir = os.path.join(testdir, name)
|
||||||
|
splitted.extend(findtests(testdir=subdir, exclude=exclude,
|
||||||
|
split_test_dirs=split_test_dirs,
|
||||||
|
base_mod=name))
|
||||||
|
else:
|
||||||
|
splitted.append(name)
|
||||||
|
return splitted
|
@ -1,7 +1,7 @@
|
|||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from test.libregrtest.runtest import RunTests
|
from test.libregrtest.runtests import RunTests
|
||||||
from test.libregrtest.utils import print_warning, MS_WINDOWS
|
from test.libregrtest.utils import print_warning, MS_WINDOWS
|
||||||
|
|
||||||
if MS_WINDOWS:
|
if MS_WINDOWS:
|
||||||
|
@ -11,17 +11,19 @@ from test import support
|
|||||||
from test.support import os_helper
|
from test.support import os_helper
|
||||||
|
|
||||||
from test.libregrtest.cmdline import _parse_args, Namespace
|
from test.libregrtest.cmdline import _parse_args, Namespace
|
||||||
|
from test.libregrtest.findtests import findtests, split_test_packages
|
||||||
from test.libregrtest.logger import Logger
|
from test.libregrtest.logger import Logger
|
||||||
from test.libregrtest.runtest import (
|
from test.libregrtest.result import State
|
||||||
findtests, split_test_packages, run_single_test, abs_module_name,
|
from test.libregrtest.runtests import RunTests, HuntRefleak
|
||||||
PROGRESS_MIN_TIME, State, RunTests, HuntRefleak,
|
|
||||||
FilterTuple, TestList, StrJSON, TestName)
|
|
||||||
from test.libregrtest.setup import setup_tests, setup_test_dir
|
from test.libregrtest.setup import setup_tests, setup_test_dir
|
||||||
|
from test.libregrtest.single import run_single_test, PROGRESS_MIN_TIME
|
||||||
from test.libregrtest.pgo import setup_pgo_tests
|
from test.libregrtest.pgo import setup_pgo_tests
|
||||||
from test.libregrtest.results import TestResults
|
from test.libregrtest.results import TestResults
|
||||||
from test.libregrtest.utils import (
|
from test.libregrtest.utils import (
|
||||||
strip_py_suffix, count, format_duration, StrPath,
|
StrPath, StrJSON, TestName, TestList, FilterTuple,
|
||||||
printlist, get_build_info, get_temp_dir, get_work_dir, exit_timeout)
|
strip_py_suffix, count, format_duration,
|
||||||
|
printlist, get_build_info, get_temp_dir, get_work_dir, exit_timeout,
|
||||||
|
abs_module_name)
|
||||||
|
|
||||||
|
|
||||||
class Regrtest:
|
class Regrtest:
|
||||||
|
@ -1,10 +1,11 @@
|
|||||||
import os
|
|
||||||
import sys
|
import sys
|
||||||
import warnings
|
import warnings
|
||||||
from inspect import isabstract
|
from inspect import isabstract
|
||||||
|
|
||||||
from test import support
|
from test import support
|
||||||
from test.support import os_helper
|
from test.support import os_helper
|
||||||
from test.libregrtest.runtest import HuntRefleak
|
|
||||||
|
from test.libregrtest.runtests import HuntRefleak
|
||||||
from test.libregrtest.utils import clear_caches
|
from test.libregrtest.utils import clear_caches
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
184
Lib/test/libregrtest/result.py
Normal file
184
Lib/test/libregrtest/result.py
Normal file
@ -0,0 +1,184 @@
|
|||||||
|
import dataclasses
|
||||||
|
import json
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from test.support import TestStats
|
||||||
|
|
||||||
|
from test.libregrtest.utils import (
|
||||||
|
TestName, FilterTuple,
|
||||||
|
format_duration, normalize_test_name, print_warning)
|
||||||
|
|
||||||
|
|
||||||
|
# Avoid enum.Enum to reduce the number of imports when tests are run
|
||||||
|
class State:
|
||||||
|
PASSED = "PASSED"
|
||||||
|
FAILED = "FAILED"
|
||||||
|
SKIPPED = "SKIPPED"
|
||||||
|
UNCAUGHT_EXC = "UNCAUGHT_EXC"
|
||||||
|
REFLEAK = "REFLEAK"
|
||||||
|
ENV_CHANGED = "ENV_CHANGED"
|
||||||
|
RESOURCE_DENIED = "RESOURCE_DENIED"
|
||||||
|
INTERRUPTED = "INTERRUPTED"
|
||||||
|
MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR"
|
||||||
|
DID_NOT_RUN = "DID_NOT_RUN"
|
||||||
|
TIMEOUT = "TIMEOUT"
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def is_failed(state):
|
||||||
|
return state in {
|
||||||
|
State.FAILED,
|
||||||
|
State.UNCAUGHT_EXC,
|
||||||
|
State.REFLEAK,
|
||||||
|
State.MULTIPROCESSING_ERROR,
|
||||||
|
State.TIMEOUT}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def has_meaningful_duration(state):
|
||||||
|
# Consider that the duration is meaningless for these cases.
|
||||||
|
# For example, if a whole test file is skipped, its duration
|
||||||
|
# is unlikely to be the duration of executing its tests,
|
||||||
|
# but just the duration to execute code which skips the test.
|
||||||
|
return state not in {
|
||||||
|
State.SKIPPED,
|
||||||
|
State.RESOURCE_DENIED,
|
||||||
|
State.INTERRUPTED,
|
||||||
|
State.MULTIPROCESSING_ERROR,
|
||||||
|
State.DID_NOT_RUN}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def must_stop(state):
|
||||||
|
return state in {
|
||||||
|
State.INTERRUPTED,
|
||||||
|
State.MULTIPROCESSING_ERROR}
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass(slots=True)
|
||||||
|
class TestResult:
|
||||||
|
test_name: TestName
|
||||||
|
state: str | None = None
|
||||||
|
# Test duration in seconds
|
||||||
|
duration: float | None = None
|
||||||
|
xml_data: list[str] | None = None
|
||||||
|
stats: TestStats | None = None
|
||||||
|
|
||||||
|
# errors and failures copied from support.TestFailedWithDetails
|
||||||
|
errors: list[tuple[str, str]] | None = None
|
||||||
|
failures: list[tuple[str, str]] | None = None
|
||||||
|
|
||||||
|
def is_failed(self, fail_env_changed: bool) -> bool:
|
||||||
|
if self.state == State.ENV_CHANGED:
|
||||||
|
return fail_env_changed
|
||||||
|
return State.is_failed(self.state)
|
||||||
|
|
||||||
|
def _format_failed(self):
|
||||||
|
if self.errors and self.failures:
|
||||||
|
le = len(self.errors)
|
||||||
|
lf = len(self.failures)
|
||||||
|
error_s = "error" + ("s" if le > 1 else "")
|
||||||
|
failure_s = "failure" + ("s" if lf > 1 else "")
|
||||||
|
return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
|
||||||
|
|
||||||
|
if self.errors:
|
||||||
|
le = len(self.errors)
|
||||||
|
error_s = "error" + ("s" if le > 1 else "")
|
||||||
|
return f"{self.test_name} failed ({le} {error_s})"
|
||||||
|
|
||||||
|
if self.failures:
|
||||||
|
lf = len(self.failures)
|
||||||
|
failure_s = "failure" + ("s" if lf > 1 else "")
|
||||||
|
return f"{self.test_name} failed ({lf} {failure_s})"
|
||||||
|
|
||||||
|
return f"{self.test_name} failed"
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
match self.state:
|
||||||
|
case State.PASSED:
|
||||||
|
return f"{self.test_name} passed"
|
||||||
|
case State.FAILED:
|
||||||
|
return self._format_failed()
|
||||||
|
case State.SKIPPED:
|
||||||
|
return f"{self.test_name} skipped"
|
||||||
|
case State.UNCAUGHT_EXC:
|
||||||
|
return f"{self.test_name} failed (uncaught exception)"
|
||||||
|
case State.REFLEAK:
|
||||||
|
return f"{self.test_name} failed (reference leak)"
|
||||||
|
case State.ENV_CHANGED:
|
||||||
|
return f"{self.test_name} failed (env changed)"
|
||||||
|
case State.RESOURCE_DENIED:
|
||||||
|
return f"{self.test_name} skipped (resource denied)"
|
||||||
|
case State.INTERRUPTED:
|
||||||
|
return f"{self.test_name} interrupted"
|
||||||
|
case State.MULTIPROCESSING_ERROR:
|
||||||
|
return f"{self.test_name} process crashed"
|
||||||
|
case State.DID_NOT_RUN:
|
||||||
|
return f"{self.test_name} ran no tests"
|
||||||
|
case State.TIMEOUT:
|
||||||
|
return f"{self.test_name} timed out ({format_duration(self.duration)})"
|
||||||
|
case _:
|
||||||
|
raise ValueError("unknown result state: {state!r}")
|
||||||
|
|
||||||
|
def has_meaningful_duration(self):
|
||||||
|
return State.has_meaningful_duration(self.state)
|
||||||
|
|
||||||
|
def set_env_changed(self):
|
||||||
|
if self.state is None or self.state == State.PASSED:
|
||||||
|
self.state = State.ENV_CHANGED
|
||||||
|
|
||||||
|
def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
|
||||||
|
if State.must_stop(self.state):
|
||||||
|
return True
|
||||||
|
if fail_fast and self.is_failed(fail_env_changed):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def get_rerun_match_tests(self) -> FilterTuple | None:
|
||||||
|
match_tests = []
|
||||||
|
|
||||||
|
errors = self.errors or []
|
||||||
|
failures = self.failures or []
|
||||||
|
for error_list, is_error in (
|
||||||
|
(errors, True),
|
||||||
|
(failures, False),
|
||||||
|
):
|
||||||
|
for full_name, *_ in error_list:
|
||||||
|
match_name = normalize_test_name(full_name, is_error=is_error)
|
||||||
|
if match_name is None:
|
||||||
|
# 'setUpModule (test.test_sys)': don't filter tests
|
||||||
|
return None
|
||||||
|
if not match_name:
|
||||||
|
error_type = "ERROR" if is_error else "FAIL"
|
||||||
|
print_warning(f"rerun failed to parse {error_type} test name: "
|
||||||
|
f"{full_name!r}: don't filter tests")
|
||||||
|
return None
|
||||||
|
match_tests.append(match_name)
|
||||||
|
|
||||||
|
if not match_tests:
|
||||||
|
return None
|
||||||
|
return tuple(match_tests)
|
||||||
|
|
||||||
|
def write_json(self, file) -> None:
|
||||||
|
json.dump(self, file, cls=_EncodeTestResult)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def from_json(worker_json) -> 'TestResult':
|
||||||
|
return json.loads(worker_json, object_hook=_decode_test_result)
|
||||||
|
|
||||||
|
|
||||||
|
class _EncodeTestResult(json.JSONEncoder):
|
||||||
|
def default(self, o: Any) -> dict[str, Any]:
|
||||||
|
if isinstance(o, TestResult):
|
||||||
|
result = dataclasses.asdict(o)
|
||||||
|
result["__test_result__"] = o.__class__.__name__
|
||||||
|
return result
|
||||||
|
else:
|
||||||
|
return super().default(o)
|
||||||
|
|
||||||
|
|
||||||
|
def _decode_test_result(data: dict[str, Any]) -> TestResult | dict[str, Any]:
|
||||||
|
if "__test_result__" in data:
|
||||||
|
data.pop('__test_result__')
|
||||||
|
if data['stats'] is not None:
|
||||||
|
data['stats'] = TestStats(**data['stats'])
|
||||||
|
return TestResult(**data)
|
||||||
|
else:
|
||||||
|
return data
|
@ -1,11 +1,11 @@
|
|||||||
import sys
|
import sys
|
||||||
from test.support import TestStats
|
from test.support import TestStats
|
||||||
|
|
||||||
from test.libregrtest.runtest import (
|
from test.libregrtest.runtests import RunTests
|
||||||
TestName, TestTuple, TestList, FilterDict, State,
|
from test.libregrtest.result import State, TestResult
|
||||||
TestResult, RunTests)
|
|
||||||
from test.libregrtest.utils import (
|
from test.libregrtest.utils import (
|
||||||
printlist, count, format_duration, StrPath)
|
StrPath, TestName, TestTuple, TestList, FilterDict,
|
||||||
|
printlist, count, format_duration)
|
||||||
|
|
||||||
|
|
||||||
EXITCODE_BAD_TEST = 2
|
EXITCODE_BAD_TEST = 2
|
||||||
|
@ -1,676 +0,0 @@
|
|||||||
import dataclasses
|
|
||||||
import doctest
|
|
||||||
import faulthandler
|
|
||||||
import gc
|
|
||||||
import importlib
|
|
||||||
import io
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
import time
|
|
||||||
import traceback
|
|
||||||
import unittest
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from test import support
|
|
||||||
from test.support import TestStats
|
|
||||||
from test.support import os_helper
|
|
||||||
from test.support import threading_helper
|
|
||||||
from test.libregrtest.save_env import saved_test_environment
|
|
||||||
from test.libregrtest.utils import (
|
|
||||||
clear_caches, format_duration, print_warning, StrPath)
|
|
||||||
|
|
||||||
|
|
||||||
StrJSON = str
|
|
||||||
TestName = str
|
|
||||||
TestTuple = tuple[TestName, ...]
|
|
||||||
TestList = list[TestName]
|
|
||||||
|
|
||||||
# --match and --ignore options: list of patterns
|
|
||||||
# ('*' joker character can be used)
|
|
||||||
FilterTuple = tuple[TestName, ...]
|
|
||||||
FilterDict = dict[TestName, FilterTuple]
|
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass(slots=True, frozen=True)
|
|
||||||
class HuntRefleak:
|
|
||||||
warmups: int
|
|
||||||
runs: int
|
|
||||||
filename: StrPath
|
|
||||||
|
|
||||||
|
|
||||||
# Avoid enum.Enum to reduce the number of imports when tests are run
|
|
||||||
class State:
|
|
||||||
PASSED = "PASSED"
|
|
||||||
FAILED = "FAILED"
|
|
||||||
SKIPPED = "SKIPPED"
|
|
||||||
UNCAUGHT_EXC = "UNCAUGHT_EXC"
|
|
||||||
REFLEAK = "REFLEAK"
|
|
||||||
ENV_CHANGED = "ENV_CHANGED"
|
|
||||||
RESOURCE_DENIED = "RESOURCE_DENIED"
|
|
||||||
INTERRUPTED = "INTERRUPTED"
|
|
||||||
MULTIPROCESSING_ERROR = "MULTIPROCESSING_ERROR"
|
|
||||||
DID_NOT_RUN = "DID_NOT_RUN"
|
|
||||||
TIMEOUT = "TIMEOUT"
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def is_failed(state):
|
|
||||||
return state in {
|
|
||||||
State.FAILED,
|
|
||||||
State.UNCAUGHT_EXC,
|
|
||||||
State.REFLEAK,
|
|
||||||
State.MULTIPROCESSING_ERROR,
|
|
||||||
State.TIMEOUT}
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def has_meaningful_duration(state):
|
|
||||||
# Consider that the duration is meaningless for these cases.
|
|
||||||
# For example, if a whole test file is skipped, its duration
|
|
||||||
# is unlikely to be the duration of executing its tests,
|
|
||||||
# but just the duration to execute code which skips the test.
|
|
||||||
return state not in {
|
|
||||||
State.SKIPPED,
|
|
||||||
State.RESOURCE_DENIED,
|
|
||||||
State.INTERRUPTED,
|
|
||||||
State.MULTIPROCESSING_ERROR,
|
|
||||||
State.DID_NOT_RUN}
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def must_stop(state):
|
|
||||||
return state in {
|
|
||||||
State.INTERRUPTED,
|
|
||||||
State.MULTIPROCESSING_ERROR}
|
|
||||||
|
|
||||||
|
|
||||||
# gh-90681: When rerunning tests, we might need to rerun the whole
|
|
||||||
# class or module suite if some its life-cycle hooks fail.
|
|
||||||
# Test level hooks are not affected.
|
|
||||||
_TEST_LIFECYCLE_HOOKS = frozenset((
|
|
||||||
'setUpClass', 'tearDownClass',
|
|
||||||
'setUpModule', 'tearDownModule',
|
|
||||||
))
|
|
||||||
|
|
||||||
def normalize_test_name(test_full_name, *, is_error=False):
|
|
||||||
short_name = test_full_name.split(" ")[0]
|
|
||||||
if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
|
|
||||||
if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
|
|
||||||
# if setUpModule() or tearDownModule() failed, don't filter
|
|
||||||
# tests with the test file name, don't use use filters.
|
|
||||||
return None
|
|
||||||
|
|
||||||
# This means that we have a failure in a life-cycle hook,
|
|
||||||
# we need to rerun the whole module or class suite.
|
|
||||||
# Basically the error looks like this:
|
|
||||||
# ERROR: setUpClass (test.test_reg_ex.RegTest)
|
|
||||||
# or
|
|
||||||
# ERROR: setUpModule (test.test_reg_ex)
|
|
||||||
# So, we need to parse the class / module name.
|
|
||||||
lpar = test_full_name.index('(')
|
|
||||||
rpar = test_full_name.index(')')
|
|
||||||
return test_full_name[lpar + 1: rpar].split('.')[-1]
|
|
||||||
return short_name
|
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass(slots=True)
|
|
||||||
class TestResult:
|
|
||||||
test_name: TestName
|
|
||||||
state: str | None = None
|
|
||||||
# Test duration in seconds
|
|
||||||
duration: float | None = None
|
|
||||||
xml_data: list[str] | None = None
|
|
||||||
stats: TestStats | None = None
|
|
||||||
|
|
||||||
# errors and failures copied from support.TestFailedWithDetails
|
|
||||||
errors: list[tuple[str, str]] | None = None
|
|
||||||
failures: list[tuple[str, str]] | None = None
|
|
||||||
|
|
||||||
def is_failed(self, fail_env_changed: bool) -> bool:
|
|
||||||
if self.state == State.ENV_CHANGED:
|
|
||||||
return fail_env_changed
|
|
||||||
return State.is_failed(self.state)
|
|
||||||
|
|
||||||
def _format_failed(self):
|
|
||||||
if self.errors and self.failures:
|
|
||||||
le = len(self.errors)
|
|
||||||
lf = len(self.failures)
|
|
||||||
error_s = "error" + ("s" if le > 1 else "")
|
|
||||||
failure_s = "failure" + ("s" if lf > 1 else "")
|
|
||||||
return f"{self.test_name} failed ({le} {error_s}, {lf} {failure_s})"
|
|
||||||
|
|
||||||
if self.errors:
|
|
||||||
le = len(self.errors)
|
|
||||||
error_s = "error" + ("s" if le > 1 else "")
|
|
||||||
return f"{self.test_name} failed ({le} {error_s})"
|
|
||||||
|
|
||||||
if self.failures:
|
|
||||||
lf = len(self.failures)
|
|
||||||
failure_s = "failure" + ("s" if lf > 1 else "")
|
|
||||||
return f"{self.test_name} failed ({lf} {failure_s})"
|
|
||||||
|
|
||||||
return f"{self.test_name} failed"
|
|
||||||
|
|
||||||
def __str__(self) -> str:
|
|
||||||
match self.state:
|
|
||||||
case State.PASSED:
|
|
||||||
return f"{self.test_name} passed"
|
|
||||||
case State.FAILED:
|
|
||||||
return self._format_failed()
|
|
||||||
case State.SKIPPED:
|
|
||||||
return f"{self.test_name} skipped"
|
|
||||||
case State.UNCAUGHT_EXC:
|
|
||||||
return f"{self.test_name} failed (uncaught exception)"
|
|
||||||
case State.REFLEAK:
|
|
||||||
return f"{self.test_name} failed (reference leak)"
|
|
||||||
case State.ENV_CHANGED:
|
|
||||||
return f"{self.test_name} failed (env changed)"
|
|
||||||
case State.RESOURCE_DENIED:
|
|
||||||
return f"{self.test_name} skipped (resource denied)"
|
|
||||||
case State.INTERRUPTED:
|
|
||||||
return f"{self.test_name} interrupted"
|
|
||||||
case State.MULTIPROCESSING_ERROR:
|
|
||||||
return f"{self.test_name} process crashed"
|
|
||||||
case State.DID_NOT_RUN:
|
|
||||||
return f"{self.test_name} ran no tests"
|
|
||||||
case State.TIMEOUT:
|
|
||||||
return f"{self.test_name} timed out ({format_duration(self.duration)})"
|
|
||||||
case _:
|
|
||||||
raise ValueError("unknown result state: {state!r}")
|
|
||||||
|
|
||||||
def has_meaningful_duration(self):
|
|
||||||
return State.has_meaningful_duration(self.state)
|
|
||||||
|
|
||||||
def set_env_changed(self):
|
|
||||||
if self.state is None or self.state == State.PASSED:
|
|
||||||
self.state = State.ENV_CHANGED
|
|
||||||
|
|
||||||
def must_stop(self, fail_fast: bool, fail_env_changed: bool) -> bool:
|
|
||||||
if State.must_stop(self.state):
|
|
||||||
return True
|
|
||||||
if fail_fast and self.is_failed(fail_env_changed):
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def get_rerun_match_tests(self) -> FilterTuple | None:
|
|
||||||
match_tests = []
|
|
||||||
|
|
||||||
errors = self.errors or []
|
|
||||||
failures = self.failures or []
|
|
||||||
for error_list, is_error in (
|
|
||||||
(errors, True),
|
|
||||||
(failures, False),
|
|
||||||
):
|
|
||||||
for full_name, *_ in error_list:
|
|
||||||
match_name = normalize_test_name(full_name, is_error=is_error)
|
|
||||||
if match_name is None:
|
|
||||||
# 'setUpModule (test.test_sys)': don't filter tests
|
|
||||||
return None
|
|
||||||
if not match_name:
|
|
||||||
error_type = "ERROR" if is_error else "FAIL"
|
|
||||||
print_warning(f"rerun failed to parse {error_type} test name: "
|
|
||||||
f"{full_name!r}: don't filter tests")
|
|
||||||
return None
|
|
||||||
match_tests.append(match_name)
|
|
||||||
|
|
||||||
if not match_tests:
|
|
||||||
return None
|
|
||||||
return tuple(match_tests)
|
|
||||||
|
|
||||||
def write_json(self, file) -> None:
|
|
||||||
json.dump(self, file, cls=_EncodeTestResult)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_json(worker_json) -> 'TestResult':
|
|
||||||
return json.loads(worker_json, object_hook=_decode_test_result)
|
|
||||||
|
|
||||||
|
|
||||||
class _EncodeTestResult(json.JSONEncoder):
|
|
||||||
def default(self, o: Any) -> dict[str, Any]:
|
|
||||||
if isinstance(o, TestResult):
|
|
||||||
result = dataclasses.asdict(o)
|
|
||||||
result["__test_result__"] = o.__class__.__name__
|
|
||||||
return result
|
|
||||||
else:
|
|
||||||
return super().default(o)
|
|
||||||
|
|
||||||
|
|
||||||
def _decode_test_result(data: dict[str, Any]) -> TestResult | dict[str, Any]:
|
|
||||||
if "__test_result__" in data:
|
|
||||||
data.pop('__test_result__')
|
|
||||||
if data['stats'] is not None:
|
|
||||||
data['stats'] = TestStats(**data['stats'])
|
|
||||||
return TestResult(**data)
|
|
||||||
else:
|
|
||||||
return data
|
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass(slots=True, frozen=True)
|
|
||||||
class RunTests:
|
|
||||||
tests: TestTuple
|
|
||||||
fail_fast: bool = False
|
|
||||||
fail_env_changed: bool = False
|
|
||||||
match_tests: FilterTuple | None = None
|
|
||||||
ignore_tests: FilterTuple | None = None
|
|
||||||
match_tests_dict: FilterDict | None = None
|
|
||||||
rerun: bool = False
|
|
||||||
forever: bool = False
|
|
||||||
pgo: bool = False
|
|
||||||
pgo_extended: bool = False
|
|
||||||
output_on_failure: bool = False
|
|
||||||
timeout: float | None = None
|
|
||||||
verbose: bool = False
|
|
||||||
quiet: bool = False
|
|
||||||
hunt_refleak: HuntRefleak | None = None
|
|
||||||
test_dir: StrPath | None = None
|
|
||||||
use_junit: bool = False
|
|
||||||
memory_limit: str | None = None
|
|
||||||
gc_threshold: int | None = None
|
|
||||||
use_resources: list[str] = None
|
|
||||||
python_cmd: list[str] | None = None
|
|
||||||
|
|
||||||
def copy(self, **override):
|
|
||||||
state = dataclasses.asdict(self)
|
|
||||||
state.update(override)
|
|
||||||
return RunTests(**state)
|
|
||||||
|
|
||||||
def get_match_tests(self, test_name) -> FilterTuple | None:
|
|
||||||
if self.match_tests_dict is not None:
|
|
||||||
return self.match_tests_dict.get(test_name, None)
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
def iter_tests(self):
|
|
||||||
if self.forever:
|
|
||||||
while True:
|
|
||||||
yield from self.tests
|
|
||||||
else:
|
|
||||||
yield from self.tests
|
|
||||||
|
|
||||||
def as_json(self) -> StrJSON:
|
|
||||||
return json.dumps(self, cls=_EncodeRunTests)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_json(worker_json: StrJSON) -> 'RunTests':
|
|
||||||
return json.loads(worker_json, object_hook=_decode_runtests)
|
|
||||||
|
|
||||||
|
|
||||||
class _EncodeRunTests(json.JSONEncoder):
|
|
||||||
def default(self, o: Any) -> dict[str, Any]:
|
|
||||||
if isinstance(o, RunTests):
|
|
||||||
result = dataclasses.asdict(o)
|
|
||||||
result["__runtests__"] = True
|
|
||||||
return result
|
|
||||||
else:
|
|
||||||
return super().default(o)
|
|
||||||
|
|
||||||
|
|
||||||
def _decode_runtests(data: dict[str, Any]) -> RunTests | dict[str, Any]:
|
|
||||||
if "__runtests__" in data:
|
|
||||||
data.pop('__runtests__')
|
|
||||||
if data['hunt_refleak']:
|
|
||||||
data['hunt_refleak'] = HuntRefleak(**data['hunt_refleak'])
|
|
||||||
return RunTests(**data)
|
|
||||||
else:
|
|
||||||
return data
|
|
||||||
|
|
||||||
|
|
||||||
# Minimum duration of a test to display its duration or to mention that
|
|
||||||
# the test is running in background
|
|
||||||
PROGRESS_MIN_TIME = 30.0 # seconds
|
|
||||||
|
|
||||||
#If these test directories are encountered recurse into them and treat each
|
|
||||||
# test_ .py or dir as a separate test module. This can increase parallelism.
|
|
||||||
# Beware this can't generally be done for any directory with sub-tests as the
|
|
||||||
# __init__.py may do things which alter what tests are to be run.
|
|
||||||
|
|
||||||
SPLITTESTDIRS: set[TestName] = {
|
|
||||||
"test_asyncio",
|
|
||||||
"test_concurrent_futures",
|
|
||||||
"test_multiprocessing_fork",
|
|
||||||
"test_multiprocessing_forkserver",
|
|
||||||
"test_multiprocessing_spawn",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def findtestdir(path=None):
|
|
||||||
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
|
|
||||||
|
|
||||||
|
|
||||||
def findtests(*, testdir: StrPath | None = None, exclude=(),
|
|
||||||
split_test_dirs: set[TestName] = SPLITTESTDIRS,
|
|
||||||
base_mod: str = "") -> TestList:
|
|
||||||
"""Return a list of all applicable test modules."""
|
|
||||||
testdir = findtestdir(testdir)
|
|
||||||
tests = []
|
|
||||||
for name in os.listdir(testdir):
|
|
||||||
mod, ext = os.path.splitext(name)
|
|
||||||
if (not mod.startswith("test_")) or (mod in exclude):
|
|
||||||
continue
|
|
||||||
if mod in split_test_dirs:
|
|
||||||
subdir = os.path.join(testdir, mod)
|
|
||||||
mod = f"{base_mod or 'test'}.{mod}"
|
|
||||||
tests.extend(findtests(testdir=subdir, exclude=exclude,
|
|
||||||
split_test_dirs=split_test_dirs,
|
|
||||||
base_mod=mod))
|
|
||||||
elif ext in (".py", ""):
|
|
||||||
tests.append(f"{base_mod}.{mod}" if base_mod else mod)
|
|
||||||
return sorted(tests)
|
|
||||||
|
|
||||||
|
|
||||||
def split_test_packages(tests, *, testdir: StrPath | None = None, exclude=(),
|
|
||||||
split_test_dirs=SPLITTESTDIRS):
|
|
||||||
testdir = findtestdir(testdir)
|
|
||||||
splitted = []
|
|
||||||
for name in tests:
|
|
||||||
if name in split_test_dirs:
|
|
||||||
subdir = os.path.join(testdir, name)
|
|
||||||
splitted.extend(findtests(testdir=subdir, exclude=exclude,
|
|
||||||
split_test_dirs=split_test_dirs,
|
|
||||||
base_mod=name))
|
|
||||||
else:
|
|
||||||
splitted.append(name)
|
|
||||||
return splitted
|
|
||||||
|
|
||||||
|
|
||||||
def abs_module_name(test_name: TestName, test_dir: StrPath | None) -> TestName:
|
|
||||||
if test_name.startswith('test.') or test_dir:
|
|
||||||
return test_name
|
|
||||||
else:
|
|
||||||
# Import it from the test package
|
|
||||||
return 'test.' + test_name
|
|
||||||
|
|
||||||
|
|
||||||
def setup_support(runtests: RunTests):
|
|
||||||
support.PGO = runtests.pgo
|
|
||||||
support.PGO_EXTENDED = runtests.pgo_extended
|
|
||||||
support.set_match_tests(runtests.match_tests, runtests.ignore_tests)
|
|
||||||
support.failfast = runtests.fail_fast
|
|
||||||
support.verbose = runtests.verbose
|
|
||||||
if runtests.use_junit:
|
|
||||||
support.junit_xml_list = []
|
|
||||||
else:
|
|
||||||
support.junit_xml_list = None
|
|
||||||
|
|
||||||
|
|
||||||
def _runtest(result: TestResult, runtests: RunTests) -> None:
|
|
||||||
# Capture stdout and stderr, set faulthandler timeout,
|
|
||||||
# and create JUnit XML report.
|
|
||||||
verbose = runtests.verbose
|
|
||||||
output_on_failure = runtests.output_on_failure
|
|
||||||
timeout = runtests.timeout
|
|
||||||
|
|
||||||
use_timeout = (
|
|
||||||
timeout is not None and threading_helper.can_start_thread
|
|
||||||
)
|
|
||||||
if use_timeout:
|
|
||||||
faulthandler.dump_traceback_later(timeout, exit=True)
|
|
||||||
|
|
||||||
try:
|
|
||||||
setup_support(runtests)
|
|
||||||
|
|
||||||
if output_on_failure:
|
|
||||||
support.verbose = True
|
|
||||||
|
|
||||||
stream = io.StringIO()
|
|
||||||
orig_stdout = sys.stdout
|
|
||||||
orig_stderr = sys.stderr
|
|
||||||
print_warning = support.print_warning
|
|
||||||
orig_print_warnings_stderr = print_warning.orig_stderr
|
|
||||||
|
|
||||||
output = None
|
|
||||||
try:
|
|
||||||
sys.stdout = stream
|
|
||||||
sys.stderr = stream
|
|
||||||
# print_warning() writes into the temporary stream to preserve
|
|
||||||
# messages order. If support.environment_altered becomes true,
|
|
||||||
# warnings will be written to sys.stderr below.
|
|
||||||
print_warning.orig_stderr = stream
|
|
||||||
|
|
||||||
_runtest_env_changed_exc(result, runtests, display_failure=False)
|
|
||||||
# Ignore output if the test passed successfully
|
|
||||||
if result.state != State.PASSED:
|
|
||||||
output = stream.getvalue()
|
|
||||||
finally:
|
|
||||||
sys.stdout = orig_stdout
|
|
||||||
sys.stderr = orig_stderr
|
|
||||||
print_warning.orig_stderr = orig_print_warnings_stderr
|
|
||||||
|
|
||||||
if output is not None:
|
|
||||||
sys.stderr.write(output)
|
|
||||||
sys.stderr.flush()
|
|
||||||
else:
|
|
||||||
# Tell tests to be moderately quiet
|
|
||||||
support.verbose = verbose
|
|
||||||
_runtest_env_changed_exc(result, runtests,
|
|
||||||
display_failure=not verbose)
|
|
||||||
|
|
||||||
xml_list = support.junit_xml_list
|
|
||||||
if xml_list:
|
|
||||||
import xml.etree.ElementTree as ET
|
|
||||||
result.xml_data = [ET.tostring(x).decode('us-ascii')
|
|
||||||
for x in xml_list]
|
|
||||||
finally:
|
|
||||||
if use_timeout:
|
|
||||||
faulthandler.cancel_dump_traceback_later()
|
|
||||||
support.junit_xml_list = None
|
|
||||||
|
|
||||||
|
|
||||||
def run_single_test(test_name: TestName, runtests: RunTests) -> TestResult:
|
|
||||||
"""Run a single test.
|
|
||||||
|
|
||||||
test_name -- the name of the test
|
|
||||||
|
|
||||||
Returns a TestResult.
|
|
||||||
|
|
||||||
If runtests.use_junit, xml_data is a list containing each generated
|
|
||||||
testsuite element.
|
|
||||||
"""
|
|
||||||
start_time = time.perf_counter()
|
|
||||||
result = TestResult(test_name)
|
|
||||||
pgo = runtests.pgo
|
|
||||||
try:
|
|
||||||
_runtest(result, runtests)
|
|
||||||
except:
|
|
||||||
if not pgo:
|
|
||||||
msg = traceback.format_exc()
|
|
||||||
print(f"test {test_name} crashed -- {msg}",
|
|
||||||
file=sys.stderr, flush=True)
|
|
||||||
result.state = State.UNCAUGHT_EXC
|
|
||||||
result.duration = time.perf_counter() - start_time
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
def run_unittest(test_mod):
|
|
||||||
loader = unittest.TestLoader()
|
|
||||||
tests = loader.loadTestsFromModule(test_mod)
|
|
||||||
for error in loader.errors:
|
|
||||||
print(error, file=sys.stderr)
|
|
||||||
if loader.errors:
|
|
||||||
raise Exception("errors while loading tests")
|
|
||||||
return support.run_unittest(tests)
|
|
||||||
|
|
||||||
|
|
||||||
def save_env(test_name: TestName, runtests: RunTests):
|
|
||||||
return saved_test_environment(test_name, runtests.verbose, runtests.quiet,
|
|
||||||
pgo=runtests.pgo)
|
|
||||||
|
|
||||||
|
|
||||||
def regrtest_runner(result: TestResult, test_func, runtests: RunTests) -> None:
|
|
||||||
# Run test_func(), collect statistics, and detect reference and memory
|
|
||||||
# leaks.
|
|
||||||
if runtests.hunt_refleak:
|
|
||||||
from test.libregrtest.refleak import runtest_refleak
|
|
||||||
refleak, test_result = runtest_refleak(result.test_name, test_func,
|
|
||||||
runtests.hunt_refleak,
|
|
||||||
runtests.quiet)
|
|
||||||
else:
|
|
||||||
test_result = test_func()
|
|
||||||
refleak = False
|
|
||||||
|
|
||||||
if refleak:
|
|
||||||
result.state = State.REFLEAK
|
|
||||||
|
|
||||||
match test_result:
|
|
||||||
case TestStats():
|
|
||||||
stats = test_result
|
|
||||||
case unittest.TestResult():
|
|
||||||
stats = TestStats.from_unittest(test_result)
|
|
||||||
case doctest.TestResults():
|
|
||||||
stats = TestStats.from_doctest(test_result)
|
|
||||||
case None:
|
|
||||||
print_warning(f"{result.test_name} test runner returned None: {test_func}")
|
|
||||||
stats = None
|
|
||||||
case _:
|
|
||||||
print_warning(f"Unknown test result type: {type(test_result)}")
|
|
||||||
stats = None
|
|
||||||
|
|
||||||
result.stats = stats
|
|
||||||
|
|
||||||
|
|
||||||
# Storage of uncollectable objects
|
|
||||||
FOUND_GARBAGE = []
|
|
||||||
|
|
||||||
|
|
||||||
def _load_run_test(result: TestResult, runtests: RunTests) -> None:
|
|
||||||
# Load the test function, run the test function.
|
|
||||||
module_name = abs_module_name(result.test_name, runtests.test_dir)
|
|
||||||
|
|
||||||
# Remove the module from sys.module to reload it if it was already imported
|
|
||||||
sys.modules.pop(module_name, None)
|
|
||||||
|
|
||||||
test_mod = importlib.import_module(module_name)
|
|
||||||
|
|
||||||
if hasattr(test_mod, "test_main"):
|
|
||||||
# https://github.com/python/cpython/issues/89392
|
|
||||||
raise Exception(f"Module {result.test_name} defines test_main() which is no longer supported by regrtest")
|
|
||||||
def test_func():
|
|
||||||
return run_unittest(test_mod)
|
|
||||||
|
|
||||||
try:
|
|
||||||
with save_env(result.test_name, runtests):
|
|
||||||
regrtest_runner(result, test_func, runtests)
|
|
||||||
finally:
|
|
||||||
# First kill any dangling references to open files etc.
|
|
||||||
# This can also issue some ResourceWarnings which would otherwise get
|
|
||||||
# triggered during the following test run, and possibly produce
|
|
||||||
# failures.
|
|
||||||
support.gc_collect()
|
|
||||||
|
|
||||||
remove_testfn(result.test_name, runtests.verbose)
|
|
||||||
|
|
||||||
if gc.garbage:
|
|
||||||
support.environment_altered = True
|
|
||||||
print_warning(f"{result.test_name} created {len(gc.garbage)} "
|
|
||||||
f"uncollectable object(s)")
|
|
||||||
|
|
||||||
# move the uncollectable objects somewhere,
|
|
||||||
# so we don't see them again
|
|
||||||
FOUND_GARBAGE.extend(gc.garbage)
|
|
||||||
gc.garbage.clear()
|
|
||||||
|
|
||||||
support.reap_children()
|
|
||||||
|
|
||||||
|
|
||||||
def _runtest_env_changed_exc(result: TestResult, runtests: RunTests,
|
|
||||||
display_failure: bool = True) -> None:
|
|
||||||
# Detect environment changes, handle exceptions.
|
|
||||||
|
|
||||||
# Reset the environment_altered flag to detect if a test altered
|
|
||||||
# the environment
|
|
||||||
support.environment_altered = False
|
|
||||||
|
|
||||||
pgo = runtests.pgo
|
|
||||||
if pgo:
|
|
||||||
display_failure = False
|
|
||||||
quiet = runtests.quiet
|
|
||||||
|
|
||||||
test_name = result.test_name
|
|
||||||
try:
|
|
||||||
clear_caches()
|
|
||||||
support.gc_collect()
|
|
||||||
|
|
||||||
with save_env(test_name, runtests):
|
|
||||||
_load_run_test(result, runtests)
|
|
||||||
except support.ResourceDenied as msg:
|
|
||||||
if not quiet and not pgo:
|
|
||||||
print(f"{test_name} skipped -- {msg}", flush=True)
|
|
||||||
result.state = State.RESOURCE_DENIED
|
|
||||||
return
|
|
||||||
except unittest.SkipTest as msg:
|
|
||||||
if not quiet and not pgo:
|
|
||||||
print(f"{test_name} skipped -- {msg}", flush=True)
|
|
||||||
result.state = State.SKIPPED
|
|
||||||
return
|
|
||||||
except support.TestFailedWithDetails as exc:
|
|
||||||
msg = f"test {test_name} failed"
|
|
||||||
if display_failure:
|
|
||||||
msg = f"{msg} -- {exc}"
|
|
||||||
print(msg, file=sys.stderr, flush=True)
|
|
||||||
result.state = State.FAILED
|
|
||||||
result.errors = exc.errors
|
|
||||||
result.failures = exc.failures
|
|
||||||
result.stats = exc.stats
|
|
||||||
return
|
|
||||||
except support.TestFailed as exc:
|
|
||||||
msg = f"test {test_name} failed"
|
|
||||||
if display_failure:
|
|
||||||
msg = f"{msg} -- {exc}"
|
|
||||||
print(msg, file=sys.stderr, flush=True)
|
|
||||||
result.state = State.FAILED
|
|
||||||
result.stats = exc.stats
|
|
||||||
return
|
|
||||||
except support.TestDidNotRun:
|
|
||||||
result.state = State.DID_NOT_RUN
|
|
||||||
return
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
print()
|
|
||||||
result.state = State.INTERRUPTED
|
|
||||||
return
|
|
||||||
except:
|
|
||||||
if not pgo:
|
|
||||||
msg = traceback.format_exc()
|
|
||||||
print(f"test {test_name} crashed -- {msg}",
|
|
||||||
file=sys.stderr, flush=True)
|
|
||||||
result.state = State.UNCAUGHT_EXC
|
|
||||||
return
|
|
||||||
|
|
||||||
if support.environment_altered:
|
|
||||||
result.set_env_changed()
|
|
||||||
# Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
|
|
||||||
if result.state is None:
|
|
||||||
result.state = State.PASSED
|
|
||||||
|
|
||||||
|
|
||||||
def remove_testfn(test_name: TestName, verbose: int) -> None:
|
|
||||||
# Try to clean up os_helper.TESTFN if left behind.
|
|
||||||
#
|
|
||||||
# While tests shouldn't leave any files or directories behind, when a test
|
|
||||||
# fails that can be tedious for it to arrange. The consequences can be
|
|
||||||
# especially nasty on Windows, since if a test leaves a file open, it
|
|
||||||
# cannot be deleted by name (while there's nothing we can do about that
|
|
||||||
# here either, we can display the name of the offending test, which is a
|
|
||||||
# real help).
|
|
||||||
name = os_helper.TESTFN
|
|
||||||
if not os.path.exists(name):
|
|
||||||
return
|
|
||||||
|
|
||||||
if os.path.isdir(name):
|
|
||||||
import shutil
|
|
||||||
kind, nuker = "directory", shutil.rmtree
|
|
||||||
elif os.path.isfile(name):
|
|
||||||
kind, nuker = "file", os.unlink
|
|
||||||
else:
|
|
||||||
raise RuntimeError(f"os.path says {name!r} exists but is neither "
|
|
||||||
f"directory nor file")
|
|
||||||
|
|
||||||
if verbose:
|
|
||||||
print_warning(f"{test_name} left behind {kind} {name!r}")
|
|
||||||
support.environment_altered = True
|
|
||||||
|
|
||||||
try:
|
|
||||||
import stat
|
|
||||||
# fix possible permissions problems that might prevent cleanup
|
|
||||||
os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
|
|
||||||
nuker(name)
|
|
||||||
except Exception as exc:
|
|
||||||
print_warning(f"{test_name} left behind {kind} {name!r} "
|
|
||||||
f"and it couldn't be removed: {exc}")
|
|
@ -15,12 +15,13 @@ from test import support
|
|||||||
from test.support import os_helper
|
from test.support import os_helper
|
||||||
|
|
||||||
from test.libregrtest.main import Regrtest
|
from test.libregrtest.main import Regrtest
|
||||||
from test.libregrtest.runtest import (
|
from test.libregrtest.result import TestResult, State
|
||||||
TestResult, State, PROGRESS_MIN_TIME,
|
|
||||||
RunTests, TestName)
|
|
||||||
from test.libregrtest.results import TestResults
|
from test.libregrtest.results import TestResults
|
||||||
|
from test.libregrtest.runtests import RunTests
|
||||||
|
from test.libregrtest.single import PROGRESS_MIN_TIME
|
||||||
from test.libregrtest.utils import (
|
from test.libregrtest.utils import (
|
||||||
format_duration, print_warning, StrPath)
|
StrPath, TestName,
|
||||||
|
format_duration, print_warning)
|
||||||
from test.libregrtest.worker import create_worker_process, USE_PROCESS_GROUP
|
from test.libregrtest.worker import create_worker_process, USE_PROCESS_GROUP
|
||||||
|
|
||||||
if sys.platform == 'win32':
|
if sys.platform == 'win32':
|
||||||
|
83
Lib/test/libregrtest/runtests.py
Normal file
83
Lib/test/libregrtest/runtests.py
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
import dataclasses
|
||||||
|
import json
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from test.libregrtest.utils import (
|
||||||
|
StrPath, StrJSON, TestTuple, FilterTuple, FilterDict)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass(slots=True, frozen=True)
|
||||||
|
class HuntRefleak:
|
||||||
|
warmups: int
|
||||||
|
runs: int
|
||||||
|
filename: StrPath
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass(slots=True, frozen=True)
|
||||||
|
class RunTests:
|
||||||
|
tests: TestTuple
|
||||||
|
fail_fast: bool = False
|
||||||
|
fail_env_changed: bool = False
|
||||||
|
match_tests: FilterTuple | None = None
|
||||||
|
ignore_tests: FilterTuple | None = None
|
||||||
|
match_tests_dict: FilterDict | None = None
|
||||||
|
rerun: bool = False
|
||||||
|
forever: bool = False
|
||||||
|
pgo: bool = False
|
||||||
|
pgo_extended: bool = False
|
||||||
|
output_on_failure: bool = False
|
||||||
|
timeout: float | None = None
|
||||||
|
verbose: bool = False
|
||||||
|
quiet: bool = False
|
||||||
|
hunt_refleak: HuntRefleak | None = None
|
||||||
|
test_dir: StrPath | None = None
|
||||||
|
use_junit: bool = False
|
||||||
|
memory_limit: str | None = None
|
||||||
|
gc_threshold: int | None = None
|
||||||
|
use_resources: list[str] = None
|
||||||
|
python_cmd: list[str] | None = None
|
||||||
|
|
||||||
|
def copy(self, **override):
|
||||||
|
state = dataclasses.asdict(self)
|
||||||
|
state.update(override)
|
||||||
|
return RunTests(**state)
|
||||||
|
|
||||||
|
def get_match_tests(self, test_name) -> FilterTuple | None:
|
||||||
|
if self.match_tests_dict is not None:
|
||||||
|
return self.match_tests_dict.get(test_name, None)
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def iter_tests(self):
|
||||||
|
if self.forever:
|
||||||
|
while True:
|
||||||
|
yield from self.tests
|
||||||
|
else:
|
||||||
|
yield from self.tests
|
||||||
|
|
||||||
|
def as_json(self) -> StrJSON:
|
||||||
|
return json.dumps(self, cls=_EncodeRunTests)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def from_json(worker_json: StrJSON) -> 'RunTests':
|
||||||
|
return json.loads(worker_json, object_hook=_decode_runtests)
|
||||||
|
|
||||||
|
|
||||||
|
class _EncodeRunTests(json.JSONEncoder):
|
||||||
|
def default(self, o: Any) -> dict[str, Any]:
|
||||||
|
if isinstance(o, RunTests):
|
||||||
|
result = dataclasses.asdict(o)
|
||||||
|
result["__runtests__"] = True
|
||||||
|
return result
|
||||||
|
else:
|
||||||
|
return super().default(o)
|
||||||
|
|
||||||
|
|
||||||
|
def _decode_runtests(data: dict[str, Any]) -> RunTests | dict[str, Any]:
|
||||||
|
if "__runtests__" in data:
|
||||||
|
data.pop('__runtests__')
|
||||||
|
if data['hunt_refleak']:
|
||||||
|
data['hunt_refleak'] = HuntRefleak(**data['hunt_refleak'])
|
||||||
|
return RunTests(**data)
|
||||||
|
else:
|
||||||
|
return data
|
@ -11,6 +11,7 @@ try:
|
|||||||
except ImportError:
|
except ImportError:
|
||||||
gc = None
|
gc = None
|
||||||
|
|
||||||
|
from test.libregrtest.runtests import RunTests
|
||||||
from test.libregrtest.utils import (
|
from test.libregrtest.utils import (
|
||||||
setup_unraisable_hook, setup_threading_excepthook, fix_umask)
|
setup_unraisable_hook, setup_threading_excepthook, fix_umask)
|
||||||
|
|
||||||
@ -25,6 +26,18 @@ def setup_test_dir(testdir: str | None) -> None:
|
|||||||
sys.path.insert(0, os.path.abspath(testdir))
|
sys.path.insert(0, os.path.abspath(testdir))
|
||||||
|
|
||||||
|
|
||||||
|
def setup_support(runtests: RunTests):
|
||||||
|
support.PGO = runtests.pgo
|
||||||
|
support.PGO_EXTENDED = runtests.pgo_extended
|
||||||
|
support.set_match_tests(runtests.match_tests, runtests.ignore_tests)
|
||||||
|
support.failfast = runtests.fail_fast
|
||||||
|
support.verbose = runtests.verbose
|
||||||
|
if runtests.use_junit:
|
||||||
|
support.junit_xml_list = []
|
||||||
|
else:
|
||||||
|
support.junit_xml_list = None
|
||||||
|
|
||||||
|
|
||||||
def setup_tests(runtests):
|
def setup_tests(runtests):
|
||||||
fix_umask()
|
fix_umask()
|
||||||
|
|
||||||
|
275
Lib/test/libregrtest/single.py
Normal file
275
Lib/test/libregrtest/single.py
Normal file
@ -0,0 +1,275 @@
|
|||||||
|
import doctest
|
||||||
|
import faulthandler
|
||||||
|
import gc
|
||||||
|
import importlib
|
||||||
|
import io
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import traceback
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from test import support
|
||||||
|
from test.support import TestStats
|
||||||
|
from test.support import threading_helper
|
||||||
|
|
||||||
|
from test.libregrtest.result import State, TestResult
|
||||||
|
from test.libregrtest.runtests import RunTests
|
||||||
|
from test.libregrtest.save_env import saved_test_environment
|
||||||
|
from test.libregrtest.setup import setup_support
|
||||||
|
from test.libregrtest.utils import (
|
||||||
|
TestName,
|
||||||
|
clear_caches, remove_testfn, abs_module_name, print_warning)
|
||||||
|
|
||||||
|
|
||||||
|
# Minimum duration of a test to display its duration or to mention that
|
||||||
|
# the test is running in background
|
||||||
|
PROGRESS_MIN_TIME = 30.0 # seconds
|
||||||
|
|
||||||
|
|
||||||
|
def run_unittest(test_mod):
|
||||||
|
loader = unittest.TestLoader()
|
||||||
|
tests = loader.loadTestsFromModule(test_mod)
|
||||||
|
for error in loader.errors:
|
||||||
|
print(error, file=sys.stderr)
|
||||||
|
if loader.errors:
|
||||||
|
raise Exception("errors while loading tests")
|
||||||
|
return support.run_unittest(tests)
|
||||||
|
|
||||||
|
|
||||||
|
def regrtest_runner(result: TestResult, test_func, runtests: RunTests) -> None:
|
||||||
|
# Run test_func(), collect statistics, and detect reference and memory
|
||||||
|
# leaks.
|
||||||
|
if runtests.hunt_refleak:
|
||||||
|
from test.libregrtest.refleak import runtest_refleak
|
||||||
|
refleak, test_result = runtest_refleak(result.test_name, test_func,
|
||||||
|
runtests.hunt_refleak,
|
||||||
|
runtests.quiet)
|
||||||
|
else:
|
||||||
|
test_result = test_func()
|
||||||
|
refleak = False
|
||||||
|
|
||||||
|
if refleak:
|
||||||
|
result.state = State.REFLEAK
|
||||||
|
|
||||||
|
match test_result:
|
||||||
|
case TestStats():
|
||||||
|
stats = test_result
|
||||||
|
case unittest.TestResult():
|
||||||
|
stats = TestStats.from_unittest(test_result)
|
||||||
|
case doctest.TestResults():
|
||||||
|
stats = TestStats.from_doctest(test_result)
|
||||||
|
case None:
|
||||||
|
print_warning(f"{result.test_name} test runner returned None: {test_func}")
|
||||||
|
stats = None
|
||||||
|
case _:
|
||||||
|
print_warning(f"Unknown test result type: {type(test_result)}")
|
||||||
|
stats = None
|
||||||
|
|
||||||
|
result.stats = stats
|
||||||
|
|
||||||
|
|
||||||
|
def save_env(test_name: TestName, runtests: RunTests):
|
||||||
|
return saved_test_environment(test_name, runtests.verbose, runtests.quiet,
|
||||||
|
pgo=runtests.pgo)
|
||||||
|
|
||||||
|
|
||||||
|
# Storage of uncollectable GC objects (gc.garbage)
|
||||||
|
GC_GARBAGE = []
|
||||||
|
|
||||||
|
|
||||||
|
def _load_run_test(result: TestResult, runtests: RunTests) -> None:
|
||||||
|
# Load the test function, run the test function.
|
||||||
|
module_name = abs_module_name(result.test_name, runtests.test_dir)
|
||||||
|
|
||||||
|
# Remove the module from sys.module to reload it if it was already imported
|
||||||
|
sys.modules.pop(module_name, None)
|
||||||
|
|
||||||
|
test_mod = importlib.import_module(module_name)
|
||||||
|
|
||||||
|
if hasattr(test_mod, "test_main"):
|
||||||
|
# https://github.com/python/cpython/issues/89392
|
||||||
|
raise Exception(f"Module {result.test_name} defines test_main() which is no longer supported by regrtest")
|
||||||
|
def test_func():
|
||||||
|
return run_unittest(test_mod)
|
||||||
|
|
||||||
|
try:
|
||||||
|
with save_env(result.test_name, runtests):
|
||||||
|
regrtest_runner(result, test_func, runtests)
|
||||||
|
finally:
|
||||||
|
# First kill any dangling references to open files etc.
|
||||||
|
# This can also issue some ResourceWarnings which would otherwise get
|
||||||
|
# triggered during the following test run, and possibly produce
|
||||||
|
# failures.
|
||||||
|
support.gc_collect()
|
||||||
|
|
||||||
|
remove_testfn(result.test_name, runtests.verbose)
|
||||||
|
|
||||||
|
if gc.garbage:
|
||||||
|
support.environment_altered = True
|
||||||
|
print_warning(f"{result.test_name} created {len(gc.garbage)} "
|
||||||
|
f"uncollectable object(s)")
|
||||||
|
|
||||||
|
# move the uncollectable objects somewhere,
|
||||||
|
# so we don't see them again
|
||||||
|
GC_GARBAGE.extend(gc.garbage)
|
||||||
|
gc.garbage.clear()
|
||||||
|
|
||||||
|
support.reap_children()
|
||||||
|
|
||||||
|
|
||||||
|
def _runtest_env_changed_exc(result: TestResult, runtests: RunTests,
|
||||||
|
display_failure: bool = True) -> None:
|
||||||
|
# Detect environment changes, handle exceptions.
|
||||||
|
|
||||||
|
# Reset the environment_altered flag to detect if a test altered
|
||||||
|
# the environment
|
||||||
|
support.environment_altered = False
|
||||||
|
|
||||||
|
pgo = runtests.pgo
|
||||||
|
if pgo:
|
||||||
|
display_failure = False
|
||||||
|
quiet = runtests.quiet
|
||||||
|
|
||||||
|
test_name = result.test_name
|
||||||
|
try:
|
||||||
|
clear_caches()
|
||||||
|
support.gc_collect()
|
||||||
|
|
||||||
|
with save_env(test_name, runtests):
|
||||||
|
_load_run_test(result, runtests)
|
||||||
|
except support.ResourceDenied as msg:
|
||||||
|
if not quiet and not pgo:
|
||||||
|
print(f"{test_name} skipped -- {msg}", flush=True)
|
||||||
|
result.state = State.RESOURCE_DENIED
|
||||||
|
return
|
||||||
|
except unittest.SkipTest as msg:
|
||||||
|
if not quiet and not pgo:
|
||||||
|
print(f"{test_name} skipped -- {msg}", flush=True)
|
||||||
|
result.state = State.SKIPPED
|
||||||
|
return
|
||||||
|
except support.TestFailedWithDetails as exc:
|
||||||
|
msg = f"test {test_name} failed"
|
||||||
|
if display_failure:
|
||||||
|
msg = f"{msg} -- {exc}"
|
||||||
|
print(msg, file=sys.stderr, flush=True)
|
||||||
|
result.state = State.FAILED
|
||||||
|
result.errors = exc.errors
|
||||||
|
result.failures = exc.failures
|
||||||
|
result.stats = exc.stats
|
||||||
|
return
|
||||||
|
except support.TestFailed as exc:
|
||||||
|
msg = f"test {test_name} failed"
|
||||||
|
if display_failure:
|
||||||
|
msg = f"{msg} -- {exc}"
|
||||||
|
print(msg, file=sys.stderr, flush=True)
|
||||||
|
result.state = State.FAILED
|
||||||
|
result.stats = exc.stats
|
||||||
|
return
|
||||||
|
except support.TestDidNotRun:
|
||||||
|
result.state = State.DID_NOT_RUN
|
||||||
|
return
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print()
|
||||||
|
result.state = State.INTERRUPTED
|
||||||
|
return
|
||||||
|
except:
|
||||||
|
if not pgo:
|
||||||
|
msg = traceback.format_exc()
|
||||||
|
print(f"test {test_name} crashed -- {msg}",
|
||||||
|
file=sys.stderr, flush=True)
|
||||||
|
result.state = State.UNCAUGHT_EXC
|
||||||
|
return
|
||||||
|
|
||||||
|
if support.environment_altered:
|
||||||
|
result.set_env_changed()
|
||||||
|
# Don't override the state if it was already set (REFLEAK or ENV_CHANGED)
|
||||||
|
if result.state is None:
|
||||||
|
result.state = State.PASSED
|
||||||
|
|
||||||
|
|
||||||
|
def _runtest(result: TestResult, runtests: RunTests) -> None:
|
||||||
|
# Capture stdout and stderr, set faulthandler timeout,
|
||||||
|
# and create JUnit XML report.
|
||||||
|
verbose = runtests.verbose
|
||||||
|
output_on_failure = runtests.output_on_failure
|
||||||
|
timeout = runtests.timeout
|
||||||
|
|
||||||
|
use_timeout = (
|
||||||
|
timeout is not None and threading_helper.can_start_thread
|
||||||
|
)
|
||||||
|
if use_timeout:
|
||||||
|
faulthandler.dump_traceback_later(timeout, exit=True)
|
||||||
|
|
||||||
|
try:
|
||||||
|
setup_support(runtests)
|
||||||
|
|
||||||
|
if output_on_failure:
|
||||||
|
support.verbose = True
|
||||||
|
|
||||||
|
stream = io.StringIO()
|
||||||
|
orig_stdout = sys.stdout
|
||||||
|
orig_stderr = sys.stderr
|
||||||
|
print_warning = support.print_warning
|
||||||
|
orig_print_warnings_stderr = print_warning.orig_stderr
|
||||||
|
|
||||||
|
output = None
|
||||||
|
try:
|
||||||
|
sys.stdout = stream
|
||||||
|
sys.stderr = stream
|
||||||
|
# print_warning() writes into the temporary stream to preserve
|
||||||
|
# messages order. If support.environment_altered becomes true,
|
||||||
|
# warnings will be written to sys.stderr below.
|
||||||
|
print_warning.orig_stderr = stream
|
||||||
|
|
||||||
|
_runtest_env_changed_exc(result, runtests, display_failure=False)
|
||||||
|
# Ignore output if the test passed successfully
|
||||||
|
if result.state != State.PASSED:
|
||||||
|
output = stream.getvalue()
|
||||||
|
finally:
|
||||||
|
sys.stdout = orig_stdout
|
||||||
|
sys.stderr = orig_stderr
|
||||||
|
print_warning.orig_stderr = orig_print_warnings_stderr
|
||||||
|
|
||||||
|
if output is not None:
|
||||||
|
sys.stderr.write(output)
|
||||||
|
sys.stderr.flush()
|
||||||
|
else:
|
||||||
|
# Tell tests to be moderately quiet
|
||||||
|
support.verbose = verbose
|
||||||
|
_runtest_env_changed_exc(result, runtests,
|
||||||
|
display_failure=not verbose)
|
||||||
|
|
||||||
|
xml_list = support.junit_xml_list
|
||||||
|
if xml_list:
|
||||||
|
import xml.etree.ElementTree as ET
|
||||||
|
result.xml_data = [ET.tostring(x).decode('us-ascii')
|
||||||
|
for x in xml_list]
|
||||||
|
finally:
|
||||||
|
if use_timeout:
|
||||||
|
faulthandler.cancel_dump_traceback_later()
|
||||||
|
support.junit_xml_list = None
|
||||||
|
|
||||||
|
|
||||||
|
def run_single_test(test_name: TestName, runtests: RunTests) -> TestResult:
|
||||||
|
"""Run a single test.
|
||||||
|
|
||||||
|
test_name -- the name of the test
|
||||||
|
|
||||||
|
Returns a TestResult.
|
||||||
|
|
||||||
|
If runtests.use_junit, xml_data is a list containing each generated
|
||||||
|
testsuite element.
|
||||||
|
"""
|
||||||
|
start_time = time.perf_counter()
|
||||||
|
result = TestResult(test_name)
|
||||||
|
pgo = runtests.pgo
|
||||||
|
try:
|
||||||
|
_runtest(result, runtests)
|
||||||
|
except:
|
||||||
|
if not pgo:
|
||||||
|
msg = traceback.format_exc()
|
||||||
|
print(f"test {test_name} crashed -- {msg}",
|
||||||
|
file=sys.stderr, flush=True)
|
||||||
|
result.state = State.UNCAUGHT_EXC
|
||||||
|
result.duration = time.perf_counter() - start_time
|
||||||
|
return result
|
@ -21,7 +21,16 @@ MS_WINDOWS = (sys.platform == 'win32')
|
|||||||
EXIT_TIMEOUT = 120.0
|
EXIT_TIMEOUT = 120.0
|
||||||
|
|
||||||
|
|
||||||
|
# Types for types hints
|
||||||
StrPath = str
|
StrPath = str
|
||||||
|
TestName = str
|
||||||
|
StrJSON = str
|
||||||
|
TestTuple = tuple[TestName, ...]
|
||||||
|
TestList = list[TestName]
|
||||||
|
# --match and --ignore options: list of patterns
|
||||||
|
# ('*' joker character can be used)
|
||||||
|
FilterTuple = tuple[TestName, ...]
|
||||||
|
FilterDict = dict[TestName, FilterTuple]
|
||||||
|
|
||||||
|
|
||||||
def format_duration(seconds):
|
def format_duration(seconds):
|
||||||
@ -389,3 +398,76 @@ def exit_timeout():
|
|||||||
if threading_helper.can_start_thread:
|
if threading_helper.can_start_thread:
|
||||||
faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)
|
faulthandler.dump_traceback_later(EXIT_TIMEOUT, exit=True)
|
||||||
sys.exit(exc.code)
|
sys.exit(exc.code)
|
||||||
|
|
||||||
|
|
||||||
|
def remove_testfn(test_name: TestName, verbose: int) -> None:
|
||||||
|
# Try to clean up os_helper.TESTFN if left behind.
|
||||||
|
#
|
||||||
|
# While tests shouldn't leave any files or directories behind, when a test
|
||||||
|
# fails that can be tedious for it to arrange. The consequences can be
|
||||||
|
# especially nasty on Windows, since if a test leaves a file open, it
|
||||||
|
# cannot be deleted by name (while there's nothing we can do about that
|
||||||
|
# here either, we can display the name of the offending test, which is a
|
||||||
|
# real help).
|
||||||
|
name = os_helper.TESTFN
|
||||||
|
if not os.path.exists(name):
|
||||||
|
return
|
||||||
|
|
||||||
|
if os.path.isdir(name):
|
||||||
|
import shutil
|
||||||
|
kind, nuker = "directory", shutil.rmtree
|
||||||
|
elif os.path.isfile(name):
|
||||||
|
kind, nuker = "file", os.unlink
|
||||||
|
else:
|
||||||
|
raise RuntimeError(f"os.path says {name!r} exists but is neither "
|
||||||
|
f"directory nor file")
|
||||||
|
|
||||||
|
if verbose:
|
||||||
|
print_warning(f"{test_name} left behind {kind} {name!r}")
|
||||||
|
support.environment_altered = True
|
||||||
|
|
||||||
|
try:
|
||||||
|
import stat
|
||||||
|
# fix possible permissions problems that might prevent cleanup
|
||||||
|
os.chmod(name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
|
||||||
|
nuker(name)
|
||||||
|
except Exception as exc:
|
||||||
|
print_warning(f"{test_name} left behind {kind} {name!r} "
|
||||||
|
f"and it couldn't be removed: {exc}")
|
||||||
|
|
||||||
|
|
||||||
|
def abs_module_name(test_name: TestName, test_dir: StrPath | None) -> TestName:
|
||||||
|
if test_name.startswith('test.') or test_dir:
|
||||||
|
return test_name
|
||||||
|
else:
|
||||||
|
# Import it from the test package
|
||||||
|
return 'test.' + test_name
|
||||||
|
|
||||||
|
|
||||||
|
# gh-90681: When rerunning tests, we might need to rerun the whole
|
||||||
|
# class or module suite if some its life-cycle hooks fail.
|
||||||
|
# Test level hooks are not affected.
|
||||||
|
_TEST_LIFECYCLE_HOOKS = frozenset((
|
||||||
|
'setUpClass', 'tearDownClass',
|
||||||
|
'setUpModule', 'tearDownModule',
|
||||||
|
))
|
||||||
|
|
||||||
|
def normalize_test_name(test_full_name, *, is_error=False):
|
||||||
|
short_name = test_full_name.split(" ")[0]
|
||||||
|
if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
|
||||||
|
if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
|
||||||
|
# if setUpModule() or tearDownModule() failed, don't filter
|
||||||
|
# tests with the test file name, don't use use filters.
|
||||||
|
return None
|
||||||
|
|
||||||
|
# This means that we have a failure in a life-cycle hook,
|
||||||
|
# we need to rerun the whole module or class suite.
|
||||||
|
# Basically the error looks like this:
|
||||||
|
# ERROR: setUpClass (test.test_reg_ex.RegTest)
|
||||||
|
# or
|
||||||
|
# ERROR: setUpModule (test.test_reg_ex)
|
||||||
|
# So, we need to parse the class / module name.
|
||||||
|
lpar = test_full_name.index('(')
|
||||||
|
rpar = test_full_name.index(')')
|
||||||
|
return test_full_name[lpar + 1: rpar].split('.')[-1]
|
||||||
|
return short_name
|
||||||
|
@ -7,9 +7,11 @@ from test import support
|
|||||||
from test.support import os_helper
|
from test.support import os_helper
|
||||||
|
|
||||||
from test.libregrtest.setup import setup_tests, setup_test_dir
|
from test.libregrtest.setup import setup_tests, setup_test_dir
|
||||||
from test.libregrtest.runtest import (
|
from test.libregrtest.runtests import RunTests
|
||||||
run_single_test, StrJSON, FilterTuple, RunTests)
|
from test.libregrtest.single import run_single_test
|
||||||
from test.libregrtest.utils import get_work_dir, exit_timeout, StrPath
|
from test.libregrtest.utils import (
|
||||||
|
StrPath, StrJSON, FilterTuple,
|
||||||
|
get_work_dir, exit_timeout)
|
||||||
|
|
||||||
|
|
||||||
USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
|
USE_PROCESS_GROUP = (hasattr(os, "setsid") and hasattr(os, "killpg"))
|
||||||
|
@ -22,7 +22,7 @@ from test import libregrtest
|
|||||||
from test import support
|
from test import support
|
||||||
from test.support import os_helper, TestStats
|
from test.support import os_helper, TestStats
|
||||||
from test.libregrtest import utils, setup
|
from test.libregrtest import utils, setup
|
||||||
from test.libregrtest.runtest import normalize_test_name
|
from test.libregrtest.utils import normalize_test_name
|
||||||
|
|
||||||
if not support.has_subprocess_support:
|
if not support.has_subprocess_support:
|
||||||
raise unittest.SkipTest("test module requires subprocess")
|
raise unittest.SkipTest("test module requires subprocess")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user