from __future__ import annotations from pathlib import Path from typing import Any import yaml from jsonschema import Draft202012Validator from open_reuse.registry import ( INDEX_REQUIRED_FIELDS, integration_paths, load_index, load_schema, load_yaml, registry_paths, resolve_external_definition, resolve_repo_root, ) def _format_schema_error(path: Path, error: Any) -> str: location = ".".join(str(part) for part in error.path) or "(root)" return f"{path}: schema error at {location}: {error.message}" def validate_definition( path: Path, schema: dict[str, Any], ) -> tuple[list[str], list[str]]: errors: list[str] = [] warnings: list[str] = [] try: data = load_yaml(path) except (OSError, yaml.YAMLError, ValueError) as exc: return [f"{path}: failed to load YAML: {exc}"], warnings validator = Draft202012Validator(schema) schema_errors = sorted(validator.iter_errors(data), key=lambda item: list(item.path)) if schema_errors: errors.extend(_format_schema_error(path, item) for item in schema_errors) return errors, warnings warnings.extend(_promotion_gate_warnings(path, data)) return errors, warnings def _promotion_gate_warnings(path: Path, data: dict[str, Any]) -> list[str]: warnings: list[str] = [] status = data.get("status", "draft") maintenance = data.get("maintenance", {}) if status == "active" and not maintenance.get("maintainers"): warnings.append( f"{path}: active integration missing maintenance.maintainers" ) if status in {"registered", "active"}: if not data.get("boundary"): warnings.append(f"{path}: {status} integration missing boundary block") if not data.get("validation", {}).get("harness"): warnings.append(f"{path}: {status} integration missing validation.harness") return warnings def validate_index( repo_root: Path, *, repos_base: Path | None, indexed_only: bool = False, ) -> tuple[list[str], list[str]]: errors: list[str] = [] warnings: list[str] = [] index = load_index(repo_root) entries = index.get("integrations", []) if not isinstance(entries, list): return ["indexes/integrations.yaml: integrations must be a list"], warnings seen_ids: set[str] = set() indexed_ids: set[str] = set() for row in entries: if not isinstance(row, dict): errors.append("indexes/integrations.yaml: integration row must be a mapping") continue integration_id = row.get("id") if not integration_id: errors.append("indexes/integrations.yaml: integration row missing id") continue if integration_id in seen_ids: errors.append( f"indexes/integrations.yaml: duplicate integration id '{integration_id}'" ) seen_ids.add(integration_id) indexed_ids.add(integration_id) for field in INDEX_REQUIRED_FIELDS: if field not in row: errors.append( f"indexes/integrations.yaml: '{integration_id}' missing required field '{field}'" ) upstream = row.get("upstream") if upstream is not None and not isinstance(upstream, dict): errors.append( f"indexes/integrations.yaml: '{integration_id}' upstream must be a mapping" ) elif isinstance(upstream, dict) and "name" not in upstream: errors.append( f"indexes/integrations.yaml: '{integration_id}' upstream missing name" ) repo_slug = row.get("repo") rel_path = row.get("path") if not repo_slug or not rel_path: continue definition_path = resolve_external_definition(repo_slug, rel_path, repos_base) if definition_path is None: if repos_base is None: warnings.append( f"indexes/integrations.yaml: '{integration_id}' definition not checked " f"(pass --repos-base to verify {repo_slug}/{rel_path})" ) else: warnings.append( f"indexes/integrations.yaml: '{integration_id}' definition not found at " f"{repos_base / repo_slug / rel_path}" ) continue try: definition = load_yaml(definition_path) except (OSError, yaml.YAMLError, ValueError) as exc: errors.append( f"indexes/integrations.yaml: '{integration_id}' definition load failed: {exc}" ) continue if definition.get("id") != integration_id: errors.append( f"indexes/integrations.yaml: '{integration_id}' id mismatch in " f"{definition_path} (found '{definition.get('id')}')" ) index_mode = row.get("reuse_mode") definition_mode = definition.get("reuse", {}).get("primary_reuse_mode") if index_mode and definition_mode and index_mode != definition_mode: errors.append( f"indexes/integrations.yaml: '{integration_id}' reuse_mode '{index_mode}' " f"does not match definition '{definition_mode}'" ) if not indexed_only: local_paths = integration_paths(repo_root) for path in local_paths: try: definition = load_yaml(path) except (OSError, yaml.YAMLError, ValueError) as exc: errors.append(f"{path}: failed to load local definition: {exc}") continue integration_id = definition.get("id") if integration_id and integration_id not in indexed_ids: warnings.append( f"{path}: local definition '{integration_id}' missing index row" ) return errors, warnings def run_validate( *, root: Path | None, targets: list[Path] | None, repos_base: Path | None, fail_on_warnings: bool, check_index: bool, indexed_only: bool, ) -> int: repo_root = resolve_repo_root(root) schema = load_schema(repo_root) errors: list[str] = [] warnings: list[str] = [] definition_paths = integration_paths(repo_root, targets) if targets is None and not definition_paths and not check_index: definition_paths = [] for path in definition_paths: file_errors, file_warnings = validate_definition(path, schema) errors.extend(file_errors) warnings.extend(file_warnings) if check_index: index_errors, index_warnings = validate_index( repo_root, repos_base=repos_base, indexed_only=indexed_only, ) errors.extend(index_errors) warnings.extend(index_warnings) for warning in warnings: print(f"warning: {warning}") for error in errors: print(f"error: {error}") if errors: return 1 if fail_on_warnings and warnings: return 1 return 0