from __future__ import annotations import json import re import subprocess import tomllib from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any, Iterable import yaml from .discovery import ( attribute_stable_key, discovery_stable_key, normalize_identity_part, relationship_stable_key, replacement_scope_id, short_fingerprint, source_fingerprint, ) from .llm_extraction import LLMExtractionConfig, augment_snapshot_with_llm from .loader import declaration_files, load_yaml EXTRACTOR_VERSION = "0.1.0" SKIP_DIRS = { ".git", ".hg", ".mypy_cache", ".pytest_cache", ".ruff_cache", ".venv", "__pycache__", "build", "dist", "node_modules", "target", "vendor", } COMPOSE_FILES = { "compose.yaml", "compose.yml", "docker-compose.yaml", "docker-compose.yml", } LOCKFILES = { "package-lock.json", "pnpm-lock.yaml", "yarn.lock", "poetry.lock", "pdm.lock", "uv.lock", "requirements.lock", } SERVICE_CONFIG_FILES = { "application.yaml", "application.yml", "appsettings.json", "service.yaml", "service.yml", } KUBERNETES_KINDS = { "ConfigMap", "CronJob", "DaemonSet", "Deployment", "HorizontalPodAutoscaler", "Ingress", "Job", "Namespace", "Secret", "Service", "StatefulSet", } @dataclass(frozen=True) class ScanOptions: repo_path: Path repo_slug: str | None = None repo_name: str | None = None domain: str | None = None commit: str | None = None profile: str = "deterministic" deterministic_only: bool = True llm_enabled: bool = False llm_config: LLMExtractionConfig | None = None llm_adapter: object | None = None class CandidateAccumulator: def __init__(self, repo_slug: str, domain: str | None = None) -> None: self.repo_slug = repo_slug self.domain = domain self.replacement_scopes: dict[str, dict[str, object]] = {} self.nodes: dict[str, dict[str, object]] = {} self.edges: dict[str, dict[str, object]] = {} self.attributes: dict[str, dict[str, object]] = {} def add_scope( self, *, extractor_id: str, source_kind: str, source_path: str | None = None, mode: str = "replacement", description: str | None = None, ) -> str: scope_id = replacement_scope_id( self.repo_slug, extractor_id, source_kind, source_path=source_path, ) scope = { "id": scope_id, "extractor_id": extractor_id, "source_kind": source_kind, "mode": mode, } if source_path: scope["source_path"] = source_path if description: scope["description"] = description self.replacement_scopes[scope_id] = scope return scope_id def add_node( self, *, stable_key: str, kind: str, label: str, replacement_scope: str, provenance: dict[str, object], source_anchor: dict[str, object], origin: str = "deterministic", review_state: str = "candidate", status: str = "active", confidence: float = 0.8, graph_id: str | None = None, aliases: Iterable[str] = (), attributes: dict[str, object] | None = None, lifecycle: str | None = None, domain: str | None = None, ) -> dict[str, object]: candidate: dict[str, object] = { "stable_key": stable_key, "kind": kind, "label": label, "repo": self.repo_slug, "origin": origin, "review_state": review_state, "status": status, "confidence": confidence, "replacement_scope": replacement_scope, "provenance": [provenance], "source_anchors": [source_anchor], } if graph_id: candidate["graph_id"] = graph_id if self.domain or domain: candidate["domain"] = domain or self.domain if lifecycle: candidate["lifecycle"] = lifecycle clean_aliases = _unique_strings([*aliases, label, graph_id or ""]) if clean_aliases: candidate["aliases"] = clean_aliases if attributes: candidate["attributes"] = _json_object(attributes) merged = _merge_candidate(self.nodes.get(stable_key), candidate) self.nodes[stable_key] = merged return merged def add_edge( self, *, edge_type: str, source_key: str, target_key: str, replacement_scope: str, provenance: dict[str, object], source_anchor: dict[str, object], origin: str = "deterministic", review_state: str = "candidate", status: str = "active", confidence: float = 0.8, aliases: Iterable[str] = (), attributes: dict[str, object] | None = None, ) -> dict[str, object]: stable_key = relationship_stable_key( source_key, edge_type, target_key, evidence_scope=replacement_scope, ) candidate: dict[str, object] = { "stable_key": stable_key, "edge_type": edge_type, "source_key": source_key, "target_key": target_key, "origin": origin, "review_state": review_state, "status": status, "confidence": confidence, "replacement_scope": replacement_scope, "provenance": [provenance], "source_anchors": [source_anchor], } clean_aliases = _unique_strings(aliases) if clean_aliases: candidate["aliases"] = clean_aliases if attributes: candidate["attributes"] = _json_object(attributes) merged = _merge_candidate(self.edges.get(stable_key), candidate) self.edges[stable_key] = merged return merged def add_attribute( self, *, entity_key: str, name: str, value: object, replacement_scope: str, provenance: dict[str, object], source_anchor: dict[str, object], origin: str = "deterministic", review_state: str = "candidate", confidence: float = 0.8, ) -> dict[str, object]: stable_key = attribute_stable_key(entity_key, name) candidate: dict[str, object] = { "stable_key": stable_key, "entity_key": entity_key, "name": name, "value": _json_value(value), "origin": origin, "review_state": review_state, "confidence": confidence, "replacement_scope": replacement_scope, "provenance": [provenance], "source_anchors": [source_anchor], } merged = _merge_candidate(self.attributes.get(stable_key), candidate) self.attributes[stable_key] = merged return merged def candidates(self) -> dict[str, list[dict[str, object]]]: return { "nodes": _sorted_values(self.nodes), "edges": _sorted_values(self.edges), "attributes": _sorted_values(self.attributes), } def scopes(self) -> list[dict[str, object]]: return _sorted_values(self.replacement_scopes) def scan_repo(options: ScanOptions | Path, **overrides: object) -> dict[str, object]: if isinstance(options, Path): options = ScanOptions(repo_path=options, **overrides) elif overrides: options = ScanOptions(**{**options.__dict__, **overrides}) repo_path = options.repo_path.resolve() repo_slug = normalize_identity_part(options.repo_slug or repo_path.name, fallback="repo") repo_name = options.repo_name or repo_path.name commit = options.commit or _git_value(repo_path, "rev-parse", "HEAD") or "working-tree" now = _utc_now() accumulator = CandidateAccumulator(repo_slug=repo_slug, domain=options.domain) context = ScanContext( repo_path=repo_path, repo_slug=repo_slug, repo_name=repo_name, commit=commit, domain=options.domain, accumulator=accumulator, ) for extractor in _deterministic_extractors(): extractor(context) run_id = "scan:{repo}:{profile}:{fingerprint}".format( repo=repo_slug, profile=normalize_identity_part(options.profile), fingerprint=short_fingerprint({"commit": commit, "path": str(repo_path)}), ) snapshot = { "apiVersion": "railiance.fabric/v1alpha1", "kind": "FabricDiscoverySnapshot", "generated_at": now, "source": { "repo_slug": repo_slug, "repo_name": repo_name, "domain": options.domain or "", "commit": commit, "default_branch": _git_value(repo_path, "rev-parse", "--abbrev-ref", "HEAD") or "", "path": str(repo_path), }, "scan": { "run_id": run_id, "profile": options.profile, "deterministic_only": options.deterministic_only, "llm_enabled": options.llm_enabled, "started_at": now, "completed_at": now, }, "replacement_scopes": accumulator.scopes(), "candidates": accumulator.candidates(), "tombstones": [], "reconciliation": { "precedence": ["repo_declaration", "deterministic", "catalog", "registry", "llm", "manual"], "duplicate_policy": "stable-key matches merge automatically; alias-only matches require review", "retirement_policy": "missing candidates retire only inside their replacement scope", }, } if options.llm_enabled: return augment_snapshot_with_llm( snapshot, config=options.llm_config, adapter=options.llm_adapter, ) return snapshot @dataclass class ScanContext: repo_path: Path repo_slug: str repo_name: str commit: str domain: str | None accumulator: CandidateAccumulator @property def repository_key(self) -> str: return discovery_stable_key(self.repo_slug, "Repository", self.repo_slug) def relpath(self, path: Path) -> str: return path.resolve().relative_to(self.repo_path).as_posix() def _deterministic_extractors() -> list: return [ _extract_repo_metadata, _extract_text_metadata, _extract_fabric_declarations, _extract_python_package, _extract_node_package, _extract_lockfiles, _extract_dockerfile, _extract_compose, _extract_api_contracts, _extract_score_files, _extract_kubernetes_manifests, _extract_service_configs, ] def _extract_repo_metadata(context: ScanContext) -> None: scope = context.accumulator.add_scope( extractor_id="repo-metadata", source_kind="file", source_path=".", description="Repository-level local metadata.", ) anchor = _source_anchor("file", ".") provenance = _provenance("repo-metadata") remote_url = _git_value(context.repo_path, "config", "--get", "remote.origin.url") or "" branch = _git_value(context.repo_path, "rev-parse", "--abbrev-ref", "HEAD") or "" context.accumulator.add_node( stable_key=context.repository_key, kind="Repository", label=context.repo_name, replacement_scope=scope, provenance=provenance, source_anchor=anchor, aliases=[context.repo_slug], attributes={ "repo_slug": context.repo_slug, "path": str(context.repo_path), "commit": context.commit, "default_branch": branch, "remote_url": remote_url, }, confidence=0.95, ) def _extract_text_metadata(context: ScanContext) -> None: files = [ ("README.md", "readme"), ("README.rst", "readme"), ("INTENT.md", "intent"), ("SCOPE.md", "scope"), ] provenance = _provenance("repo-text-metadata") for file_name, label in files: path = context.repo_path / file_name if not path.is_file(): continue scope = context.accumulator.add_scope( extractor_id="repo-text-metadata", source_kind="file", source_path=file_name, description=f"Repository {label} document.", ) anchor = _source_anchor("file", file_name, snippet=_snippet(path)) heading = _first_heading(path) or path.stem context.accumulator.add_attribute( entity_key=context.repository_key, name=f"{label}_title", value=heading, replacement_scope=scope, provenance=provenance, source_anchor=anchor, confidence=0.85, ) context.accumulator.add_attribute( entity_key=context.repository_key, name=f"{label}_present", value=True, replacement_scope=scope, provenance=provenance, source_anchor=anchor, confidence=0.95, ) def _extract_fabric_declarations(context: ScanContext) -> None: declarations: list[tuple[Path, dict[str, Any]]] = [] for path in declaration_files(context.repo_path): try: data = load_yaml(path) except Exception: continue if isinstance(data, dict): declarations.append((path, data)) keys_by_id: dict[str, str] = {} declaration_records: list[tuple[Path, dict[str, Any], str, dict[str, object], dict[str, object]]] = [] for path, data in declarations: kind = str(data.get("kind") or "") metadata = data.get("metadata") if isinstance(data.get("metadata"), dict) else {} spec = data.get("spec") if isinstance(data.get("spec"), dict) else {} graph_id = str(metadata.get("id") or "") if not kind or not graph_id: continue relpath = context.relpath(path) scope = context.accumulator.add_scope( extractor_id="fabric-declarations", source_kind="declaration", source_path=relpath, description="Repo-owned Fabric declaration.", ) anchor = _source_anchor("declaration", relpath, json_pointer="/metadata/id", snippet=_snippet(path)) provenance = _provenance("fabric-declarations", method="declaration", origin="repo_declaration") label = str(metadata.get("name") or graph_id) stable_key = discovery_stable_key(context.repo_slug, kind, graph_id) keys_by_id[graph_id] = stable_key context.accumulator.add_node( stable_key=stable_key, graph_id=graph_id, kind=kind, label=label, replacement_scope=scope, provenance=provenance, source_anchor=anchor, origin="repo_declaration", review_state="accepted", confidence=1.0, aliases=[graph_id, label], lifecycle=str(spec.get("lifecycle") or ""), domain=str(metadata.get("domain") or context.domain or ""), attributes={ "metadata": metadata, "spec": spec, "declaration_path": relpath, }, ) declaration_records.append((path, data, stable_key, provenance, anchor)) for path, data, source_key, provenance, anchor in declaration_records: kind = str(data.get("kind") or "") metadata = data.get("metadata") if isinstance(data.get("metadata"), dict) else {} spec = data.get("spec") if isinstance(data.get("spec"), dict) else {} relpath = context.relpath(path) scope = replacement_scope_id(context.repo_slug, "fabric-declarations", "declaration", source_path=relpath) graph_id = str(metadata.get("id") or "") if kind == "ServiceDeclaration": for capability_id in _string_list(spec.get("provides_capabilities")): _add_declaration_edge(context, scope, provenance, anchor, source_key, keys_by_id, capability_id, "provides") for interface_id in _string_list(spec.get("exposes_interfaces")): _add_declaration_edge(context, scope, provenance, anchor, source_key, keys_by_id, interface_id, "exposes") elif kind == "CapabilityDeclaration": for interface_id in _string_list(spec.get("interface_ids")): _add_declaration_edge(context, scope, provenance, anchor, source_key, keys_by_id, interface_id, "available_via") elif kind == "DependencyDeclaration": consumer = str(spec.get("consumer_service_id") or "") if consumer: _add_declaration_edge(context, scope, provenance, anchor, keys_by_id.get(consumer, ""), keys_by_id, graph_id, "consumes") elif kind == "BindingAssertion": dependency = str(spec.get("dependency_id") or "") provider = str(spec.get("provider_capability_id") or "") interface = str(spec.get("provider_interface_id") or "") if dependency and provider: _add_declaration_edge(context, scope, provenance, anchor, keys_by_id.get(dependency, ""), keys_by_id, provider, "binds") if dependency and interface: _add_declaration_edge(context, scope, provenance, anchor, keys_by_id.get(dependency, ""), keys_by_id, interface, "uses_interface") def _add_declaration_edge( context: ScanContext, scope: str, provenance: dict[str, object], anchor: dict[str, object], source_key: str, keys_by_id: dict[str, str], target_id: str, edge_type: str, ) -> None: target_key = keys_by_id.get(target_id) if not source_key or not target_key: return context.accumulator.add_edge( edge_type=edge_type, source_key=source_key, target_key=target_key, replacement_scope=scope, provenance=provenance, source_anchor=anchor, origin="repo_declaration", review_state="accepted", confidence=1.0, aliases=[target_id], ) def _extract_python_package(context: ScanContext) -> None: path = context.repo_path / "pyproject.toml" if not path.is_file(): return try: data = tomllib.loads(path.read_text(encoding="utf-8")) except Exception: return project = data.get("project") if not isinstance(project, dict): return name = str(project.get("name") or "").strip() if not name: return scope = context.accumulator.add_scope( extractor_id="python-package", source_kind="package_manifest", source_path="pyproject.toml", description="Python package metadata from pyproject.toml.", ) anchor = _source_anchor("package_manifest", "pyproject.toml", json_pointer="/project/name", snippet=_snippet(path)) provenance = _provenance("python-package") package_key = discovery_stable_key(context.repo_slug, "Library", name) dependencies = _string_list(project.get("dependencies")) context.accumulator.add_node( stable_key=package_key, kind="Library", label=name, replacement_scope=scope, provenance=provenance, source_anchor=anchor, aliases=[name, normalize_identity_part(name)], attributes={ "language": "python", "package_manager": "python", "package_name": name, "version": project.get("version") or "", "description": project.get("description") or "", "dependency_count": len(dependencies), }, confidence=0.9, ) context.accumulator.add_edge( edge_type="declares_package", source_key=context.repository_key, target_key=package_key, replacement_scope=scope, provenance=provenance, source_anchor=anchor, confidence=0.9, ) for index, spec in enumerate(dependencies): dep_name = _python_dependency_name(spec) if not dep_name: continue dep_anchor = _source_anchor("package_manifest", "pyproject.toml", json_pointer=f"/project/dependencies/{index}") dep_key = discovery_stable_key(context.repo_slug, "ExternalLibrary", dep_name) context.accumulator.add_node( stable_key=dep_key, kind="ExternalLibrary", label=dep_name, replacement_scope=scope, provenance=provenance, source_anchor=dep_anchor, aliases=[dep_name], attributes={"ecosystem": "python", "dependency_spec": spec}, confidence=0.85, ) context.accumulator.add_edge( edge_type="depends_on_library", source_key=package_key, target_key=dep_key, replacement_scope=scope, provenance=provenance, source_anchor=dep_anchor, confidence=0.85, ) def _extract_node_package(context: ScanContext) -> None: path = context.repo_path / "package.json" if not path.is_file(): return try: data = json.loads(path.read_text(encoding="utf-8")) except Exception: return if not isinstance(data, dict): return name = str(data.get("name") or "").strip() if not name: return scope = context.accumulator.add_scope( extractor_id="node-package", source_kind="package_manifest", source_path="package.json", description="Node package metadata from package.json.", ) anchor = _source_anchor("package_manifest", "package.json", json_pointer="/name", snippet=_snippet(path)) provenance = _provenance("node-package") package_key = discovery_stable_key(context.repo_slug, "Library", name) dependencies = _node_dependencies(data) context.accumulator.add_node( stable_key=package_key, kind="Library", label=name, replacement_scope=scope, provenance=provenance, source_anchor=anchor, aliases=[name, normalize_identity_part(name)], attributes={ "language": "javascript", "package_manager": "npm", "package_name": name, "version": data.get("version") or "", "private": bool(data.get("private", False)), "script_count": len(data.get("scripts") if isinstance(data.get("scripts"), dict) else {}), "dependency_count": len(dependencies), }, confidence=0.9, ) context.accumulator.add_edge( edge_type="declares_package", source_key=context.repository_key, target_key=package_key, replacement_scope=scope, provenance=provenance, source_anchor=anchor, confidence=0.9, ) for pointer, dep_name, dep_spec in dependencies: dep_anchor = _source_anchor("package_manifest", "package.json", json_pointer=pointer) dep_key = discovery_stable_key(context.repo_slug, "ExternalLibrary", dep_name) context.accumulator.add_node( stable_key=dep_key, kind="ExternalLibrary", label=dep_name, replacement_scope=scope, provenance=provenance, source_anchor=dep_anchor, aliases=[dep_name], attributes={"ecosystem": "npm", "dependency_spec": dep_spec}, confidence=0.85, ) context.accumulator.add_edge( edge_type="depends_on_library", source_key=package_key, target_key=dep_key, replacement_scope=scope, provenance=provenance, source_anchor=dep_anchor, confidence=0.85, ) def _extract_lockfiles(context: ScanContext) -> None: provenance = _provenance("lockfiles") for path in _walk_files(context.repo_path): if path.name not in LOCKFILES: continue relpath = context.relpath(path) scope = context.accumulator.add_scope( extractor_id="lockfiles", source_kind="lockfile", source_path=relpath, description="Dependency lockfile evidence.", ) anchor = _source_anchor("lockfile", relpath, snippet=_snippet(path)) lock_key = discovery_stable_key(context.repo_slug, "Lockfile", relpath) context.accumulator.add_node( stable_key=lock_key, kind="Lockfile", label=path.name, replacement_scope=scope, provenance=provenance, source_anchor=anchor, aliases=[relpath, path.name], attributes={"path": relpath, "size_bytes": path.stat().st_size}, confidence=0.95, ) context.accumulator.add_edge( edge_type="uses_lockfile", source_key=context.repository_key, target_key=lock_key, replacement_scope=scope, provenance=provenance, source_anchor=anchor, confidence=0.9, ) def _extract_dockerfile(context: ScanContext) -> None: provenance = _provenance("dockerfile") for path in _walk_files(context.repo_path): if path.name != "Dockerfile" and not path.name.startswith("Dockerfile."): continue relpath = context.relpath(path) scope = context.accumulator.add_scope( extractor_id="dockerfile", source_kind="deployment_manifest", source_path=relpath, description="Container build recipe.", ) anchor = _source_anchor("deployment_manifest", relpath, snippet=_snippet(path)) build_key = discovery_stable_key(context.repo_slug, "ContainerBuild", relpath) base_images = _docker_base_images(path) context.accumulator.add_node( stable_key=build_key, kind="ContainerBuild", label=path.name, replacement_scope=scope, provenance=provenance, source_anchor=anchor, aliases=[relpath, path.name], attributes={"path": relpath, "base_images": base_images}, confidence=0.9, ) context.accumulator.add_edge( edge_type="builds_container", source_key=context.repository_key, target_key=build_key, replacement_scope=scope, provenance=provenance, source_anchor=anchor, confidence=0.9, ) def _extract_compose(context: ScanContext) -> None: provenance = _provenance("docker-compose") for path in _walk_files(context.repo_path): if path.name not in COMPOSE_FILES: continue documents = _load_yaml_documents(path) if not documents or not isinstance(documents[0], dict): continue services = documents[0].get("services") if not isinstance(services, dict): continue relpath = context.relpath(path) scope = context.accumulator.add_scope( extractor_id="docker-compose", source_kind="deployment_manifest", source_path=relpath, description="Docker Compose service definitions.", ) provenance = _provenance("docker-compose") for service_name, service in sorted(services.items()): if not isinstance(service, dict): continue pointer = f"/services/{_json_pointer_escape(str(service_name))}" anchor = _source_anchor("deployment_manifest", relpath, json_pointer=pointer) deployment_key = discovery_stable_key(context.repo_slug, "DeploymentService", str(service_name), source_anchor=anchor) context.accumulator.add_node( stable_key=deployment_key, kind="DeploymentService", label=str(service_name), replacement_scope=scope, provenance=provenance, source_anchor=anchor, aliases=[str(service_name)], attributes={ "orchestrator": "docker-compose", "image": service.get("image") or "", "build": service.get("build") or "", "ports": service.get("ports") if isinstance(service.get("ports"), list) else [], }, confidence=0.9, ) context.accumulator.add_edge( edge_type="defines_deployment", source_key=context.repository_key, target_key=deployment_key, replacement_scope=scope, provenance=provenance, source_anchor=anchor, confidence=0.9, ) def _extract_api_contracts(context: ScanContext) -> None: provenance = _provenance("api-contracts") for path in _walk_files(context.repo_path): if path.suffix.lower() not in {".yaml", ".yml", ".json"}: continue data = _load_structured_file(path) if not isinstance(data, dict): continue contract_kind = "openapi" if data.get("openapi") else "asyncapi" if data.get("asyncapi") else "" if not contract_kind: continue info = data.get("info") if isinstance(data.get("info"), dict) else {} title = str(info.get("title") or path.stem) relpath = context.relpath(path) scope = context.accumulator.add_scope( extractor_id="api-contracts", source_kind="api_contract", source_path=relpath, description="OpenAPI or AsyncAPI contract.", ) anchor = _source_anchor("api_contract", relpath, json_pointer="/info/title", snippet=_snippet(path)) interface_key = discovery_stable_key(context.repo_slug, "InterfaceDeclaration", title, source_anchor=anchor) context.accumulator.add_node( stable_key=interface_key, kind="InterfaceDeclaration", label=title, replacement_scope=scope, provenance=provenance, source_anchor=anchor, aliases=[title, relpath], attributes={ "interface_type": "http-api" if contract_kind == "openapi" else "async-api", "contract_kind": contract_kind, "contract_version": data.get(contract_kind) or "", "version": info.get("version") or "", "path": relpath, }, confidence=0.85, ) context.accumulator.add_edge( edge_type="documents_interface", source_key=context.repository_key, target_key=interface_key, replacement_scope=scope, provenance=provenance, source_anchor=anchor, confidence=0.8, ) def _extract_score_files(context: ScanContext) -> None: provenance = _provenance("score-files") for path in _walk_files(context.repo_path): if path.name not in {"score.yaml", "score.yml"}: continue data = _load_structured_file(path) if not isinstance(data, dict): continue metadata = data.get("metadata") if isinstance(data.get("metadata"), dict) else {} name = str(metadata.get("name") or data.get("name") or path.stem) relpath = context.relpath(path) scope = context.accumulator.add_scope( extractor_id="score-files", source_kind="deployment_manifest", source_path=relpath, description="Score workload specification.", ) anchor = _source_anchor("deployment_manifest", relpath, json_pointer="/metadata/name", snippet=_snippet(path)) score_key = discovery_stable_key(context.repo_slug, "ScoreWorkload", name, source_anchor=anchor) context.accumulator.add_node( stable_key=score_key, kind="ScoreWorkload", label=name, replacement_scope=scope, provenance=provenance, source_anchor=anchor, aliases=[name, relpath], attributes={"path": relpath, "container_count": _mapping_len(data.get("containers"))}, confidence=0.85, ) context.accumulator.add_edge( edge_type="defines_workload", source_key=context.repository_key, target_key=score_key, replacement_scope=scope, provenance=provenance, source_anchor=anchor, confidence=0.85, ) def _extract_kubernetes_manifests(context: ScanContext) -> None: provenance = _provenance("kubernetes-manifests") for path in _walk_files(context.repo_path): if path.suffix.lower() not in {".yaml", ".yml"}: continue if _is_fabric_path(context, path): continue relpath = context.relpath(path) documents = _load_yaml_documents(path) for index, document in enumerate(documents): if not isinstance(document, dict): continue kind = str(document.get("kind") or "") if kind not in KUBERNETES_KINDS: continue metadata = document.get("metadata") if isinstance(document.get("metadata"), dict) else {} name = str(metadata.get("name") or path.stem) pointer = f"/{index}/metadata/name" if len(documents) > 1 else "/metadata/name" scope = context.accumulator.add_scope( extractor_id="kubernetes-manifests", source_kind="deployment_manifest", source_path=relpath, description="Kubernetes-style deployment manifest.", ) anchor = _source_anchor("deployment_manifest", relpath, json_pointer=pointer, snippet=_snippet(path)) node_kind = f"Kubernetes{kind}" manifest_key = discovery_stable_key(context.repo_slug, node_kind, name, source_anchor=anchor) context.accumulator.add_node( stable_key=manifest_key, kind=node_kind, label=name, replacement_scope=scope, provenance=provenance, source_anchor=anchor, aliases=[name, relpath], attributes={ "api_version": document.get("apiVersion") or "", "manifest_kind": kind, "namespace": metadata.get("namespace") or "", "path": relpath, }, confidence=0.85, ) context.accumulator.add_edge( edge_type="defines_runtime_object", source_key=context.repository_key, target_key=manifest_key, replacement_scope=scope, provenance=provenance, source_anchor=anchor, confidence=0.85, ) def _extract_service_configs(context: ScanContext) -> None: provenance = _provenance("service-configs") for path in _walk_files(context.repo_path): if path.name not in SERVICE_CONFIG_FILES and not path.name.endswith(".env.example"): continue if _is_fabric_path(context, path): continue relpath = context.relpath(path) scope = context.accumulator.add_scope( extractor_id="service-configs", source_kind="service_config", source_path=relpath, description="Service configuration file.", ) anchor = _source_anchor("service_config", relpath, snippet=_snippet(path)) config_key = discovery_stable_key(context.repo_slug, "ServiceConfig", relpath) context.accumulator.add_node( stable_key=config_key, kind="ServiceConfig", label=path.name, replacement_scope=scope, provenance=provenance, source_anchor=anchor, aliases=[relpath, path.name], attributes={"path": relpath, "format": path.suffix.lstrip(".") or "env"}, confidence=0.75, ) context.accumulator.add_edge( edge_type="uses_config", source_key=context.repository_key, target_key=config_key, replacement_scope=scope, provenance=provenance, source_anchor=anchor, confidence=0.75, ) def _source_anchor( source_kind: str, path: str, *, json_pointer: str | None = None, snippet: str | None = None, ) -> dict[str, object]: anchor: dict[str, object] = {"source_kind": source_kind, "path": path} if json_pointer: anchor["json_pointer"] = json_pointer if snippet: anchor["snippet"] = snippet anchor["fingerprint"] = source_fingerprint(anchor) return anchor def _provenance( extractor_id: str, *, method: str = "deterministic", origin: str = "deterministic", ) -> dict[str, object]: return { "extractor_id": extractor_id, "extractor_version": EXTRACTOR_VERSION, "method": method, "origin": origin, } def _merge_candidate(existing: dict[str, object] | None, incoming: dict[str, object]) -> dict[str, object]: if existing is None: return incoming merged = {**existing} for field in ("aliases", "provenance", "source_anchors"): values = [*list(existing.get(field, [])), *list(incoming.get(field, []))] if values: merged[field] = _unique_json(values) if field != "aliases" else _unique_strings(values) if isinstance(existing.get("attributes"), dict) or isinstance(incoming.get("attributes"), dict): merged["attributes"] = { **(existing.get("attributes") if isinstance(existing.get("attributes"), dict) else {}), **(incoming.get("attributes") if isinstance(incoming.get("attributes"), dict) else {}), } if isinstance(existing.get("confidence"), (int, float)) and isinstance(incoming.get("confidence"), (int, float)): merged["confidence"] = max(float(existing["confidence"]), float(incoming["confidence"])) if incoming.get("review_state") == "accepted": merged["review_state"] = "accepted" if incoming.get("origin") == "repo_declaration": merged["origin"] = "repo_declaration" return merged def _sorted_values(mapping: dict[str, dict[str, object]]) -> list[dict[str, object]]: return [mapping[key] for key in sorted(mapping)] def _unique_strings(values: Iterable[object]) -> list[str]: seen: set[str] = set() result: list[str] = [] for value in values: text = str(value or "").strip() if not text or text in seen: continue seen.add(text) result.append(text) return result def _unique_json(values: Iterable[object]) -> list[object]: seen: set[str] = set() result: list[object] = [] for value in values: key = json.dumps(value, sort_keys=True, default=str) if key in seen: continue seen.add(key) result.append(value) return result def _json_object(value: dict[str, object]) -> dict[str, object]: return {str(key): _json_value(item) for key, item in value.items()} def _json_value(value: object) -> object: if value is None or isinstance(value, (str, int, float, bool)): return value if isinstance(value, list): return [_json_value(item) for item in value] if isinstance(value, tuple): return [_json_value(item) for item in value] if isinstance(value, dict): return {str(key): _json_value(item) for key, item in value.items()} return str(value) def _walk_files(repo_path: Path) -> Iterable[Path]: for path in sorted(repo_path.rglob("*")): if not path.is_file(): continue if any(part in SKIP_DIRS for part in path.relative_to(repo_path).parts): continue yield path def _is_fabric_path(context: ScanContext, path: Path) -> bool: return bool(path.resolve().relative_to(context.repo_path).parts[:1] == ("fabric",)) def _load_structured_file(path: Path) -> object: try: if path.suffix.lower() == ".json": return json.loads(path.read_text(encoding="utf-8")) documents = _load_yaml_documents(path) except Exception: return None return documents[0] if len(documents) == 1 else documents def _load_yaml_documents(path: Path) -> list[object]: try: return [document for document in yaml.safe_load_all(path.read_text(encoding="utf-8")) if document is not None] except Exception: return [] def _snippet(path: Path, *, max_chars: int = 500) -> str: try: text = path.read_text(encoding="utf-8", errors="replace") except Exception: return "" return text[:max_chars] def _first_heading(path: Path) -> str: try: for line in path.read_text(encoding="utf-8", errors="replace").splitlines(): stripped = line.strip() if stripped.startswith("#"): return stripped.lstrip("#").strip() if stripped: return stripped[:120] except Exception: return "" return "" def _string_list(value: object) -> list[str]: if not isinstance(value, list): return [] return [str(item).strip() for item in value if str(item).strip()] def _python_dependency_name(spec: str) -> str: match = re.match(r"\s*([A-Za-z0-9_.-]+)", spec) return match.group(1) if match else "" def _node_dependencies(data: dict[str, object]) -> list[tuple[str, str, str]]: dependencies: list[tuple[str, str, str]] = [] for block_name in ("dependencies", "devDependencies", "peerDependencies", "optionalDependencies"): block = data.get(block_name) if not isinstance(block, dict): continue for dep_name, dep_spec in sorted(block.items()): escaped = _json_pointer_escape(str(dep_name)) dependencies.append((f"/{block_name}/{escaped}", str(dep_name), str(dep_spec))) return dependencies def _docker_base_images(path: Path) -> list[str]: images: list[str] = [] try: lines = path.read_text(encoding="utf-8", errors="replace").splitlines() except Exception: return images for line in lines: match = re.match(r"\s*FROM\s+([^\s]+)", line, flags=re.IGNORECASE) if match: images.append(match.group(1)) return images def _mapping_len(value: object) -> int: return len(value) if isinstance(value, dict) else 0 def _json_pointer_escape(value: str) -> str: return value.replace("~", "~0").replace("/", "~1") def _git_value(repo_path: Path, *args: str) -> str | None: try: result = subprocess.run( ["git", *args], cwd=repo_path, check=False, capture_output=True, text=True, timeout=5, ) except (OSError, subprocess.TimeoutExpired): return None if result.returncode != 0: return None value = result.stdout.strip() return value or None def _utc_now() -> str: return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")