""" Verifies the rules/ module boundary: nothing inside activity_core/rules/ may import from temporalio, sqlalchemy, fastapi, or any activity_core.* module outside rules/. """ import ast import os from pathlib import Path _RULES_DIR = Path(__file__).parent.parent.parent / "src" / "activity_core" / "rules" _FORBIDDEN_MODULES = { "temporalio", "sqlalchemy", "fastapi", } def _get_imports(filepath: Path) -> list[str]: """Return all top-level imported module names from a Python file.""" tree = ast.parse(filepath.read_text()) imports = [] for node in ast.walk(tree): if isinstance(node, ast.Import): for alias in node.names: imports.append(alias.name.split(".")[0]) elif isinstance(node, ast.ImportFrom): if node.module: root = node.module.split(".")[0] imports.append(root) # Detect cross-boundary activity_core imports if node.module.startswith("activity_core.") and not node.module.startswith( "activity_core.rules" ): imports.append(f"_cross_boundary:{node.module}") return imports def test_rules_module_boundary() -> None: """No file in rules/ may import forbidden modules or cross the boundary.""" violations: list[str] = [] for py_file in sorted(_RULES_DIR.glob("*.py")): imports = _get_imports(py_file) for imp in imports: if imp in _FORBIDDEN_MODULES: violations.append(f"{py_file.name}: imports {imp!r}") if imp.startswith("_cross_boundary:"): module = imp[len("_cross_boundary:"):] violations.append( f"{py_file.name}: cross-boundary import from {module!r}" ) assert not violations, ( "rules/ boundary violations:\n" + "\n".join(violations) )