from __future__ import annotations from pathlib import Path from typing import Any import yaml REQUIRED_TOP_LEVEL_FILES = ( "README.md", "INTENT.md", "SCOPE.md", "canon.yaml", "pyproject.toml", "workplans/index.yaml", "infospace/infospace.yaml", "infospace/artifacts/index.yaml", ) REQUIRED_INFOSPACE_DIRS = ( "kernel", "models", "standards", "profiles", "patterns", "mappings", "assimilation", "schemas", "views", "agent", "examples", "validation", "indexes", ) OPTIONAL_COLLECTION_DIRS = ( "profiles", "patterns", "mappings", "assimilation", "examples", ) REQUIRED_SCHEMAS = ( "standard.schema.yaml", "concept.schema.yaml", "mapping.schema.yaml", "profile.schema.yaml", "assimilation.schema.yaml", "interface-card.schema.yaml", "agent-brief.schema.yaml", "workplan.schema.yaml", ) RETRIEVAL_BRIEF_KINDS = { "capture-criteria", "concept-catalog", "conformance-pack", "consumer-workplan-brief", "evaluation-pack", "evaluation-question-set", "example", "interface-card-expectation", "kernel", "mapping", "mapping-expectation", "model", "model-extension", "pattern", "profile-alignment", "profile", "standard", "visualization-example-set", } PURPOSE_REQUIRED_ARTIFACT_IDS = { "concept-catalog/purpose-demand", "example/consumer-purpose-portfolio", "mapping/purpose-demand-governance-candidates", "model/purpose-demand-extension", "pattern/intent-scope-purposes", } PURPOSE_REQUIRED_CONCEPTS = { "Purpose", "ConsumerPurpose", "UseCase", "DemandSignal", "ConsumerNeed", "ProducerCapability", "PurposeFit", "ScopePressure", "EvolutionRequest", } PURPOSE_REQUIRED_CONSUMERS = { "user-engine", "railiance-fabric", "repo-scoping", } USER_ENGINE_EVALUATION_ARTIFACT_IDS = { "evaluation/user-engine", "evaluation/user-engine/consumer-workplan-brief", "evaluation/user-engine/interface-card-expectations", "evaluation/user-engine/questions", "evaluation/user-engine/small-saas-alignment", } USER_ENGINE_QUESTION_DOMAINS = { "access-control", "data", "governance", "organization", "purposes", "security", "task", } USER_ENGINE_REQUIRED_ENTITY_IDS = { "access-role", "account", "control", "evidence", "organization-role", "policy", "principal", "subject", "team", "tenant", "user", } USER_ENGINE_REQUIRED_EDGE_TYPES = { "assigned_role", "authenticates_as", "belongs_to_tenant", "evidenced_by", "evaluated_as", "governed_by", "implemented_by", "member_of", "scoped_to", } RAILIANCE_FABRIC_CONFORMANCE_ARTIFACT_IDS = { "conformance/railiance-fabric", "conformance/railiance-fabric/consumer-workplan-brief", "conformance/railiance-fabric/entity-edge-capture-criteria", "conformance/railiance-fabric/mapping-expectations", "conformance/railiance-fabric/visualization-examples", } RAILIANCE_FABRIC_REQUIRED_ENTITY_CATEGORIES = { "consumer-purpose", "control", "datastore", "deployment", "endpoint", "evidence", "flow", "network-zone", "pipeline", "policy", "runtime-resource", "service", "software-system", "source-repository", "task", "telemetry-signal", } RAILIANCE_FABRIC_REQUIRED_CANONICAL_EDGES = { "built_from", "creates_task", "depends_on", "deploys", "evidenced_by", "exposes", "flows_to", "governed_by", "implements", "observed_by", "part_of", "reads_or_writes", } RAILIANCE_FABRIC_REQUIRED_MODELS = { "model/data", "model/devsecops", "model/governance", "model/landscape", "model/network", "model/observability", "model/purpose-demand-extension", "model/security", } def structural_checks(context: Any) -> dict[str, list[dict[str, Any]]]: errors: list[dict[str, Any]] = [] warnings: list[dict[str, Any]] = [] _check_required_top_level_files(context.repo_root, errors) _check_required_infospace_dirs(context.infospace_root, errors) _check_required_schemas(context.infospace_root, errors) _check_canon_paths(context.repo_root, context.infospace_root, errors) _check_artifact_index(context.repo_root, context.infospace_root, errors) _check_agent_assets(context.infospace_root, context.infospace.artifacts, errors) _check_purpose_demand_assets(context.infospace_root, context.infospace.artifacts, errors) _check_user_engine_evaluation_assets( context.infospace_root, context.infospace.artifacts, errors, ) _check_railiance_fabric_conformance_assets( context.infospace_root, context.infospace.artifacts, errors, ) _check_optional_assets(context.infospace_root, warnings) return {"errors": errors, "warnings": warnings} def _check_required_top_level_files( repo_root: Path, errors: list[dict[str, Any]], ) -> None: for relative in REQUIRED_TOP_LEVEL_FILES: if not (repo_root / relative).is_file(): errors.append( { "code": "missing_required_file", "path": relative, } ) def _check_required_infospace_dirs( infospace_root: Path, errors: list[dict[str, Any]], ) -> None: for relative in REQUIRED_INFOSPACE_DIRS: if not (infospace_root / relative).is_dir(): errors.append( { "code": "missing_required_infospace_dir", "path": str(Path("infospace") / relative), } ) def _check_required_schemas( infospace_root: Path, errors: list[dict[str, Any]], ) -> None: for filename in REQUIRED_SCHEMAS: if not (infospace_root / "schemas" / filename).is_file(): errors.append( { "code": "missing_schema", "path": str(Path("infospace") / "schemas" / filename), } ) def _check_canon_paths( repo_root: Path, infospace_root: Path, errors: list[dict[str, Any]], ) -> None: canon_path = repo_root / "canon.yaml" canon = _read_yaml(canon_path, errors) if not isinstance(canon, dict): return indexed_paths = _artifact_paths_by_path(infospace_root, errors) for section in ("kernel", "models", "standards"): items = canon.get(section) or [] if not isinstance(items, list): errors.append( { "code": "invalid_canon_section", "section": section, "message": "Expected a list.", } ) continue for item in items: if not isinstance(item, dict): errors.append( { "code": "invalid_canon_entry", "section": section, "message": "Expected a mapping.", } ) continue path = str(item.get("path") or "") if not path: errors.append( { "code": "missing_canon_path", "section": section, "id": item.get("id"), } ) continue if not (repo_root / path).is_file(): errors.append( { "code": "missing_canon_path_target", "section": section, "id": item.get("id"), "path": path, } ) relative_infospace_path = _strip_infospace_prefix(path) if relative_infospace_path not in indexed_paths: errors.append( { "code": "canon_path_not_indexed", "section": section, "id": item.get("id"), "path": path, } ) def _check_artifact_index( repo_root: Path, infospace_root: Path, errors: list[dict[str, Any]], ) -> None: index_path = infospace_root / "artifacts" / "index.yaml" index = _read_yaml(index_path, errors) if not isinstance(index, dict): return artifacts = index.get("artifacts") if not isinstance(artifacts, list): errors.append( { "code": "invalid_artifact_index", "path": "infospace/artifacts/index.yaml", "message": "Expected artifacts list.", } ) return ids: set[str] = set() for artifact in artifacts: if not isinstance(artifact, dict): errors.append( { "code": "invalid_artifact_entry", "message": "Expected artifact mapping.", } ) continue artifact_id = str(artifact.get("id") or "") if not artifact_id: errors.append({"code": "missing_artifact_id"}) elif artifact_id in ids: errors.append( { "code": "duplicate_artifact_id", "artifact_id": artifact_id, } ) ids.add(artifact_id) for field in ("path", "kind", "title"): if not artifact.get(field): errors.append( { "code": "missing_artifact_field", "artifact_id": artifact_id, "field": field, } ) relative_path = str(artifact.get("path") or "") if relative_path and not (infospace_root / relative_path).is_file(): errors.append( { "code": "missing_artifact_path", "artifact_id": artifact_id, "path": relative_path, } ) provenance = artifact.get("provenance") or {} if isinstance(provenance, dict): source_path = provenance.get("source_path") if isinstance(source_path, str) and source_path: if not (repo_root / source_path).is_file(): errors.append( { "code": "missing_provenance_source", "artifact_id": artifact_id, "source_path": source_path, } ) for artifact in artifacts: if not isinstance(artifact, dict): continue artifact_id = str(artifact.get("id") or "") relationships = artifact.get("relationships") or [] if not isinstance(relationships, list): errors.append( { "code": "invalid_relationships", "artifact_id": artifact_id, "message": "Expected relationship list.", } ) continue for relationship in relationships: if not isinstance(relationship, dict): errors.append( { "code": "invalid_relationship", "artifact_id": artifact_id, } ) continue target = relationship.get("target") if target not in ids: errors.append( { "code": "missing_relationship_target", "artifact_id": artifact_id, "target": target, } ) def _check_optional_assets( infospace_root: Path, warnings: list[dict[str, Any]], ) -> None: global_brief = infospace_root / "agent" / "global-agent-brief.md" if not global_brief.is_file(): warnings.append( { "code": "missing_optional_agent_brief", "path": "infospace/agent/global-agent-brief.md", } ) concepts_dir = infospace_root / "concepts" if not concepts_dir.is_dir(): warnings.append( { "code": "missing_optional_concepts_dir", "path": "infospace/concepts", } ) for relative in OPTIONAL_COLLECTION_DIRS: directory = infospace_root / relative if directory.is_dir() and not _has_substantive_files(directory): warnings.append( { "code": "empty_optional_collection", "path": str(Path("infospace") / relative), } ) def _check_agent_assets( infospace_root: Path, artifacts: list[Any], errors: list[dict[str, Any]], ) -> None: required_files = ( "agent/global-agent-brief.md", "agent/retrieval-index.md", "agent/retrieval-index.yaml", "agent/retrieval-index.json", "agent/templates/canon-interface-card.template.yaml", "agent/templates/consumer-brief.template.md", "agent/consumer-briefs/user-engine.md", "agent/consumer-briefs/railiance-fabric.md", "agent/consumer-briefs/repo-scoping.md", ) for relative in required_files: if not (infospace_root / relative).is_file(): errors.append( { "code": "missing_agent_asset", "path": str(Path("infospace") / relative), } ) retrieval_index = _read_yaml(infospace_root / "agent" / "retrieval-index.yaml", errors) artifact_ids = {artifact.id for artifact in artifacts} if isinstance(retrieval_index, dict): items = retrieval_index.get("items") or [] if not isinstance(items, list): errors.append( { "code": "invalid_retrieval_index", "path": "infospace/agent/retrieval-index.yaml", "message": "Expected items list.", } ) else: indexed_ids = { str(item.get("id")) for item in items if isinstance(item, dict) and item.get("id") } missing = sorted(artifact_ids - indexed_ids) for artifact_id in missing: errors.append( { "code": "artifact_missing_from_retrieval_index", "artifact_id": artifact_id, } ) required_brief_artifacts = [ artifact for artifact in artifacts if artifact.kind in RETRIEVAL_BRIEF_KINDS ] for artifact in required_brief_artifacts: relative = Path("agent") / "briefs" / f"{_safe_id(artifact.id)}.md" brief_path = infospace_root / relative if not brief_path.is_file(): errors.append( { "code": "missing_agent_brief", "artifact_id": artifact.id, "path": str(Path("infospace") / relative), } ) continue frontmatter = _read_markdown_frontmatter(brief_path, errors) if frontmatter.get("artifact_id") != artifact.id: errors.append( { "code": "agent_brief_artifact_mismatch", "artifact_id": artifact.id, "path": str(Path("infospace") / relative), "value": frontmatter.get("artifact_id"), } ) if frontmatter.get("source_path") != artifact.path: errors.append( { "code": "agent_brief_source_path_mismatch", "artifact_id": artifact.id, "path": str(Path("infospace") / relative), "value": frontmatter.get("source_path"), "expected": artifact.path, } ) def _check_purpose_demand_assets( infospace_root: Path, artifacts: list[Any], errors: list[dict[str, Any]], ) -> None: artifact_ids = {artifact.id for artifact in artifacts} for artifact_id in sorted(PURPOSE_REQUIRED_ARTIFACT_IDS - artifact_ids): errors.append( { "code": "missing_purpose_demand_artifact", "artifact_id": artifact_id, } ) extension_path = ( infospace_root / "models" / "governance" / "InfoTechCanonPurposeDemandExtension.md" ) frontmatter = _read_markdown_frontmatter(extension_path, errors) owned_concepts = set(frontmatter.get("owned_concepts") or []) for concept in sorted(PURPOSE_REQUIRED_CONCEPTS - owned_concepts): errors.append( { "code": "missing_purpose_demand_owned_concept", "concept": concept, "path": str(extension_path), } ) concept_catalog = _read_yaml(infospace_root / "concepts" / "purpose-demand.yaml", errors) if isinstance(concept_catalog, dict): concepts = concept_catalog.get("concepts") or [] catalog_titles = { str(concept.get("title")) for concept in concepts if isinstance(concept, dict) and concept.get("title") } for concept in sorted(PURPOSE_REQUIRED_CONCEPTS - catalog_titles): errors.append( { "code": "missing_purpose_demand_catalog_concept", "concept": concept, "path": "infospace/concepts/purpose-demand.yaml", } ) examples = _read_yaml(infospace_root / "examples" / "consumer-purpose-portfolio.yaml", errors) if isinstance(examples, dict): consumers = examples.get("consumers") or [] consumer_ids = { str(consumer.get("id")) for consumer in consumers if isinstance(consumer, dict) and consumer.get("id") } for consumer_id in sorted(PURPOSE_REQUIRED_CONSUMERS - consumer_ids): errors.append( { "code": "missing_consumer_purpose_example", "consumer": consumer_id, "path": "infospace/examples/consumer-purpose-portfolio.yaml", } ) for consumer in consumers: if not isinstance(consumer, dict): continue if not consumer.get("purposes"): errors.append( { "code": "consumer_purpose_example_without_purposes", "consumer": consumer.get("id"), "path": "infospace/examples/consumer-purpose-portfolio.yaml", } ) def _check_user_engine_evaluation_assets( infospace_root: Path, artifacts: list[Any], errors: list[dict[str, Any]], ) -> None: artifact_ids = {artifact.id for artifact in artifacts} for artifact_id in sorted(USER_ENGINE_EVALUATION_ARTIFACT_IDS - artifact_ids): errors.append( { "code": "missing_user_engine_evaluation_artifact", "artifact_id": artifact_id, } ) pack = _read_yaml( infospace_root / "evaluations" / "user-engine" / "evaluation-pack.yaml", errors, ) if isinstance(pack, dict): components = pack.get("pack_components") or {} if not isinstance(components, dict): errors.append( { "code": "invalid_user_engine_pack_components", "path": "infospace/evaluations/user-engine/evaluation-pack.yaml", } ) else: for component in ( "questions", "interface_card_expectations", "small_saas_alignment", "consumer_workplan_brief", ): if not components.get(component): errors.append( { "code": "missing_user_engine_pack_component", "component": component, } ) questions = _read_yaml( infospace_root / "evaluations" / "user-engine" / "questions.yaml", errors, ) if isinstance(questions, dict): domains = questions.get("question_domains") or [] domain_ids = { str(domain.get("id")) for domain in domains if isinstance(domain, dict) and domain.get("id") } for domain_id in sorted(USER_ENGINE_QUESTION_DOMAINS - domain_ids): errors.append( { "code": "missing_user_engine_question_domain", "domain": domain_id, } ) for domain in domains: if isinstance(domain, dict) and not domain.get("questions"): errors.append( { "code": "empty_user_engine_question_domain", "domain": domain.get("id"), } ) expectations = _read_yaml( infospace_root / "evaluations" / "user-engine" / "interface-card-expectations.yaml", errors, ) if isinstance(expectations, dict): entity_ids = { str(entity.get("id")) for entity in expectations.get("expected_entities") or [] if isinstance(entity, dict) and entity.get("id") } for entity_id in sorted(USER_ENGINE_REQUIRED_ENTITY_IDS - entity_ids): errors.append( { "code": "missing_user_engine_expected_entity", "entity": entity_id, } ) edge_types = { str(edge.get("type")) for edge in expectations.get("expected_edges") or [] if isinstance(edge, dict) and edge.get("type") } for edge_type in sorted(USER_ENGINE_REQUIRED_EDGE_TYPES - edge_types): errors.append( { "code": "missing_user_engine_expected_edge", "edge": edge_type, } ) evidence = expectations.get("evidence_required") or [] if not isinstance(evidence, list) or not evidence: errors.append( { "code": "missing_user_engine_evidence_expectations", "path": "infospace/evaluations/user-engine/interface-card-expectations.yaml", } ) alignment = _read_yaml( infospace_root / "evaluations" / "user-engine" / "small-saas-alignment.yaml", errors, ) if isinstance(alignment, dict): if alignment.get("profile") != "profile/small-saas": errors.append( { "code": "invalid_user_engine_alignment_profile", "value": alignment.get("profile"), } ) if not alignment.get("profile_requirements"): errors.append( { "code": "missing_user_engine_profile_requirements", "path": "infospace/evaluations/user-engine/small-saas-alignment.yaml", } ) brief_path = infospace_root / "evaluations" / "user-engine" / "consumer-workplan-brief.md" if not brief_path.is_file(): errors.append( { "code": "missing_user_engine_consumer_workplan_brief", "path": "infospace/evaluations/user-engine/consumer-workplan-brief.md", } ) def _check_railiance_fabric_conformance_assets( infospace_root: Path, artifacts: list[Any], errors: list[dict[str, Any]], ) -> None: artifact_ids = {artifact.id for artifact in artifacts} for artifact_id in sorted(RAILIANCE_FABRIC_CONFORMANCE_ARTIFACT_IDS - artifact_ids): errors.append( { "code": "missing_railiance_fabric_conformance_artifact", "artifact_id": artifact_id, } ) pack = _read_yaml( infospace_root / "evaluations" / "railiance-fabric" / "conformance-pack.yaml", errors, ) if isinstance(pack, dict): components = pack.get("pack_components") or {} if not isinstance(components, dict): errors.append( { "code": "invalid_railiance_fabric_pack_components", "path": "infospace/evaluations/railiance-fabric/conformance-pack.yaml", } ) else: for component in ( "capture_criteria", "mapping_expectations", "visualization_examples", "consumer_workplan_brief", ): if not components.get(component): errors.append( { "code": "missing_railiance_fabric_pack_component", "component": component, } ) criteria = _read_yaml( infospace_root / "evaluations" / "railiance-fabric" / "entity-edge-capture-criteria.yaml", errors, ) if isinstance(criteria, dict): entity_categories = { str(entity.get("id")) for entity in criteria.get("entity_categories") or [] if isinstance(entity, dict) and entity.get("id") } for category in sorted( RAILIANCE_FABRIC_REQUIRED_ENTITY_CATEGORIES - entity_categories ): errors.append( { "code": "missing_railiance_fabric_entity_category", "category": category, } ) canonical_edges = { str(edge.get("type")) for edge in criteria.get("canonical_edge_categories") or [] if isinstance(edge, dict) and edge.get("type") } for edge_type in sorted( RAILIANCE_FABRIC_REQUIRED_CANONICAL_EDGES - canonical_edges ): errors.append( { "code": "missing_railiance_fabric_canonical_edge", "edge": edge_type, } ) display_edges = criteria.get("display_only_edge_categories") or [] if not isinstance(display_edges, list) or not display_edges: errors.append( { "code": "missing_railiance_fabric_display_edges", "path": "infospace/evaluations/railiance-fabric/entity-edge-capture-criteria.yaml", } ) mappings = _read_yaml( infospace_root / "evaluations" / "railiance-fabric" / "mapping-expectations.yaml", errors, ) if isinstance(mappings, dict): first_models = { str(model.get("id")) for model in mappings.get("first_models") or [] if isinstance(model, dict) and model.get("id") } for model_id in sorted(RAILIANCE_FABRIC_REQUIRED_MODELS - first_models): errors.append( { "code": "missing_railiance_fabric_mapping_model", "model": model_id, } ) if not mappings.get("candidate_edge_mapping"): errors.append( { "code": "missing_railiance_fabric_candidate_edge_mapping", "path": "infospace/evaluations/railiance-fabric/mapping-expectations.yaml", } ) examples = _read_yaml( infospace_root / "evaluations" / "railiance-fabric" / "visualization-examples.yaml", errors, ) if isinstance(examples, dict): example_items = examples.get("examples") or [] example_ids = { str(example.get("id")) for example in example_items if isinstance(example, dict) and example.get("id") } if "clean-service-runtime-slice" not in example_ids: errors.append( { "code": "missing_railiance_fabric_clean_visualization_example", } ) if not any(example_id.startswith("bad-shape") for example_id in example_ids): errors.append( { "code": "missing_railiance_fabric_bad_shape_example", } ) if not examples.get("visualization_rules"): errors.append( { "code": "missing_railiance_fabric_visualization_rules", "path": "infospace/evaluations/railiance-fabric/visualization-examples.yaml", } ) brief_path = ( infospace_root / "evaluations" / "railiance-fabric" / "consumer-workplan-brief.md" ) if not brief_path.is_file(): errors.append( { "code": "missing_railiance_fabric_consumer_workplan_brief", "path": "infospace/evaluations/railiance-fabric/consumer-workplan-brief.md", } ) def _artifact_paths_by_path( infospace_root: Path, errors: list[dict[str, Any]], ) -> set[str]: index = _read_yaml(infospace_root / "artifacts" / "index.yaml", errors) if not isinstance(index, dict): return set() artifacts = index.get("artifacts") or [] if not isinstance(artifacts, list): return set() return { str(artifact.get("path")) for artifact in artifacts if isinstance(artifact, dict) and artifact.get("path") } def _read_yaml(path: Path, errors: list[dict[str, Any]]) -> Any: try: with path.open("r", encoding="utf-8") as handle: return yaml.safe_load(handle) or {} except FileNotFoundError: errors.append({"code": "missing_yaml", "path": str(path)}) except yaml.YAMLError as exc: errors.append( { "code": "invalid_yaml", "path": str(path), "message": str(exc), } ) return None def _read_markdown_frontmatter(path: Path, errors: list[dict[str, Any]]) -> dict[str, Any]: try: text = path.read_text(encoding="utf-8") except FileNotFoundError: errors.append({"code": "missing_markdown", "path": str(path)}) return {} if not text.startswith("---\n"): errors.append({"code": "missing_markdown_frontmatter", "path": str(path)}) return {} end = text.find("\n---\n", 4) if end == -1: errors.append({"code": "invalid_markdown_frontmatter", "path": str(path)}) return {} try: data = yaml.safe_load(text[4:end]) or {} except yaml.YAMLError as exc: errors.append( { "code": "invalid_markdown_frontmatter_yaml", "path": str(path), "message": str(exc), } ) return {} return data if isinstance(data, dict) else {} def _strip_infospace_prefix(path: str) -> str: prefix = "infospace/" return path[len(prefix) :] if path.startswith(prefix) else path def _has_substantive_files(directory: Path) -> bool: for path in directory.rglob("*"): if path.is_file() and path.name != "README.md": return True return False def _safe_id(value: str) -> str: return value.replace("/", "-").replace("_", "-")