Files
railiance-fabric/railiance_fabric/scanner.py

1228 lines
43 KiB
Python

from __future__ import annotations
import json
import re
import subprocess
import tomllib
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable
import yaml
from .connectors import ConnectorConfig, apply_connectors
from .discovery import (
attribute_stable_key,
discovery_stable_key,
normalize_identity_part,
relationship_stable_key,
replacement_scope_id,
short_fingerprint,
source_fingerprint,
)
from .llm_extraction import LLMExtractionConfig, augment_snapshot_with_llm
from .loader import declaration_files, load_yaml
# Version string stamped into every provenance record built by _provenance().
EXTRACTOR_VERSION = "0.1.0"

# Path components that exclude a file from _walk_files() results.
SKIP_DIRS = {
    ".git",
    ".hg",
    ".mypy_cache",
    ".pytest_cache",
    ".ruff_cache",
    ".venv",
    "__pycache__",
    "build",
    "dist",
    "node_modules",
    "target",
    "vendor",
}

# File names _extract_compose() treats as Docker Compose definitions.
COMPOSE_FILES = {
    "compose.yaml",
    "compose.yml",
    "docker-compose.yaml",
    "docker-compose.yml",
}

# File names _extract_lockfiles() records as dependency lockfiles.
LOCKFILES = {
    "package-lock.json",
    "pnpm-lock.yaml",
    "yarn.lock",
    "poetry.lock",
    "pdm.lock",
    "uv.lock",
    "requirements.lock",
}

# File names _extract_service_configs() records as service configuration.
SERVICE_CONFIG_FILES = {
    "application.yaml",
    "application.yml",
    "appsettings.json",
    "service.yaml",
    "service.yml",
}

# Manifest ``kind`` values _extract_kubernetes_manifests() turns into nodes.
KUBERNETES_KINDS = {
    "ConfigMap",
    "CronJob",
    "DaemonSet",
    "Deployment",
    "HorizontalPodAutoscaler",
    "Ingress",
    "Job",
    "Namespace",
    "Secret",
    "Service",
    "StatefulSet",
}
@dataclass(frozen=True)
class ScanOptions:
    """Immutable settings consumed by ``scan_repo`` for a single scan."""

    # Repository root; scan_repo resolves it before use.
    repo_path: Path
    # Identity overrides; scan_repo falls back to the directory name.
    repo_slug: str | None = None
    repo_name: str | None = None
    # Default domain attached to discovered nodes.
    domain: str | None = None
    # Commit id to report; scan_repo asks git when this is unset.
    commit: str | None = None
    # Profile name recorded in the snapshot and folded into the run id.
    profile: str = "deterministic"
    # Copied verbatim into the snapshot's "scan" section.
    deterministic_only: bool = True
    # When true, scan_repo hands the snapshot to augment_snapshot_with_llm.
    llm_enabled: bool = False
    llm_config: LLMExtractionConfig | None = None
    llm_adapter: object | None = None
    # Connector configurations passed to apply_connectors when non-empty.
    connectors: list[ConnectorConfig] = field(default_factory=list)
class CandidateAccumulator:
    """Collect the replacement scopes and candidates produced during one scan.

    Nodes, edges and attributes are stored by stable key; adding a candidate
    whose key is already present merges it into the stored record through
    ``_merge_candidate``. Replacement scopes are stored by scope id, and a
    repeated id overwrites the earlier record.
    """

    def __init__(self, repo_slug: str, domain: str | None = None) -> None:
        """Start empty for *repo_slug*; *domain* is the default node domain."""
        self.repo_slug = repo_slug
        self.domain = domain
        self.replacement_scopes: dict[str, dict[str, object]] = {}
        self.nodes: dict[str, dict[str, object]] = {}
        self.edges: dict[str, dict[str, object]] = {}
        self.attributes: dict[str, dict[str, object]] = {}

    def add_scope(
        self,
        *,
        extractor_id: str,
        source_kind: str,
        source_path: str | None = None,
        mode: str = "replacement",
        description: str | None = None,
    ) -> str:
        """Register a replacement scope and return its id.

        ``source_path`` and ``description`` are stored only when non-empty.
        """
        scope_id = replacement_scope_id(
            self.repo_slug,
            extractor_id,
            source_kind,
            source_path=source_path,
        )
        record: dict[str, object] = dict(
            id=scope_id,
            extractor_id=extractor_id,
            source_kind=source_kind,
            mode=mode,
        )
        for key, text in (("source_path", source_path), ("description", description)):
            if text:
                record[key] = text
        self.replacement_scopes[scope_id] = record
        return scope_id

    def add_node(
        self,
        *,
        stable_key: str,
        kind: str,
        label: str,
        replacement_scope: str,
        provenance: dict[str, object],
        source_anchor: dict[str, object],
        origin: str = "deterministic",
        review_state: str = "candidate",
        status: str = "active",
        confidence: float = 0.8,
        graph_id: str | None = None,
        aliases: Iterable[str] = (),
        attributes: dict[str, object] | None = None,
        lifecycle: str | None = None,
        domain: str | None = None,
    ) -> dict[str, object]:
        """Add (or merge) a node candidate and return the stored record.

        The label and graph id are always appended to the alias list; an
        explicit *domain* takes precedence over the accumulator default.
        """
        record: dict[str, object] = dict(
            stable_key=stable_key,
            kind=kind,
            label=label,
            repo=self.repo_slug,
            origin=origin,
            review_state=review_state,
            status=status,
            confidence=confidence,
            replacement_scope=replacement_scope,
            provenance=[provenance],
            source_anchors=[source_anchor],
        )
        if graph_id:
            record["graph_id"] = graph_id
        effective_domain = domain or self.domain
        if effective_domain:
            record["domain"] = effective_domain
        if lifecycle:
            record["lifecycle"] = lifecycle
        alias_pool = list(aliases)
        alias_pool.extend((label, graph_id or ""))
        unique_aliases = _unique_strings(alias_pool)
        if unique_aliases:
            record["aliases"] = unique_aliases
        if attributes:
            record["attributes"] = _json_object(attributes)
        stored = _merge_candidate(self.nodes.get(stable_key), record)
        self.nodes[stable_key] = stored
        return stored

    def add_edge(
        self,
        *,
        edge_type: str,
        source_key: str,
        target_key: str,
        replacement_scope: str,
        provenance: dict[str, object],
        source_anchor: dict[str, object],
        origin: str = "deterministic",
        review_state: str = "candidate",
        status: str = "active",
        confidence: float = 0.8,
        aliases: Iterable[str] = (),
        attributes: dict[str, object] | None = None,
    ) -> dict[str, object]:
        """Add (or merge) an edge candidate and return the stored record.

        The edge's stable key is derived from its endpoints, its type and the
        replacement scope that evidences it.
        """
        edge_key = relationship_stable_key(
            source_key,
            edge_type,
            target_key,
            evidence_scope=replacement_scope,
        )
        record: dict[str, object] = dict(
            stable_key=edge_key,
            edge_type=edge_type,
            source_key=source_key,
            target_key=target_key,
            origin=origin,
            review_state=review_state,
            status=status,
            confidence=confidence,
            replacement_scope=replacement_scope,
            provenance=[provenance],
            source_anchors=[source_anchor],
        )
        unique_aliases = _unique_strings(aliases)
        if unique_aliases:
            record["aliases"] = unique_aliases
        if attributes:
            record["attributes"] = _json_object(attributes)
        stored = _merge_candidate(self.edges.get(edge_key), record)
        self.edges[edge_key] = stored
        return stored

    def add_attribute(
        self,
        *,
        entity_key: str,
        name: str,
        value: object,
        replacement_scope: str,
        provenance: dict[str, object],
        source_anchor: dict[str, object],
        origin: str = "deterministic",
        review_state: str = "candidate",
        confidence: float = 0.8,
    ) -> dict[str, object]:
        """Add (or merge) an attribute candidate for *entity_key*.

        The value is normalised with ``_json_value`` before it is stored.
        """
        attribute_key = attribute_stable_key(entity_key, name)
        record: dict[str, object] = dict(
            stable_key=attribute_key,
            entity_key=entity_key,
            name=name,
            value=_json_value(value),
            origin=origin,
            review_state=review_state,
            confidence=confidence,
            replacement_scope=replacement_scope,
            provenance=[provenance],
            source_anchors=[source_anchor],
        )
        stored = _merge_candidate(self.attributes.get(attribute_key), record)
        self.attributes[attribute_key] = stored
        return stored

    def candidates(self) -> dict[str, list[dict[str, object]]]:
        """Return all candidates grouped by type, each list ordered by stable key."""
        grouped = (
            ("nodes", self.nodes),
            ("edges", self.edges),
            ("attributes", self.attributes),
        )
        return {group: _sorted_values(records) for group, records in grouped}

    def scopes(self) -> list[dict[str, object]]:
        """Return the registered replacement scopes ordered by scope id."""
        return _sorted_values(self.replacement_scopes)
def scan_repo(options: ScanOptions | Path, **overrides: object) -> dict[str, object]:
    """Scan one repository and return a ``FabricDiscoverySnapshot`` mapping.

    Args:
        options: Either prepared ``ScanOptions`` or a bare repository path.
        **overrides: ``ScanOptions`` field values. With a path they complete
            the options; with ``ScanOptions`` they replace the matching
            fields on a new instance.

    Returns:
        The snapshot dict. When connectors are configured it is first passed
        through ``apply_connectors``; when ``llm_enabled`` is set the result
        of ``augment_snapshot_with_llm`` is returned instead.
    """
    if isinstance(options, Path):
        options = ScanOptions(repo_path=options, **overrides)
    elif overrides:
        options = ScanOptions(**{**vars(options), **overrides})
    repo_path = options.repo_path.resolve()
    repo_slug = normalize_identity_part(options.repo_slug or repo_path.name, fallback="repo")
    repo_name = options.repo_name or repo_path.name
    commit = options.commit or _git_value(repo_path, "rev-parse", "HEAD") or "working-tree"
    started_at = _utc_now()
    accumulator = CandidateAccumulator(repo_slug=repo_slug, domain=options.domain)
    context = ScanContext(
        repo_path=repo_path,
        repo_slug=repo_slug,
        repo_name=repo_name,
        commit=commit,
        domain=options.domain,
        accumulator=accumulator,
    )
    for extractor in _deterministic_extractors():
        extractor(context)
    # Stamp completion only after the extractors have run; previously the
    # start timestamp was reused, so completed_at always equalled started_at.
    completed_at = _utc_now()
    run_id = "scan:{repo}:{profile}:{fingerprint}".format(
        repo=repo_slug,
        profile=normalize_identity_part(options.profile),
        fingerprint=short_fingerprint({"commit": commit, "path": str(repo_path)}),
    )
    snapshot = {
        "apiVersion": "railiance.fabric/v1alpha1",
        "kind": "FabricDiscoverySnapshot",
        "generated_at": started_at,
        "source": {
            "repo_slug": repo_slug,
            "repo_name": repo_name,
            "domain": options.domain or "",
            "commit": commit,
            # NOTE(review): this is the currently checked-out branch name,
            # reported under the "default_branch" key.
            "default_branch": _git_value(repo_path, "rev-parse", "--abbrev-ref", "HEAD") or "",
            "path": str(repo_path),
        },
        "scan": {
            "run_id": run_id,
            "profile": options.profile,
            "deterministic_only": options.deterministic_only,
            "llm_enabled": options.llm_enabled,
            "started_at": started_at,
            "completed_at": completed_at,
        },
        "replacement_scopes": accumulator.scopes(),
        "candidates": accumulator.candidates(),
        "tombstones": [],
        "reconciliation": {
            "precedence": ["repo_declaration", "deterministic", "catalog", "registry", "llm", "manual"],
            "duplicate_policy": "stable-key matches merge automatically; alias-only matches require review",
            "retirement_policy": "missing candidates retire only inside their replacement scope",
        },
    }
    if options.connectors:
        snapshot = apply_connectors(
            snapshot,
            repo_path=repo_path,
            configs=options.connectors,
        )
    if options.llm_enabled:
        return augment_snapshot_with_llm(
            snapshot,
            config=options.llm_config,
            adapter=options.llm_adapter,
        )
    return snapshot
@dataclass
class ScanContext:
    """Shared state handed to every extractor during one scan."""

    # Repository root; scan_repo passes an already-resolved path.
    repo_path: Path
    repo_slug: str
    repo_name: str
    commit: str
    domain: str | None
    # Sink that receives every scope and candidate the extractors produce.
    accumulator: CandidateAccumulator

    @property
    def repository_key(self) -> str:
        """Stable key of the Repository node that stands for this repo."""
        return discovery_stable_key(self.repo_slug, "Repository", self.repo_slug)

    def relpath(self, path: Path) -> str:
        """Return *path* relative to the repository root, POSIX-style.

        Raises:
            ValueError: If neither the resolved path nor the path as given
                lies under ``repo_path``.
        """
        try:
            return path.resolve().relative_to(self.repo_path).as_posix()
        except ValueError:
            # A symlink whose target lives outside the repository resolves to
            # a location that is not under repo_path. Report where the link
            # itself sits in the tree instead of aborting the whole scan.
            return path.absolute().relative_to(self.repo_path).as_posix()
def _deterministic_extractors() -> list:
    """Return the deterministic extractors in the order scan_repo runs them."""
    ordered = (
        _extract_repo_metadata,
        _extract_text_metadata,
        _extract_fabric_declarations,
        _extract_python_package,
        _extract_node_package,
        _extract_lockfiles,
        _extract_dockerfile,
        _extract_compose,
        _extract_api_contracts,
        _extract_score_files,
        _extract_kubernetes_manifests,
        _extract_service_configs,
    )
    return list(ordered)
def _extract_repo_metadata(context: ScanContext) -> None:
    """Register the Repository node together with locally available git facts."""
    accumulator = context.accumulator
    scope_id = accumulator.add_scope(
        extractor_id="repo-metadata",
        source_kind="file",
        source_path=".",
        description="Repository-level local metadata.",
    )
    root_anchor = _source_anchor("file", ".")
    provenance = _provenance("repo-metadata")
    # _git_value returns None when the lookup fails; both facts then fall
    # back to empty strings.
    origin_url = _git_value(context.repo_path, "config", "--get", "remote.origin.url")
    current_branch = _git_value(context.repo_path, "rev-parse", "--abbrev-ref", "HEAD")
    repo_attributes = {
        "repo_slug": context.repo_slug,
        "path": str(context.repo_path),
        "commit": context.commit,
        "default_branch": current_branch or "",
        "remote_url": origin_url or "",
    }
    accumulator.add_node(
        stable_key=context.repository_key,
        kind="Repository",
        label=context.repo_name,
        replacement_scope=scope_id,
        provenance=provenance,
        source_anchor=root_anchor,
        aliases=[context.repo_slug],
        attributes=repo_attributes,
        confidence=0.95,
    )
def _extract_text_metadata(context: ScanContext) -> None:
    """Attach title/presence attributes for well-known top-level documents.

    For every listed file that exists in the repository root, two attributes
    are added to the Repository node: ``<label>_title`` (first heading, or
    the file stem when none is found) and ``<label>_present``.
    """
    known_documents = (
        ("README.md", "readme"),
        ("README.rst", "readme"),
        ("INTENT.md", "intent"),
        ("SCOPE.md", "scope"),
    )
    provenance = _provenance("repo-text-metadata")
    for file_name, label in known_documents:
        document_path = context.repo_path / file_name
        if document_path.is_file():
            scope_id = context.accumulator.add_scope(
                extractor_id="repo-text-metadata",
                source_kind="file",
                source_path=file_name,
                description=f"Repository {label} document.",
            )
            anchor = _source_anchor("file", file_name, snippet=_snippet(document_path))
            title = _first_heading(document_path) or document_path.stem
            facts = (
                (f"{label}_title", title, 0.85),
                (f"{label}_present", True, 0.95),
            )
            for attribute_name, attribute_value, confidence in facts:
                context.accumulator.add_attribute(
                    entity_key=context.repository_key,
                    name=attribute_name,
                    value=attribute_value,
                    replacement_scope=scope_id,
                    provenance=provenance,
                    source_anchor=anchor,
                    confidence=confidence,
                )
def _extract_fabric_declarations(context: ScanContext) -> None:
    """Turn repo-owned Fabric declaration files into accepted nodes and edges.

    Works in three passes: load every declaration that parses to a mapping,
    register one node per declaration carrying both ``kind`` and
    ``metadata.id``, then add edges. Edges come last because their targets
    are looked up by declaration id among the registered nodes.
    """

    def mapping_of(document: dict[str, Any], key: str) -> dict[str, Any]:
        # A missing or non-mapping section is treated as empty.
        section = document.get(key)
        return section if isinstance(section, dict) else {}

    loaded: list[tuple[Path, dict[str, Any]]] = []
    for declaration_path in declaration_files(context.repo_path):
        try:
            document = load_yaml(declaration_path)
        except Exception:
            # Declarations that cannot be loaded are skipped, not fatal.
            continue
        if isinstance(document, dict):
            loaded.append((declaration_path, document))

    keys_by_id: dict[str, str] = {}
    records: list[tuple[Path, dict[str, Any], str, dict[str, object], dict[str, object]]] = []
    for declaration_path, document in loaded:
        kind = str(document.get("kind") or "")
        metadata = mapping_of(document, "metadata")
        spec = mapping_of(document, "spec")
        graph_id = str(metadata.get("id") or "")
        if not (kind and graph_id):
            continue
        relpath = context.relpath(declaration_path)
        scope_id = context.accumulator.add_scope(
            extractor_id="fabric-declarations",
            source_kind="declaration",
            source_path=relpath,
            description="Repo-owned Fabric declaration.",
        )
        anchor = _source_anchor(
            "declaration",
            relpath,
            json_pointer="/metadata/id",
            snippet=_snippet(declaration_path),
        )
        provenance = _provenance("fabric-declarations", method="declaration", origin="repo_declaration")
        label = str(metadata.get("name") or graph_id)
        node_key = discovery_stable_key(context.repo_slug, kind, graph_id)
        keys_by_id[graph_id] = node_key
        context.accumulator.add_node(
            stable_key=node_key,
            graph_id=graph_id,
            kind=kind,
            label=label,
            replacement_scope=scope_id,
            provenance=provenance,
            source_anchor=anchor,
            origin="repo_declaration",
            review_state="accepted",
            confidence=1.0,
            aliases=[graph_id, label],
            lifecycle=str(spec.get("lifecycle") or ""),
            domain=str(metadata.get("domain") or context.domain or ""),
            attributes={
                "metadata": metadata,
                "spec": spec,
                "declaration_path": relpath,
            },
        )
        records.append((declaration_path, document, node_key, provenance, anchor))

    for declaration_path, document, source_key, provenance, anchor in records:
        kind = str(document.get("kind") or "")
        spec = mapping_of(document, "spec")
        graph_id = str(mapping_of(document, "metadata").get("id") or "")
        scope_id = replacement_scope_id(
            context.repo_slug,
            "fabric-declarations",
            "declaration",
            source_path=context.relpath(declaration_path),
        )
        # Leading positional arguments shared by every edge of this declaration.
        shared = (context, scope_id, provenance, anchor)
        if kind == "ServiceDeclaration":
            for spec_field, edge_type in (
                ("provides_capabilities", "provides"),
                ("exposes_interfaces", "exposes"),
            ):
                for target_id in _string_list(spec.get(spec_field)):
                    _add_declaration_edge(*shared, source_key, keys_by_id, target_id, edge_type)
        elif kind == "CapabilityDeclaration":
            for target_id in _string_list(spec.get("interface_ids")):
                _add_declaration_edge(*shared, source_key, keys_by_id, target_id, "available_via")
        elif kind == "DependencyDeclaration":
            consumer = str(spec.get("consumer_service_id") or "")
            if consumer:
                # The edge runs from the consuming service to this dependency.
                _add_declaration_edge(
                    *shared, keys_by_id.get(consumer, ""), keys_by_id, graph_id, "consumes"
                )
        elif kind == "BindingAssertion":
            dependency = str(spec.get("dependency_id") or "")
            provider = str(spec.get("provider_capability_id") or "")
            interface = str(spec.get("provider_interface_id") or "")
            if dependency:
                for target_id, edge_type in ((provider, "binds"), (interface, "uses_interface")):
                    if target_id:
                        _add_declaration_edge(
                            *shared, keys_by_id.get(dependency, ""), keys_by_id, target_id, edge_type
                        )
def _add_declaration_edge(
context: ScanContext,
scope: str,
provenance: dict[str, object],
anchor: dict[str, object],
source_key: str,
keys_by_id: dict[str, str],
target_id: str,
edge_type: str,
) -> None:
target_key = keys_by_id.get(target_id)
if not source_key or not target_key:
return
context.accumulator.add_edge(
edge_type=edge_type,
source_key=source_key,
target_key=target_key,
replacement_scope=scope,
provenance=provenance,
source_anchor=anchor,
origin="repo_declaration",
review_state="accepted",
confidence=1.0,
aliases=[target_id],
)
def _extract_python_package(context: ScanContext) -> None:
path = context.repo_path / "pyproject.toml"
if not path.is_file():
return
try:
data = tomllib.loads(path.read_text(encoding="utf-8"))
except Exception:
return
project = data.get("project")
if not isinstance(project, dict):
return
name = str(project.get("name") or "").strip()
if not name:
return
scope = context.accumulator.add_scope(
extractor_id="python-package",
source_kind="package_manifest",
source_path="pyproject.toml",
description="Python package metadata from pyproject.toml.",
)
anchor = _source_anchor("package_manifest", "pyproject.toml", json_pointer="/project/name", snippet=_snippet(path))
provenance = _provenance("python-package")
package_key = discovery_stable_key(context.repo_slug, "Library", name)
dependencies = _string_list(project.get("dependencies"))
context.accumulator.add_node(
stable_key=package_key,
kind="Library",
label=name,
replacement_scope=scope,
provenance=provenance,
source_anchor=anchor,
aliases=[name, normalize_identity_part(name)],
attributes={
"language": "python",
"package_manager": "python",
"package_name": name,
"version": project.get("version") or "",
"description": project.get("description") or "",
"dependency_count": len(dependencies),
},
confidence=0.9,
)
context.accumulator.add_edge(
edge_type="declares_package",
source_key=context.repository_key,
target_key=package_key,
replacement_scope=scope,
provenance=provenance,
source_anchor=anchor,
confidence=0.9,
)
for index, spec in enumerate(dependencies):
dep_name = _python_dependency_name(spec)
if not dep_name:
continue
dep_anchor = _source_anchor("package_manifest", "pyproject.toml", json_pointer=f"/project/dependencies/{index}")
dep_key = discovery_stable_key(context.repo_slug, "ExternalLibrary", dep_name)
context.accumulator.add_node(
stable_key=dep_key,
kind="ExternalLibrary",
label=dep_name,
replacement_scope=scope,
provenance=provenance,
source_anchor=dep_anchor,
aliases=[dep_name],
attributes={"ecosystem": "python", "dependency_spec": spec},
confidence=0.85,
)
context.accumulator.add_edge(
edge_type="depends_on_library",
source_key=package_key,
target_key=dep_key,
replacement_scope=scope,
provenance=provenance,
source_anchor=dep_anchor,
confidence=0.85,
)
def _extract_node_package(context: ScanContext) -> None:
path = context.repo_path / "package.json"
if not path.is_file():
return
try:
data = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return
if not isinstance(data, dict):
return
name = str(data.get("name") or "").strip()
if not name:
return
scope = context.accumulator.add_scope(
extractor_id="node-package",
source_kind="package_manifest",
source_path="package.json",
description="Node package metadata from package.json.",
)
anchor = _source_anchor("package_manifest", "package.json", json_pointer="/name", snippet=_snippet(path))
provenance = _provenance("node-package")
package_key = discovery_stable_key(context.repo_slug, "Library", name)
dependencies = _node_dependencies(data)
context.accumulator.add_node(
stable_key=package_key,
kind="Library",
label=name,
replacement_scope=scope,
provenance=provenance,
source_anchor=anchor,
aliases=[name, normalize_identity_part(name)],
attributes={
"language": "javascript",
"package_manager": "npm",
"package_name": name,
"version": data.get("version") or "",
"private": bool(data.get("private", False)),
"script_count": len(data.get("scripts") if isinstance(data.get("scripts"), dict) else {}),
"dependency_count": len(dependencies),
},
confidence=0.9,
)
context.accumulator.add_edge(
edge_type="declares_package",
source_key=context.repository_key,
target_key=package_key,
replacement_scope=scope,
provenance=provenance,
source_anchor=anchor,
confidence=0.9,
)
for pointer, dep_name, dep_spec in dependencies:
dep_anchor = _source_anchor("package_manifest", "package.json", json_pointer=pointer)
dep_key = discovery_stable_key(context.repo_slug, "ExternalLibrary", dep_name)
context.accumulator.add_node(
stable_key=dep_key,
kind="ExternalLibrary",
label=dep_name,
replacement_scope=scope,
provenance=provenance,
source_anchor=dep_anchor,
aliases=[dep_name],
attributes={"ecosystem": "npm", "dependency_spec": dep_spec},
confidence=0.85,
)
context.accumulator.add_edge(
edge_type="depends_on_library",
source_key=package_key,
target_key=dep_key,
replacement_scope=scope,
provenance=provenance,
source_anchor=dep_anchor,
confidence=0.85,
)
def _extract_lockfiles(context: ScanContext) -> None:
    """Record every dependency lockfile found in the walked repository tree."""
    provenance = _provenance("lockfiles")
    accumulator = context.accumulator
    lock_paths = (found for found in _walk_files(context.repo_path) if found.name in LOCKFILES)
    for lock_path in lock_paths:
        relpath = context.relpath(lock_path)
        scope_id = accumulator.add_scope(
            extractor_id="lockfiles",
            source_kind="lockfile",
            source_path=relpath,
            description="Dependency lockfile evidence.",
        )
        anchor = _source_anchor("lockfile", relpath, snippet=_snippet(lock_path))
        # Keyed by relative path so same-named lockfiles in different
        # directories stay distinct nodes.
        lock_key = discovery_stable_key(context.repo_slug, "Lockfile", relpath)
        accumulator.add_node(
            stable_key=lock_key,
            kind="Lockfile",
            label=lock_path.name,
            replacement_scope=scope_id,
            provenance=provenance,
            source_anchor=anchor,
            aliases=[relpath, lock_path.name],
            attributes={"path": relpath, "size_bytes": lock_path.stat().st_size},
            confidence=0.95,
        )
        accumulator.add_edge(
            edge_type="uses_lockfile",
            source_key=context.repository_key,
            target_key=lock_key,
            replacement_scope=scope_id,
            provenance=provenance,
            source_anchor=anchor,
            confidence=0.9,
        )
def _extract_dockerfile(context: ScanContext) -> None:
    """Record each ``Dockerfile`` / ``Dockerfile.*`` as a ContainerBuild node."""
    provenance = _provenance("dockerfile")
    accumulator = context.accumulator
    for recipe_path in _walk_files(context.repo_path):
        file_name = recipe_path.name
        if not (file_name == "Dockerfile" or file_name.startswith("Dockerfile.")):
            continue
        relpath = context.relpath(recipe_path)
        scope_id = accumulator.add_scope(
            extractor_id="dockerfile",
            source_kind="deployment_manifest",
            source_path=relpath,
            description="Container build recipe.",
        )
        anchor = _source_anchor("deployment_manifest", relpath, snippet=_snippet(recipe_path))
        build_key = discovery_stable_key(context.repo_slug, "ContainerBuild", relpath)
        build_attributes = {
            "path": relpath,
            "base_images": _docker_base_images(recipe_path),
        }
        accumulator.add_node(
            stable_key=build_key,
            kind="ContainerBuild",
            label=file_name,
            replacement_scope=scope_id,
            provenance=provenance,
            source_anchor=anchor,
            aliases=[relpath, file_name],
            attributes=build_attributes,
            confidence=0.9,
        )
        accumulator.add_edge(
            edge_type="builds_container",
            source_key=context.repository_key,
            target_key=build_key,
            replacement_scope=scope_id,
            provenance=provenance,
            source_anchor=anchor,
            confidence=0.9,
        )
def _extract_compose(context: ScanContext) -> None:
    """Record every service defined in the repository's Docker Compose files.

    Only the first YAML document of a compose file is inspected, and only
    when it is a mapping whose ``services`` entry is itself a mapping.
    Services whose definition is not a mapping are skipped.
    """
    provenance = _provenance("docker-compose")
    for path in _walk_files(context.repo_path):
        if path.name not in COMPOSE_FILES:
            continue
        documents = _load_yaml_documents(path)
        if not documents or not isinstance(documents[0], dict):
            continue
        services = documents[0].get("services")
        if not isinstance(services, dict):
            continue
        relpath = context.relpath(path)
        scope = context.accumulator.add_scope(
            extractor_id="docker-compose",
            source_kind="deployment_manifest",
            source_path=relpath,
            description="Docker Compose service definitions.",
        )
        # YAML allows non-string mapping keys (e.g. a bare ``1:``). Comparing
        # raw keys of mixed types raises TypeError, so order by string form.
        ordered_services = sorted(services.items(), key=lambda item: str(item[0]))
        for raw_name, service in ordered_services:
            if not isinstance(service, dict):
                continue
            service_name = str(raw_name)
            pointer = f"/services/{_json_pointer_escape(service_name)}"
            anchor = _source_anchor("deployment_manifest", relpath, json_pointer=pointer)
            deployment_key = discovery_stable_key(
                context.repo_slug, "DeploymentService", service_name, source_anchor=anchor
            )
            ports = service.get("ports")
            context.accumulator.add_node(
                stable_key=deployment_key,
                kind="DeploymentService",
                label=service_name,
                replacement_scope=scope,
                provenance=provenance,
                source_anchor=anchor,
                aliases=[service_name],
                attributes={
                    "orchestrator": "docker-compose",
                    "image": service.get("image") or "",
                    "build": service.get("build") or "",
                    "ports": ports if isinstance(ports, list) else [],
                },
                confidence=0.9,
            )
            context.accumulator.add_edge(
                edge_type="defines_deployment",
                source_key=context.repository_key,
                target_key=deployment_key,
                replacement_scope=scope,
                provenance=provenance,
                source_anchor=anchor,
                confidence=0.9,
            )
def _extract_api_contracts(context: ScanContext) -> None:
    """Record OpenAPI and AsyncAPI documents as InterfaceDeclaration nodes.

    A YAML/JSON file counts as a contract when it parses to a mapping with a
    truthy top-level ``openapi`` or ``asyncapi`` key; ``openapi`` wins when
    both are present.
    """
    provenance = _provenance("api-contracts")
    structured_suffixes = {".yaml", ".yml", ".json"}
    for contract_path in _walk_files(context.repo_path):
        if contract_path.suffix.lower() not in structured_suffixes:
            continue
        document = _load_structured_file(contract_path)
        if not isinstance(document, dict):
            continue
        contract_kind = next(
            (marker for marker in ("openapi", "asyncapi") if document.get(marker)),
            "",
        )
        if not contract_kind:
            continue
        info = document.get("info")
        if not isinstance(info, dict):
            info = {}
        title = str(info.get("title") or contract_path.stem)
        relpath = context.relpath(contract_path)
        scope_id = context.accumulator.add_scope(
            extractor_id="api-contracts",
            source_kind="api_contract",
            source_path=relpath,
            description="OpenAPI or AsyncAPI contract.",
        )
        anchor = _source_anchor(
            "api_contract",
            relpath,
            json_pointer="/info/title",
            snippet=_snippet(contract_path),
        )
        interface_key = discovery_stable_key(
            context.repo_slug, "InterfaceDeclaration", title, source_anchor=anchor
        )
        if contract_kind == "openapi":
            interface_type = "http-api"
        else:
            interface_type = "async-api"
        context.accumulator.add_node(
            stable_key=interface_key,
            kind="InterfaceDeclaration",
            label=title,
            replacement_scope=scope_id,
            provenance=provenance,
            source_anchor=anchor,
            aliases=[title, relpath],
            attributes={
                "interface_type": interface_type,
                "contract_kind": contract_kind,
                "contract_version": document.get(contract_kind) or "",
                "version": info.get("version") or "",
                "path": relpath,
            },
            confidence=0.85,
        )
        context.accumulator.add_edge(
            edge_type="documents_interface",
            source_key=context.repository_key,
            target_key=interface_key,
            replacement_scope=scope_id,
            provenance=provenance,
            source_anchor=anchor,
            confidence=0.8,
        )
def _extract_score_files(context: ScanContext) -> None:
    """Record ``score.yaml`` / ``score.yml`` files as ScoreWorkload nodes.

    The workload name comes from ``metadata.name``, then a top-level
    ``name``, then the file stem.
    """
    provenance = _provenance("score-files")
    score_names = {"score.yaml", "score.yml"}
    for score_path in _walk_files(context.repo_path):
        if score_path.name not in score_names:
            continue
        document = _load_structured_file(score_path)
        if not isinstance(document, dict):
            continue
        metadata = document.get("metadata")
        if not isinstance(metadata, dict):
            metadata = {}
        workload_name = str(metadata.get("name") or document.get("name") or score_path.stem)
        relpath = context.relpath(score_path)
        scope_id = context.accumulator.add_scope(
            extractor_id="score-files",
            source_kind="deployment_manifest",
            source_path=relpath,
            description="Score workload specification.",
        )
        anchor = _source_anchor(
            "deployment_manifest",
            relpath,
            json_pointer="/metadata/name",
            snippet=_snippet(score_path),
        )
        workload_key = discovery_stable_key(
            context.repo_slug, "ScoreWorkload", workload_name, source_anchor=anchor
        )
        context.accumulator.add_node(
            stable_key=workload_key,
            kind="ScoreWorkload",
            label=workload_name,
            replacement_scope=scope_id,
            provenance=provenance,
            source_anchor=anchor,
            aliases=[workload_name, relpath],
            attributes={
                "path": relpath,
                "container_count": _mapping_len(document.get("containers")),
            },
            confidence=0.85,
        )
        context.accumulator.add_edge(
            edge_type="defines_workload",
            source_key=context.repository_key,
            target_key=workload_key,
            replacement_scope=scope_id,
            provenance=provenance,
            source_anchor=anchor,
            confidence=0.85,
        )
def _extract_kubernetes_manifests(context: ScanContext) -> None:
    """Record recognised Kubernetes objects found in YAML files.

    Files under the top-level ``fabric`` directory are ignored. Each YAML
    document whose ``kind`` is listed in ``KUBERNETES_KINDS`` becomes a
    ``Kubernetes<Kind>`` node linked from the repository.
    """
    provenance = _provenance("kubernetes-manifests")
    accumulator = context.accumulator
    for manifest_path in _walk_files(context.repo_path):
        is_yaml = manifest_path.suffix.lower() in {".yaml", ".yml"}
        if not is_yaml or _is_fabric_path(context, manifest_path):
            continue
        relpath = context.relpath(manifest_path)
        documents = _load_yaml_documents(manifest_path)
        # Only multi-document files get the document index in the pointer.
        multi_document = len(documents) > 1
        for position, document in enumerate(documents):
            if not isinstance(document, dict):
                continue
            manifest_kind = str(document.get("kind") or "")
            if manifest_kind not in KUBERNETES_KINDS:
                continue
            metadata = document.get("metadata")
            if not isinstance(metadata, dict):
                metadata = {}
            object_name = str(metadata.get("name") or manifest_path.stem)
            if multi_document:
                pointer = f"/{position}/metadata/name"
            else:
                pointer = "/metadata/name"
            scope_id = accumulator.add_scope(
                extractor_id="kubernetes-manifests",
                source_kind="deployment_manifest",
                source_path=relpath,
                description="Kubernetes-style deployment manifest.",
            )
            anchor = _source_anchor(
                "deployment_manifest",
                relpath,
                json_pointer=pointer,
                snippet=_snippet(manifest_path),
            )
            node_kind = f"Kubernetes{manifest_kind}"
            object_key = discovery_stable_key(
                context.repo_slug, node_kind, object_name, source_anchor=anchor
            )
            accumulator.add_node(
                stable_key=object_key,
                kind=node_kind,
                label=object_name,
                replacement_scope=scope_id,
                provenance=provenance,
                source_anchor=anchor,
                aliases=[object_name, relpath],
                attributes={
                    "api_version": document.get("apiVersion") or "",
                    "manifest_kind": manifest_kind,
                    "namespace": metadata.get("namespace") or "",
                    "path": relpath,
                },
                confidence=0.85,
            )
            accumulator.add_edge(
                edge_type="defines_runtime_object",
                source_key=context.repository_key,
                target_key=object_key,
                replacement_scope=scope_id,
                provenance=provenance,
                source_anchor=anchor,
                confidence=0.85,
            )
def _extract_service_configs(context: ScanContext) -> None:
    """Record well-known service configuration files as ServiceConfig nodes.

    Matches the names in ``SERVICE_CONFIG_FILES`` plus any file whose name
    ends in ``.env.example``. Files under the top-level ``fabric`` directory
    are ignored.
    """
    provenance = _provenance("service-configs")
    for path in _walk_files(context.repo_path):
        is_env_example = path.name.endswith(".env.example")
        if path.name not in SERVICE_CONFIG_FILES and not is_env_example:
            continue
        if _is_fabric_path(context, path):
            continue
        relpath = context.relpath(path)
        scope = context.accumulator.add_scope(
            extractor_id="service-configs",
            source_kind="service_config",
            source_path=relpath,
            description="Service configuration file.",
        )
        anchor = _source_anchor("service_config", relpath, snippet=_snippet(path))
        config_key = discovery_stable_key(context.repo_slug, "ServiceConfig", relpath)
        # Path.suffix of "*.env.example" is ".example", so deriving the format
        # from the suffix alone never produced the intended "env" value.
        if is_env_example:
            config_format = "env"
        else:
            config_format = path.suffix.lstrip(".") or "env"
        context.accumulator.add_node(
            stable_key=config_key,
            kind="ServiceConfig",
            label=path.name,
            replacement_scope=scope,
            provenance=provenance,
            source_anchor=anchor,
            aliases=[relpath, path.name],
            attributes={"path": relpath, "format": config_format},
            confidence=0.75,
        )
        context.accumulator.add_edge(
            edge_type="uses_config",
            source_key=context.repository_key,
            target_key=config_key,
            replacement_scope=scope,
            provenance=provenance,
            source_anchor=anchor,
            confidence=0.75,
        )
def _source_anchor(
    source_kind: str,
    path: str,
    *,
    json_pointer: str | None = None,
    snippet: str | None = None,
) -> dict[str, object]:
    """Build a source-anchor record and stamp it with its fingerprint.

    ``json_pointer`` and ``snippet`` are included only when non-empty. The
    fingerprint is computed over the anchor before the ``fingerprint`` key
    itself is added.
    """
    anchor: dict[str, object] = dict(source_kind=source_kind, path=path)
    for key, text in (("json_pointer", json_pointer), ("snippet", snippet)):
        if text:
            anchor[key] = text
    fingerprint = source_fingerprint(anchor)
    anchor["fingerprint"] = fingerprint
    return anchor
def _provenance(
    extractor_id: str,
    *,
    method: str = "deterministic",
    origin: str = "deterministic",
) -> dict[str, object]:
    """Build the provenance record for *extractor_id* at the current extractor version."""
    return dict(
        extractor_id=extractor_id,
        extractor_version=EXTRACTOR_VERSION,
        method=method,
        origin=origin,
    )
def _merge_candidate(existing: dict[str, object] | None, incoming: dict[str, object]) -> dict[str, object]:
if existing is None:
return incoming
merged = {**existing}
for field in ("aliases", "provenance", "source_anchors"):
values = [*list(existing.get(field, [])), *list(incoming.get(field, []))]
if values:
merged[field] = _unique_json(values) if field != "aliases" else _unique_strings(values)
if isinstance(existing.get("attributes"), dict) or isinstance(incoming.get("attributes"), dict):
merged["attributes"] = {
**(existing.get("attributes") if isinstance(existing.get("attributes"), dict) else {}),
**(incoming.get("attributes") if isinstance(incoming.get("attributes"), dict) else {}),
}
if isinstance(existing.get("confidence"), (int, float)) and isinstance(incoming.get("confidence"), (int, float)):
merged["confidence"] = max(float(existing["confidence"]), float(incoming["confidence"]))
if incoming.get("review_state") == "accepted":
merged["review_state"] = "accepted"
if incoming.get("origin") == "repo_declaration":
merged["origin"] = "repo_declaration"
return merged
def _sorted_values(mapping: dict[str, dict[str, object]]) -> list[dict[str, object]]:
return [mapping[key] for key in sorted(mapping)]
def _unique_strings(values: Iterable[object]) -> list[str]:
seen: set[str] = set()
result: list[str] = []
for value in values:
text = str(value or "").strip()
if not text or text in seen:
continue
seen.add(text)
result.append(text)
return result
def _unique_json(values: Iterable[object]) -> list[object]:
seen: set[str] = set()
result: list[object] = []
for value in values:
key = json.dumps(value, sort_keys=True, default=str)
if key in seen:
continue
seen.add(key)
result.append(value)
return result
def _json_object(value: dict[str, object]) -> dict[str, object]:
return {str(key): _json_value(item) for key, item in value.items()}
def _json_value(value: object) -> object:
if value is None or isinstance(value, (str, int, float, bool)):
return value
if isinstance(value, list):
return [_json_value(item) for item in value]
if isinstance(value, tuple):
return [_json_value(item) for item in value]
if isinstance(value, dict):
return {str(key): _json_value(item) for key, item in value.items()}
return str(value)
def _walk_files(repo_path: Path) -> Iterable[Path]:
for path in sorted(repo_path.rglob("*")):
if not path.is_file():
continue
if any(part in SKIP_DIRS for part in path.relative_to(repo_path).parts):
continue
yield path
def _is_fabric_path(context: ScanContext, path: Path) -> bool:
return bool(path.resolve().relative_to(context.repo_path).parts[:1] == ("fabric",))
def _load_structured_file(path: Path) -> object:
try:
if path.suffix.lower() == ".json":
return json.loads(path.read_text(encoding="utf-8"))
documents = _load_yaml_documents(path)
except Exception:
return None
return documents[0] if len(documents) == 1 else documents
def _load_yaml_documents(path: Path) -> list[object]:
try:
return [document for document in yaml.safe_load_all(path.read_text(encoding="utf-8")) if document is not None]
except Exception:
return []
def _snippet(path: Path, *, max_chars: int = 500) -> str:
try:
text = path.read_text(encoding="utf-8", errors="replace")
except Exception:
return ""
return text[:max_chars]
def _first_heading(path: Path) -> str:
try:
for line in path.read_text(encoding="utf-8", errors="replace").splitlines():
stripped = line.strip()
if stripped.startswith("#"):
return stripped.lstrip("#").strip()
if stripped:
return stripped[:120]
except Exception:
return ""
return ""
def _string_list(value: object) -> list[str]:
if not isinstance(value, list):
return []
return [str(item).strip() for item in value if str(item).strip()]
def _python_dependency_name(spec: str) -> str:
match = re.match(r"\s*([A-Za-z0-9_.-]+)", spec)
return match.group(1) if match else ""
def _node_dependencies(data: dict[str, object]) -> list[tuple[str, str, str]]:
dependencies: list[tuple[str, str, str]] = []
for block_name in ("dependencies", "devDependencies", "peerDependencies", "optionalDependencies"):
block = data.get(block_name)
if not isinstance(block, dict):
continue
for dep_name, dep_spec in sorted(block.items()):
escaped = _json_pointer_escape(str(dep_name))
dependencies.append((f"/{block_name}/{escaped}", str(dep_name), str(dep_spec)))
return dependencies
def _docker_base_images(path: Path) -> list[str]:
images: list[str] = []
try:
lines = path.read_text(encoding="utf-8", errors="replace").splitlines()
except Exception:
return images
for line in lines:
match = re.match(r"\s*FROM\s+([^\s]+)", line, flags=re.IGNORECASE)
if match:
images.append(match.group(1))
return images
def _mapping_len(value: object) -> int:
return len(value) if isinstance(value, dict) else 0
def _json_pointer_escape(value: str) -> str:
return value.replace("~", "~0").replace("/", "~1")
def _git_value(repo_path: Path, *args: str) -> str | None:
try:
result = subprocess.run(
["git", *args],
cwd=repo_path,
check=False,
capture_output=True,
text=True,
timeout=5,
)
except (OSError, subprocess.TimeoutExpired):
return None
if result.returncode != 0:
return None
value = result.stdout.strip()
return value or None
def _utc_now() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")