from __future__ import annotations import hashlib import json import sqlite3 import subprocess import urllib.error import urllib.request from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any from .deployment_overlay import normalize_deployment_overlay from .discovery import normalize_identity_part, short_fingerprint from .loader import load_yaml, repo_root from .schema_validation import draft202012_validator EXTRACTOR_VERSION = "0.1.0" DEFAULT_ROOT_MANIFEST_PATH = repo_root() / "fabric" / "discovery" / "railiance-accountability-roots.yaml" def load_accountability_root_manifest(path: Path | None = None, *, validate: bool = True) -> dict[str, Any]: manifest_path = path or DEFAULT_ROOT_MANIFEST_PATH manifest = load_yaml(manifest_path) if not isinstance(manifest, dict): raise ValueError(f"accountability root manifest must be a mapping: {manifest_path}") if validate: validator = draft202012_validator(repo_root() / "schemas" / "accountability-root-manifest.schema.yaml") errors = sorted(validator.iter_errors(manifest), key=lambda error: list(error.path)) if errors: location = ".".join(str(part) for part in errors[0].path) or "" raise ValueError(f"invalid accountability root manifest at {location}: {errors[0].message}") return manifest def collect_accountability_root_evidence( manifest_path: Path | None = None, *, include_remote: bool = False, max_items_per_root: int = 200, ) -> dict[str, Any]: manifest_path = manifest_path or DEFAULT_ROOT_MANIFEST_PATH manifest = load_accountability_root_manifest(manifest_path) generated_at = _utc_now() roots: list[dict[str, Any]] = [] review_artifacts: list[dict[str, Any]] = [] for root in manifest.get("discovery_roots", []): if not isinstance(root, dict): continue root_record = { "root_id": root.get("id", ""), "root_type": root.get("type", ""), "status": root.get("status", "planned"), "fabric_id": root.get("fabric_id", ""), "owner_actor_id": root.get("owner_actor_id", 
""), "safe_discovery": _source(root).get("safe_discovery", "metadata_only"), "evidence": [], } if root.get("subfabric_id"): root_record["subfabric_id"] = root["subfabric_id"] try: root_record["evidence"] = _collect_root_evidence( root, include_remote=include_remote, max_items=max_items_per_root, ) except Exception as exc: # pragma: no cover - defensive boundary for operator runs review_artifacts.append( _review_artifact( root, "adapter_failed", "error", f"{type(exc).__name__}: {exc}", ) ) roots.append(root_record) result = { "apiVersion": "railiance.fabric/v1alpha2", "kind": "AccountabilityRootEvidenceRun", "generated_at": generated_at, "manifest": { "id": manifest.get("metadata", {}).get("id", ""), "path": _display_path(manifest_path), "fingerprint": _file_sha256(manifest_path) or short_fingerprint(manifest), }, "roots": roots, "review_artifacts": review_artifacts, } validator = draft202012_validator(repo_root() / "schemas" / "accountability-root-evidence.schema.yaml") errors = sorted(validator.iter_errors(result), key=lambda error: list(error.path)) if errors: location = ".".join(str(part) for part in errors[0].path) or "" raise ValueError(f"invalid accountability root evidence at {location}: {errors[0].message}") return result def build_identity_projection( evidence_run: dict[str, Any], manifest: dict[str, Any] | None = None, ) -> dict[str, Any]: if manifest is None: manifest_path = evidence_run.get("manifest", {}).get("path") manifest = load_accountability_root_manifest(_resolve_path(manifest_path), validate=True) candidates: dict[str, dict[str, Any]] = {} netkingdom = manifest.get("netkingdom") if isinstance(manifest.get("netkingdom"), dict) else {} if netkingdom: _add_identity_candidate( candidates, identity_type="Netkingdom", label=str(netkingdom.get("name") or netkingdom.get("id")), graph_id=str(netkingdom.get("id")), fabric_id=None, owner_actor_id=str(netkingdom.get("king_actor_id") or ""), evidence_ids=[], aliases=[str(netkingdom.get("id") or "")], 
attributes={"king_actor_id": netkingdom.get("king_actor_id", "")}, confidence=1.0, ) for actor in manifest.get("actors", []): if not isinstance(actor, dict): continue _add_identity_candidate( candidates, identity_type="Actor", label=str(actor.get("name") or actor.get("id")), graph_id=str(actor.get("id")), fabric_id=None, owner_actor_id=str(actor.get("id") or ""), evidence_ids=[], aliases=[str(actor.get("id") or ""), str(actor.get("role") or "")], attributes={"role": actor.get("role", "")}, confidence=1.0, ) for fabric in manifest.get("fabrics", []): if not isinstance(fabric, dict): continue owner_actor_id = str(fabric.get("tenant_actor_id") or fabric.get("lord_actor_id") or "") _add_identity_candidate( candidates, identity_type=str(fabric.get("kind") or "Fabric"), label=str(fabric.get("name") or fabric.get("id")), graph_id=str(fabric.get("id")), fabric_id=str(fabric.get("id") or ""), subfabric_id=str(fabric.get("id")) if fabric.get("kind") == "Subfabric" else None, owner_actor_id=owner_actor_id, evidence_ids=[], aliases=[str(fabric.get("id") or ""), str(fabric.get("parent_fabric_id") or "")], attributes={ "status": fabric.get("status", ""), "netkingdom_id": fabric.get("netkingdom_id", ""), "parent_fabric_id": fabric.get("parent_fabric_id", ""), "boundary": fabric.get("boundary", {}), }, confidence=1.0, ) for root in evidence_run.get("roots", []): if not isinstance(root, dict): continue for item in root.get("evidence", []): if not isinstance(item, dict): continue identity = _identity_from_evidence(root, item) if identity is None: continue _add_identity_candidate(candidates, **identity) candidate_list = _mark_ambiguous_identities(list(candidates.values())) candidate_graph = _candidate_graph(candidate_list, manifest) projection = { "apiVersion": "railiance.fabric/v1alpha2", "kind": "AccountabilityIdentityProjection", "generated_at": _utc_now(), "evidence_run": { "manifest_id": evidence_run.get("manifest", {}).get("id", ""), "manifest_fingerprint": 
evidence_run.get("manifest", {}).get("fingerprint", ""), "generated_at": evidence_run.get("generated_at", ""), }, "identity_candidates": sorted(candidate_list, key=lambda item: item["stable_key"]), "candidate_graph": candidate_graph, } validator = draft202012_validator(repo_root() / "schemas" / "accountability-identity-projection.schema.yaml") errors = sorted(validator.iter_errors(projection), key=lambda error: list(error.path)) if errors: location = ".".join(str(part) for part in errors[0].path) or "" raise ValueError(f"invalid accountability identity projection at {location}: {errors[0].message}") return projection def build_ownership_review( identity_projection: dict[str, Any], manifest: dict[str, Any], *, review_decisions: dict[str, dict[str, Any]] | None = None, ) -> dict[str, Any]: review_decisions = review_decisions or {} actor_roles = { actor.get("id"): actor.get("role", "") for actor in manifest.get("actors", []) if isinstance(actor, dict) } fabric_owners: dict[str, str] = {} fabric_kinds: dict[str, str] = {} for fabric in manifest.get("fabrics", []): if not isinstance(fabric, dict): continue fabric_id = str(fabric.get("id") or "") fabric_kinds[fabric_id] = str(fabric.get("kind") or "") fabric_owners[fabric_id] = str(fabric.get("tenant_actor_id") or fabric.get("lord_actor_id") or "") items = [ _ownership_item(candidate, actor_roles, fabric_owners, fabric_kinds, review_decisions.get(candidate["stable_key"])) for candidate in identity_projection.get("identity_candidates", []) if isinstance(candidate, dict) ] review = { "apiVersion": "railiance.fabric/v1alpha2", "kind": "AccountabilityOwnershipReview", "generated_at": _utc_now(), "evidence_run": identity_projection.get("evidence_run", {}), "items": sorted(items, key=lambda item: item["stable_key"]), "summary": { "total": len(items), "accepted": sum(1 for item in items if item["review_state"] == "accepted"), "needs_review": sum(1 for item in items if item["review_state"] == "needs_review"), 
"unresolved_ownership": sum( 1 for item in items if item["ownership"]["resolution"] == "unresolved" ), "ambiguous_containment": sum( 1 for item in items if item["containment"]["status"] == "ambiguous" ), }, } validator = draft202012_validator(repo_root() / "schemas" / "accountability-ownership-review.schema.yaml") errors = sorted(validator.iter_errors(review), key=lambda error: list(error.path)) if errors: location = ".".join(str(part) for part in errors[0].path) or "" raise ValueError(f"invalid accountability ownership review at {location}: {errors[0].message}") return review def build_update_delta( current_identity_projection: dict[str, Any], current_ownership_review: dict[str, Any], *, previous_identity_projection: dict[str, Any] | None = None, previous_ownership_review: dict[str, Any] | None = None, ) -> dict[str, Any]: previous_identity_projection = previous_identity_projection or {} previous_ownership_review = previous_ownership_review or {} current_nodes = { item["stable_key"]: item for item in current_identity_projection.get("identity_candidates", []) if isinstance(item, dict) and item.get("stable_key") } previous_nodes = { item["stable_key"]: item for item in previous_identity_projection.get("identity_candidates", []) if isinstance(item, dict) and item.get("stable_key") } current_edges = { item["id"]: item for item in current_identity_projection.get("candidate_graph", {}).get("edges", []) if isinstance(item, dict) and item.get("id") } previous_edges = { item["id"]: item for item in previous_identity_projection.get("candidate_graph", {}).get("edges", []) if isinstance(item, dict) and item.get("id") } current_review = { item["stable_key"]: item for item in current_ownership_review.get("items", []) if isinstance(item, dict) and item.get("stable_key") } previous_review = { item["stable_key"]: item for item in previous_ownership_review.get("items", []) if isinstance(item, dict) and item.get("stable_key") } node_delta = _delta_sets(previous_nodes, current_nodes) 
edge_delta = _delta_sets(previous_edges, current_edges) ownership_changes = _field_changes(previous_review, current_review, "ownership") containment_changes = _field_changes(previous_review, current_review, "containment") review_state_changes = [ key for key in sorted(set(previous_review) & set(current_review)) if previous_review[key].get("review_state") != current_review[key].get("review_state") ] blocker_changes = _field_changes(previous_review, current_review, "blockers") meaningful_changes = _unique_strings( [ *node_delta["added"], *node_delta["changed"], *node_delta["removed"], *edge_delta["added"], *edge_delta["changed"], *edge_delta["removed"], *ownership_changes, *containment_changes, *review_state_changes, *blocker_changes, ] ) delta = { "apiVersion": "railiance.fabric/v1alpha2", "kind": "AccountabilityUpdateDelta", "generated_at": _utc_now(), "current": current_identity_projection.get("evidence_run", {}), "previous": previous_identity_projection.get("evidence_run", {}), "node_delta": node_delta, "edge_delta": edge_delta, "change_sets": { "ownership": ownership_changes, "containment": containment_changes, "review_state": review_state_changes, "blockers": blocker_changes, }, "summary": { "nodes_added": len(node_delta["added"]), "nodes_changed": len(node_delta["changed"]), "nodes_removed": len(node_delta["removed"]), "nodes_unchanged": len(node_delta["unchanged"]), "edges_added": len(edge_delta["added"]), "edges_changed": len(edge_delta["changed"]), "edges_removed": len(edge_delta["removed"]), "edges_unchanged": len(edge_delta["unchanged"]), "meaningful_change_count": len(meaningful_changes), "promotion_needed": bool(meaningful_changes), }, } validator = draft202012_validator(repo_root() / "schemas" / "accountability-update-delta.schema.yaml") errors = sorted(validator.iter_errors(delta), key=lambda error: list(error.path)) if errors: location = ".".join(str(part) for part in errors[0].path) or "" raise ValueError(f"invalid accountability update delta at 
{location}: {errors[0].message}") return delta @dataclass(frozen=True) class AccountabilityEvidenceStore: path: Path def init_schema(self) -> None: if str(self.path) != ":memory:": self.path.parent.mkdir(parents=True, exist_ok=True) with self._connect() as db: db.executescript( """ create table if not exists accountability_evidence_runs ( id integer primary key autoincrement, manifest_id text not null, manifest_path text not null, manifest_fingerprint text not null, generated_at text not null, payload_json text not null, created_at text not null ); create table if not exists accountability_evidence_items ( id text not null, run_id integer not null references accountability_evidence_runs(id), root_id text not null, evidence_type text not null, state text not null, durable integer not null, live_telemetry integer not null, fingerprint text not null, summary text not null, source_json text not null, attributes_json text not null, payload_json text not null, primary key (id, run_id) ); create index if not exists idx_accountability_evidence_items_run on accountability_evidence_items(run_id); create table if not exists accountability_identity_candidates ( stable_key text not null, run_id integer not null references accountability_evidence_runs(id), identity_type text not null, label text not null, fabric_id text, subfabric_id text, owner_actor_id text, review_state text not null, confidence real not null, aliases_json text not null, evidence_ids_json text not null, attributes_json text not null, payload_json text not null, primary key (stable_key, run_id) ); create index if not exists idx_accountability_identity_candidates_run on accountability_identity_candidates(run_id); create table if not exists accountability_review_decisions ( id integer primary key autoincrement, stable_key text not null, decision text not null, reviewer text not null, owner_actor_id text, fabric_id text, subfabric_id text, note text, created_at text not null ); create index if not exists 
idx_accountability_review_decisions_stable_key on accountability_review_decisions(stable_key, id desc); """ ) def add_evidence_run( self, evidence_run: dict[str, Any], identity_projection: dict[str, Any] | None = None, ) -> dict[str, Any]: self.init_schema() created_at = _utc_now() manifest = evidence_run.get("manifest", {}) with self._connect() as db: cursor = db.execute( """ insert into accountability_evidence_runs ( manifest_id, manifest_path, manifest_fingerprint, generated_at, payload_json, created_at ) values (?, ?, ?, ?, ?, ?) """, ( manifest.get("id", ""), manifest.get("path", ""), manifest.get("fingerprint", ""), evidence_run.get("generated_at", ""), json.dumps(evidence_run, sort_keys=True), created_at, ), ) run_id = int(cursor.lastrowid) for item in _iter_evidence_items(evidence_run): db.execute( """ insert into accountability_evidence_items ( id, run_id, root_id, evidence_type, state, durable, live_telemetry, fingerprint, summary, source_json, attributes_json, payload_json ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( item.get("id", ""), run_id, item.get("root_id", ""), item.get("evidence_type", ""), item.get("state", ""), 1 if item.get("durable") else 0, 1 if item.get("live_telemetry") else 0, item.get("fingerprint", ""), item.get("summary", ""), json.dumps(item.get("source", {}), sort_keys=True), json.dumps(item.get("attributes", {}), sort_keys=True), json.dumps(item, sort_keys=True), ), ) if identity_projection is not None: for candidate in identity_projection.get("identity_candidates", []): db.execute( """ insert into accountability_identity_candidates ( stable_key, run_id, identity_type, label, fabric_id, subfabric_id, owner_actor_id, review_state, confidence, aliases_json, evidence_ids_json, attributes_json, payload_json ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( candidate.get("stable_key", ""), run_id, candidate.get("identity_type", ""), candidate.get("label", ""), candidate.get("fabric_id", ""), candidate.get("subfabric_id", ""), candidate.get("owner_actor_id", ""), candidate.get("review_state", ""), float(candidate.get("confidence") or 0), json.dumps(candidate.get("aliases", []), sort_keys=True), json.dumps(candidate.get("evidence_ids", []), sort_keys=True), json.dumps(candidate.get("attributes", {}), sort_keys=True), json.dumps(candidate, sort_keys=True), ), ) return { "run_id": run_id, "evidence_count": len(list(_iter_evidence_items(evidence_run))), "identity_candidate_count": len(identity_projection.get("identity_candidates", [])) if identity_projection else 0, } def latest_run(self) -> dict[str, Any] | None: with self._connect() as db: row = db.execute( """ select id, manifest_id, manifest_path, manifest_fingerprint, generated_at, created_at from accountability_evidence_runs order by id desc limit 1 """ ).fetchone() return dict(row) if row else None def add_review_decision( self, *, stable_key: str, decision: str, reviewer: str, owner_actor_id: str = "", fabric_id: str = "", subfabric_id: str = "", note: str = "", ) -> dict[str, Any]: self.init_schema() created_at = _utc_now() with self._connect() as db: cursor = db.execute( """ insert into accountability_review_decisions ( stable_key, decision, reviewer, owner_actor_id, fabric_id, subfabric_id, note, created_at ) values (?, ?, ?, ?, ?, ?, ?, ?) 
""", (stable_key, decision, reviewer, owner_actor_id, fabric_id, subfabric_id, note, created_at), ) decision_id = int(cursor.lastrowid) return { "id": decision_id, "stable_key": stable_key, "decision": decision, "reviewer": reviewer, "owner_actor_id": owner_actor_id, "fabric_id": fabric_id, "subfabric_id": subfabric_id, "note": note, "created_at": created_at, } def latest_review_decisions(self) -> dict[str, dict[str, Any]]: self.init_schema() with self._connect() as db: rows = db.execute( """ select * from accountability_review_decisions where id in ( select max(id) from accountability_review_decisions group by stable_key ) order by stable_key """ ).fetchall() return {row["stable_key"]: dict(row) for row in rows} def list_evidence(self, run_id: int) -> list[dict[str, Any]]: with self._connect() as db: rows = db.execute( """ select payload_json from accountability_evidence_items where run_id = ? order by root_id, evidence_type, id """, (run_id,), ).fetchall() return [json.loads(row["payload_json"]) for row in rows] def list_identity_candidates(self, run_id: int) -> list[dict[str, Any]]: with self._connect() as db: rows = db.execute( """ select payload_json from accountability_identity_candidates where run_id = ? 
order by stable_key """, (run_id,), ).fetchall() return [json.loads(row["payload_json"]) for row in rows] def _connect(self) -> sqlite3.Connection: db = sqlite3.connect(self.path) db.row_factory = sqlite3.Row return db def _identity_from_evidence(root: dict[str, Any], item: dict[str, Any]) -> dict[str, Any] | None: evidence_type = str(item.get("evidence_type") or "") source = item.get("source") if isinstance(item.get("source"), dict) else {} attributes = item.get("attributes") if isinstance(item.get("attributes"), dict) else {} evidence_ids = [str(item.get("id", ""))] fabric_id = str(root.get("fabric_id") or "") subfabric_id = str(root.get("subfabric_id") or "") or None owner_actor_id = str(root.get("owner_actor_id") or "") if evidence_type in {"registered_repository", "repository_checkout"}: declared_slug = str(source.get("repo_slug") or attributes.get("repo_slug") or "") identity_slug = str( source.get("identity_slug") or attributes.get("canonical_slug") or attributes.get("canonical_repo_slug") or declared_slug or Path(str(source.get("path") or "")).name ) candidate = { "identity_type": "Repository", "label": identity_slug, "graph_id": identity_slug, "fabric_id": fabric_id, "subfabric_id": subfabric_id, "owner_actor_id": owner_actor_id, "evidence_ids": evidence_ids, "aliases": [identity_slug, declared_slug, str(source.get("path") or ""), str(source.get("remote_url") or "")], "attributes": { **attributes, "declared_repo_slug": declared_slug, "source_evidence_type": evidence_type, }, "confidence": 0.9 if evidence_type == "repository_checkout" else 0.85, } overlay = normalize_deployment_overlay(source, attributes) if overlay: candidate["deployment_overlay"] = overlay return candidate if evidence_type in {"deployment_automation", "infrastructure_manifest"}: path = str(source.get("path") or "") return { "identity_type": "Deployable", "label": Path(path).name or evidence_type, "graph_id": path, "fabric_id": fabric_id, "subfabric_id": subfabric_id, "owner_actor_id": 
owner_actor_id, "evidence_ids": evidence_ids, "aliases": [path], "attributes": {**attributes, "source_evidence_type": evidence_type}, "confidence": 0.75, } if evidence_type == "service_config": path = str(source.get("path") or "") return { "identity_type": "ServiceConfig", "label": Path(path).name or "service-config", "graph_id": path, "fabric_id": fabric_id, "subfabric_id": subfabric_id, "owner_actor_id": owner_actor_id, "evidence_ids": evidence_ids, "aliases": [path], "attributes": {**attributes, "source_evidence_type": evidence_type}, "confidence": 0.7, } if evidence_type == "endpoint_contract": path = str(source.get("path") or "") candidate = { "identity_type": "Endpoint", "label": Path(path).name or "endpoint-contract", "graph_id": path, "fabric_id": fabric_id, "subfabric_id": subfabric_id, "owner_actor_id": owner_actor_id, "evidence_ids": evidence_ids, "aliases": [path], "attributes": {**attributes, "source_evidence_type": evidence_type}, "confidence": 0.75, } overlay = normalize_deployment_overlay(source, attributes) if overlay: candidate["deployment_overlay"] = overlay return candidate if evidence_type == "host_path_match": path = str(source.get("path") or "") return { "identity_type": "HostPath", "label": path or "host-path", "graph_id": path, "fabric_id": fabric_id, "subfabric_id": subfabric_id, "owner_actor_id": owner_actor_id, "evidence_ids": evidence_ids, "aliases": [path], "attributes": {**attributes, "source_evidence_type": evidence_type}, "confidence": 0.65, } if evidence_type in {"secret_root", "backup_recovery"}: path = str(source.get("path") or "") return { "identity_type": "SecretRoot" if evidence_type == "secret_root" else "BackupRecoveryRoot", "label": Path(path).name or evidence_type, "graph_id": path or evidence_type, "fabric_id": fabric_id, "subfabric_id": subfabric_id, "owner_actor_id": owner_actor_id, "evidence_ids": evidence_ids, "aliases": [path], "attributes": {**attributes, "source_evidence_type": evidence_type}, "confidence": 0.65, } 
if evidence_type in {"state_hub_repo_inventory", "gitea_organization", "gitea_repository", "registry_manifest"}: return { "identity_type": "CatalogRoot", "label": str(source.get("url") or source.get("manifest_path") or root.get("id")), "graph_id": str(root.get("id") or evidence_type), "fabric_id": fabric_id, "subfabric_id": subfabric_id, "owner_actor_id": owner_actor_id, "evidence_ids": evidence_ids, "aliases": [str(source.get("url") or ""), str(source.get("manifest_path") or "")], "attributes": {**attributes, "source_evidence_type": evidence_type}, "confidence": 0.6, } return None def _ownership_item( candidate: dict[str, Any], actor_roles: dict[str, str], fabric_owners: dict[str, str], fabric_kinds: dict[str, str], decision: dict[str, Any] | None, ) -> dict[str, Any]: attributes = candidate.get("attributes") if isinstance(candidate.get("attributes"), dict) else {} blockers: list[str] = [] fabric_id = str(candidate.get("fabric_id") or "") subfabric_id = str(candidate.get("subfabric_id") or "") owner_actor_id = str(candidate.get("owner_actor_id") or "") resolution = "explicit" if owner_actor_id else "unresolved" inherited_from = "" if attributes.get("ambiguous_aliases"): blockers.append("ambiguous_identity") if attributes.get("ambiguous_owner_actor_ids"): blockers.append("ambiguous_ownership") if candidate.get("identity_type") not in {"Actor", "Netkingdom"}: if not fabric_id: blockers.append("containment_unresolved") elif fabric_id not in fabric_kinds: blockers.append("unknown_fabric") if subfabric_id and subfabric_id not in fabric_kinds: blockers.append("unknown_subfabric") if not owner_actor_id and subfabric_id and fabric_owners.get(subfabric_id): owner_actor_id = fabric_owners[subfabric_id] resolution = "inherited" inherited_from = subfabric_id if not owner_actor_id and fabric_id and fabric_owners.get(fabric_id): owner_actor_id = fabric_owners[fabric_id] resolution = "inherited" inherited_from = fabric_id if candidate.get("identity_type") == "Actor" and not 
owner_actor_id: owner_actor_id = str(candidate.get("graph_id") or "") resolution = "explicit" if owner_actor_id else "unresolved" decision_payload: dict[str, Any] | None = None if decision: decision_payload = { "decision": decision.get("decision", ""), "reviewer": decision.get("reviewer", ""), "note": decision.get("note", ""), "created_at": decision.get("created_at", ""), } if decision.get("fabric_id"): fabric_id = str(decision["fabric_id"]) if decision.get("subfabric_id"): subfabric_id = str(decision["subfabric_id"]) if decision.get("owner_actor_id"): owner_actor_id = str(decision["owner_actor_id"]) resolution = "review_decision" inherited_from = "" if decision.get("decision") == "accept": blockers = [ blocker for blocker in blockers if blocker not in { "ambiguous_identity", "ambiguous_ownership", "containment_unresolved", "unknown_fabric", "unknown_subfabric", "ownership_unresolved", "unknown_owner_actor", } ] if candidate.get("identity_type") not in {"Actor", "Netkingdom"}: if not fabric_id: blockers.append("containment_unresolved") elif fabric_id not in fabric_kinds: blockers.append("unknown_fabric") if subfabric_id and subfabric_id not in fabric_kinds: blockers.append("unknown_subfabric") if not owner_actor_id: blockers.append("ownership_unresolved") resolution = "unresolved" if owner_actor_id and owner_actor_id not in actor_roles: blockers.append("unknown_owner_actor") review_state = "candidate" if blockers: review_state = "needs_review" if decision: if decision.get("decision") == "reject": review_state = "rejected" elif decision.get("decision") == "needs_review": review_state = "needs_review" elif decision.get("decision") == "accept" and owner_actor_id and not blockers: review_state = "accepted" elif decision.get("decision") == "accept": review_state = "needs_review" blockers.append("accepted_without_resolved_owner_or_containment") containment_status = "resolved" if "containment_unresolved" in blockers: containment_status = "unresolved" elif "unknown_fabric" 
in blockers or "unknown_subfabric" in blockers: containment_status = "ambiguous" item = { "stable_key": candidate["stable_key"], "identity_type": candidate["identity_type"], "label": candidate["label"], "review_state": review_state, "ownership": { "owner_actor_id": owner_actor_id, "owner_role": actor_roles.get(owner_actor_id, ""), "resolution": resolution, }, "containment": { "fabric_id": fabric_id, "subfabric_id": subfabric_id, "status": containment_status, }, "blockers": _unique_strings(blockers), "evidence_ids": candidate.get("evidence_ids", []), } if inherited_from: item["ownership"]["inherited_from"] = inherited_from if decision_payload: item["decision"] = decision_payload return item def _delta_sets(previous: dict[str, dict[str, Any]], current: dict[str, dict[str, Any]]) -> dict[str, list[str]]: previous_keys = set(previous) current_keys = set(current) common = previous_keys & current_keys changed = [ key for key in sorted(common) if short_fingerprint(_stable_payload(previous[key]), length=16) != short_fingerprint(_stable_payload(current[key]), length=16) ] return { "added": sorted(current_keys - previous_keys), "changed": changed, "removed": sorted(previous_keys - current_keys), "unchanged": sorted(common - set(changed)), } def _field_changes( previous: dict[str, dict[str, Any]], current: dict[str, dict[str, Any]], field: str, ) -> list[str]: return [ key for key in sorted(set(previous) & set(current)) if previous[key].get(field) != current[key].get(field) ] def _stable_payload(value: dict[str, Any]) -> dict[str, Any]: return { key: data for key, data in value.items() if key not in {"generated_at", "decision"} } def _add_identity_candidate( candidates: dict[str, dict[str, Any]], *, identity_type: str, label: str, graph_id: str | None = None, fabric_id: str | None = None, subfabric_id: str | None = None, owner_actor_id: str | None = None, evidence_ids: list[str], aliases: list[str], attributes: dict[str, Any], deployment_overlay: dict[str, Any] | None = None, 
confidence: float, ) -> None: normalized_type = normalize_identity_part(identity_type) identity_key = graph_id or label stable_key = f"identity:{normalized_type}:{normalize_identity_part(identity_key)}" incoming = { "stable_key": stable_key, "identity_type": identity_type, "label": label or identity_key, "review_state": "candidate", "confidence": confidence, "aliases": _unique_strings([identity_key, *aliases]), "evidence_ids": _unique_strings(evidence_ids), "attributes": {key: value for key, value in attributes.items() if value not in ("", None, [], {})}, } if graph_id: incoming["graph_id"] = graph_id if fabric_id: incoming["fabric_id"] = fabric_id if subfabric_id: incoming["subfabric_id"] = subfabric_id if owner_actor_id: incoming["owner_actor_id"] = owner_actor_id overlay = normalize_deployment_overlay(deployment_overlay or {}, attributes) if overlay: incoming["deployment_overlay"] = overlay existing = candidates.get(stable_key) if existing is None: candidates[stable_key] = incoming return existing["confidence"] = max(float(existing.get("confidence", 0)), confidence) existing["aliases"] = _unique_strings([*existing.get("aliases", []), *incoming["aliases"]]) existing["evidence_ids"] = _unique_strings([*existing.get("evidence_ids", []), *incoming["evidence_ids"]]) existing["attributes"] = {**existing.get("attributes", {}), **incoming["attributes"]} if incoming.get("deployment_overlay"): existing["deployment_overlay"] = normalize_deployment_overlay( existing.get("deployment_overlay") if isinstance(existing.get("deployment_overlay"), dict) else {}, incoming["deployment_overlay"], ) if incoming.get("owner_actor_id") and existing.get("owner_actor_id") and incoming["owner_actor_id"] != existing["owner_actor_id"]: existing["attributes"]["ambiguous_owner_actor_ids"] = _unique_strings( [existing["owner_actor_id"], incoming["owner_actor_id"], *existing["attributes"].get("ambiguous_owner_actor_ids", [])] ) existing["review_state"] = "needs_review" for key in ("fabric_id", 
def _mark_ambiguous_identities(candidates: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Flag candidates that share a normalized alias with another candidate of the same type.

    Candidates are mutated in place: every candidate involved in a collision gets
    ``review_state = "needs_review"`` and a sorted ``attributes["ambiguous_aliases"]`` list.
    The same list object is returned for convenience.
    """
    # (identity_type, normalized alias) -> stable keys of the candidates that claim it.
    alias_index: dict[tuple[str, str], list[str]] = {}
    for candidate in candidates:
        for alias in candidate.get("aliases", []):
            key = (str(candidate.get("identity_type")), normalize_identity_part(alias))
            alias_index.setdefault(key, []).append(candidate["stable_key"])

    # stable key -> normalized aliases that are also claimed by a different stable key.
    ambiguous: dict[str, list[str]] = {}
    for (_identity_type, alias), keys in alias_index.items():
        unique_keys = sorted(set(keys))
        if len(unique_keys) > 1:
            for stable_key in unique_keys:
                ambiguous.setdefault(stable_key, []).append(alias)

    for candidate in candidates:
        aliases = ambiguous.get(candidate["stable_key"])
        if aliases:
            candidate["review_state"] = "needs_review"
            candidate.setdefault("attributes", {})["ambiguous_aliases"] = sorted(aliases)
    return candidates


def _candidate_graph(candidates: list[dict[str, Any]], manifest: dict[str, Any]) -> dict[str, Any]:
    """Build a ``{"nodes": [...], "edges": [...]}`` graph from identity candidates.

    Nodes are one per candidate, ordered by ``stable_key``. Edges are ``contains`` edges
    derived from the manifest's ``fabrics`` list: each fabric is contained by its
    ``parent_fabric_id`` when set, otherwise by the manifest's netkingdom id.
    """
    nodes = [
        {
            "id": candidate["stable_key"],
            "kind": candidate["identity_type"],
            "label": candidate["label"],
            "review_state": candidate["review_state"],
            "fabric_id": candidate.get("fabric_id", ""),
            "subfabric_id": candidate.get("subfabric_id", ""),
            "owner_actor_id": candidate.get("owner_actor_id", ""),
            "deployment_overlay": candidate.get("deployment_overlay", {}),
        }
        for candidate in sorted(candidates, key=lambda item: item["stable_key"])
    ]

    # A present-but-non-mapping "netkingdom" (e.g. null) must not crash the projection.
    netkingdom = manifest.get("netkingdom")
    netkingdom_id = netkingdom.get("id") if isinstance(netkingdom, dict) else None

    edges: list[dict[str, Any]] = []
    # "or []" tolerates an explicit null "fabrics" entry.
    for fabric in manifest.get("fabrics") or []:
        if not isinstance(fabric, dict):
            continue
        fabric_key = (
            f"identity:{normalize_identity_part(fabric.get('kind') or 'Fabric')}"
            f":{normalize_identity_part(fabric.get('id'))}"
        )
        parent = fabric.get("parent_fabric_id") or netkingdom_id
        parent_type = "Fabric" if fabric.get("parent_fabric_id") else "Netkingdom"
        parent_key = f"identity:{normalize_identity_part(parent_type)}:{normalize_identity_part(parent)}"
        edges.append(
            {
                "id": f"candidate-edge:{short_fingerprint([parent_key, 'contains', fabric_key], length=16)}",
                "from": parent_key,
                "to": fabric_key,
                "type": "contains",
                "review_state": "candidate",
            }
        )
    return {"nodes": nodes, "edges": edges}


def _iter_evidence_items(evidence_run: dict[str, Any]) -> list[dict[str, Any]]:
    """Return every mapping-typed evidence item of every mapping-typed root, in order."""
    return [
        item
        for root in evidence_run.get("roots", [])
        if isinstance(root, dict)
        for item in root.get("evidence", [])
        if isinstance(item, dict)
    ]


def _collect_root_evidence(root: dict[str, Any], *, include_remote: bool, max_items: int) -> list[dict[str, Any]]:
    """Dispatch a discovery root to the evidence adapter that matches its ``type``.

    A root whose ``status`` is ``"disabled"`` is never inspected; unknown types fall back
    to a single declared-evidence item.
    """
    root_type = str(root.get("type") or "")
    if root.get("status") == "disabled":
        return [_declared_evidence(root, "root_disabled", "skipped", "Discovery root is disabled.")]
    if root_type == "registry_manifest":
        return _registry_manifest_evidence(root, max_items=max_items)
    if root_type == "repository_checkout":
        return _repository_checkout_evidence(root)
    if root_type == "host_path":
        return _glob_root_evidence(root, "host_path_match", max_items=max_items)
    if root_type in {"deployment_automation", "infrastructure_manifest", "service_config", "endpoint_contract"}:
        return _glob_root_evidence(root, root_type, max_items=max_items)
    if root_type == "state_hub_repo_inventory":
        return _state_hub_evidence(root, include_remote=include_remote)
    if root_type in {"gitea_organization", "gitea_repository"}:
        return [_declared_evidence(root, root_type, "declared", f"{root_type} root declared.")]
    if root_type in {"secret_root", "backup_recovery", "manual_review_queue"}:
        return _metadata_root_evidence(root)
    return [_declared_evidence(root, root_type or "unknown_root", "declared", "Discovery root declared.")]


def _registry_manifest_evidence(root: dict[str, Any], *, max_items: int) -> list[dict[str, Any]]:
    """Emit evidence for a registry manifest file and for each repository it registers.

    The first item describes the manifest file itself; one ``registered_repository`` item
    follows per mapping entry among the first ``max_items`` repositories, and a final
    ``registry_manifest_truncated`` item is added when entries were left out.
    """
    source = _source(root)
    manifest_path = _resolve_path(source.get("manifest_path") or source.get("path"))
    if not manifest_path.exists():
        return [
            _declared_evidence(root, "registry_manifest_missing", "unavailable", f"Manifest missing: {manifest_path}")
        ]

    manifest = load_yaml(manifest_path)
    repositories = manifest.get("repositories") if isinstance(manifest, dict) else []
    if not isinstance(repositories, list):
        return [
            _declared_evidence(
                root, "registry_manifest_invalid", "unavailable", "Manifest has no repositories list."
            )
        ]

    evidence: list[dict[str, Any]] = [
        _file_evidence(
            root,
            manifest_path,
            "registry_manifest",
            summary=f"Registry manifest with {len(repositories)} repositories.",
        )
    ]
    # Canonical slugs are resolved over the full list so aliases beyond the cap still count.
    canonical_slug_by_index = _registry_canonical_slugs_by_index(repositories)
    for index, repo in enumerate(repositories[:max_items]):
        if not isinstance(repo, dict):
            continue
        repo_slug = str(repo.get("slug") or "")
        identity_slug = canonical_slug_by_index.get(index) or _repository_identity_slug(repo) or repo_slug
        repo_source = {
            "manifest_path": _display_path(manifest_path),
            "json_pointer": f"/repositories/{index}",
            "repo_slug": repo_slug,
            "identity_slug": identity_slug,
            "path": repo.get("path", ""),
            "remote_url": repo.get("remote_url", ""),
        }
        attributes = {
            "name": repo.get("name", ""),
            "domain": repo.get("domain", ""),
            "default_branch": repo.get("default_branch", ""),
            "state_hub_repo_id": repo.get("state_hub_repo_id", ""),
            "has_local_path": bool(repo.get("path")),
            "has_remote_url": bool(repo.get("remote_url")),
            "canonical_slug": identity_slug if identity_slug != repo_slug else "",
            "identity_resolution": "duplicate_path_alias" if identity_slug != repo_slug else "",
        }
        evidence.append(
            _evidence_item(
                root,
                evidence_type="registered_repository",
                state="declared",
                source=repo_source,
                summary=f"Registered repository {repo.get('slug', '')}.",
                # Empty strings and None are dropped; False booleans are kept.
                attributes={key: value for key, value in attributes.items() if value not in ("", None)},
            )
        )
    if len(repositories) > max_items:
        evidence.append(
            _declared_evidence(
                root,
                "registry_manifest_truncated",
                "skipped",
                f"Skipped {len(repositories) - max_items} repositories beyond max_items_per_root.",
            )
        )
    return evidence


def _registry_canonical_slugs_by_index(repositories: list[object]) -> dict[int, str]:
    """Map repository list indexes to the canonical slug they should be identified by.

    An entry's explicitly declared identity slug always wins for that entry. Entries that
    share the same ``path`` (and are not marked ``split_identity``) are grouped; a group of
    two or more adopts the first explicit slug found in the group, or otherwise the slug
    chosen by ``_select_canonical_repo_slug``.
    """
    explicit: dict[int, str] = {}
    path_groups: dict[str, list[tuple[int, dict[str, Any]]]] = {}
    for index, repo in enumerate(repositories):
        if not isinstance(repo, dict):
            continue
        explicit_slug = _repository_identity_slug(repo)
        if explicit_slug:
            explicit[index] = explicit_slug
        if _boolish(repo.get("split_identity")):
            continue
        path = str(repo.get("path") or "").strip()
        slug = str(repo.get("slug") or "").strip()
        if path and slug:
            path_groups.setdefault(path, []).append((index, repo))

    canonical_by_index = dict(explicit)
    for path, group in path_groups.items():
        if len(group) < 2:
            continue
        explicit_group_slugs = [
            explicit_slug for index, _repo in group if (explicit_slug := explicit.get(index))
        ]
        canonical_slug = (
            explicit_group_slugs[0] if explicit_group_slugs else _select_canonical_repo_slug(path, group)
        )
        for index, _repo in group:
            # setdefault keeps an entry's own explicit slug over the group's choice.
            canonical_by_index.setdefault(index, canonical_slug)
    return canonical_by_index


def _repository_identity_slug(repo: dict[str, Any]) -> str:
    """Return the first non-empty explicit identity slug declared on ``repo``, stripped, or ``""``."""
    return str(
        repo.get("canonical_slug")
        or repo.get("canonical_repo_slug")
        or repo.get("identity_slug")
        or repo.get("identity_repo_slug")
        or ""
    ).strip()


def _select_canonical_repo_slug(path: str, group: list[tuple[int, dict[str, Any]]]) -> str:
    """Pick the canonical slug among repositories that share ``path``.

    Preference order: a slug whose normalized form equals the normalized final path
    component, then an entry with a ``remote_url``, then the smallest normalized slug.
    """
    path_name = normalize_identity_part(Path(path).name)

    def sort_key(item: tuple[int, dict[str, Any]]) -> tuple[int, int, str]:
        _index, repo = item
        slug = str(repo.get("slug") or "")
        return (
            0 if normalize_identity_part(slug) == path_name else 1,
            0 if repo.get("remote_url") else 1,
            normalize_identity_part(slug),
        )

    _index, canonical = sorted(group, key=sort_key)[0]
    return str(canonical.get("slug") or "").strip()


def _boolish(value: object) -> bool:
    """Interpret ``value`` as a flag: real booleans pass through, else "1"/"true"/"yes"/"on" are true."""
    if isinstance(value, bool):
        return value
    return str(value or "").strip().lower() in {"1", "true", "yes", "on"}


def _repository_checkout_evidence(root: dict[str, Any]) -> list[dict[str, Any]]:
    """Emit one ``repository_checkout`` item describing the checkout at the root's ``path``.

    Git details (origin URL, HEAD, branch) are read with ``_git_value`` and omitted when
    empty. A missing path yields a single ``repository_checkout_missing`` item instead.
    """
    source = _source(root)
    checkout = _resolve_path(source.get("path"))
    if not checkout.exists():
        return [
            _declared_evidence(root, "repository_checkout_missing", "unavailable", f"Checkout missing: {checkout}")
        ]
    attributes = {
        "repo_slug": source.get("repo_slug", ""),
        "path_exists": True,
        "has_git_dir": (checkout / ".git").exists(),
        "has_fabric_dir": (checkout / "fabric").exists(),
        "remote_origin": _git_value(checkout, "config", "--get", "remote.origin.url") or source.get("remote_url", ""),
        "head": _git_value(checkout, "rev-parse", "HEAD") or "",
        "branch": _git_value(checkout, "rev-parse", "--abbrev-ref", "HEAD") or "",
    }
    return [
        _evidence_item(
            root,
            evidence_type="repository_checkout",
            state="observed",
            source={"path": _display_path(checkout), "repo_slug": source.get("repo_slug", "")},
            summary=f"Repository checkout observed at {_display_path(checkout)}.",
            attributes={key: value for key, value in attributes.items() if value not in ("", None)},
        )
    ]


def _glob_root_evidence(root: dict[str, Any], evidence_type: str, *, max_items: int) -> list[dict[str, Any]]:
    """Emit one observed item per path matching the root's glob ``patterns`` under its ``path``.

    Patterns default to ``["*"]``. At most ``max_items`` paths are reported; when more were
    found a ``<evidence_type>_truncated`` item is appended, and when nothing matched a
    ``<evidence_type>_empty`` item is returned instead.
    """
    source = _source(root)
    base = _resolve_path(source.get("path") or ".")
    patterns = source.get("patterns") if isinstance(source.get("patterns"), list) else ["*"]
    if not base.exists():
        return [_declared_evidence(root, f"{evidence_type}_missing", "unavailable", f"Root path missing: {base}")]

    matches: list[Path] = []
    seen: set[Path] = set()
    for pattern in patterns:
        for path in sorted(base.glob(str(pattern))):
            # A path hit by several patterns would otherwise yield duplicate evidence ids.
            if path in seen or _is_noise_match(evidence_type, path):
                continue
            seen.add(path)
            matches.append(path)
        # Stop only once the cap is *exceeded*: stopping at exactly max_items would skip
        # the remaining patterns without ever emitting the truncation marker below.
        if len(matches) > max_items:
            break

    evidence = [
        _evidence_item(
            root,
            evidence_type=evidence_type,
            state="observed",
            source={"path": _display_path(path)},
            summary=f"Observed {evidence_type} at {_display_path(path)}.",
            attributes=_file_attributes(path),
        )
        for path in matches[:max_items]
    ]
    if not evidence:
        evidence.append(
            _declared_evidence(root, f"{evidence_type}_empty", "unavailable", f"No files matched under {base}.")
        )
    if len(matches) > max_items:
        # The count is a lower bound when the pattern loop stopped early.
        evidence.append(
            _declared_evidence(
                root,
                f"{evidence_type}_truncated",
                "skipped",
                f"Skipped {len(matches) - max_items} matches beyond max_items_per_root.",
            )
        )
    return evidence


def _is_noise_match(evidence_type: str, path: Path) -> bool:
    """Return True when ``path`` lies in a cache/vendor directory that should not count as evidence.

    Filtering applies only to the four config-like evidence types; any other type is
    never treated as noise.
    """
    if evidence_type not in {"deployment_automation", "infrastructure_manifest", "service_config", "endpoint_contract"}:
        return False
    parts = path.parts
    noisy_parts = {
        ".cache",
        ".mypy_cache",
        ".nvm",
        ".pytest_cache",
        ".tox",
        ".venv",
        "__pycache__",
        "node_modules",
        "site-packages",
    }
    if any(part in noisy_parts for part in parts):
        return True
    # Also skip anything containing the consecutive components go/pkg/mod.
    return any(parts[index : index + 3] == ("go", "pkg", "mod") for index in range(max(len(parts) - 2, 0)))


def _state_hub_evidence(root: dict[str, Any], *, include_remote: bool) -> list[dict[str, Any]]:
    """Fetch the State Hub inventory endpoints of ``root`` and summarise each response.

    With ``include_remote`` false no request is made and a single declared item is
    returned. Otherwise each API path (default ``/managed-repos/``) produces either an
    observed item with the item count and payload fingerprint, or a
    ``state_hub_fetch_failed`` item when the request or JSON decoding fails.
    """
    source = _source(root)
    if not include_remote:
        return [
            _declared_evidence(
                root,
                "state_hub_repo_inventory",
                "declared",
                "State Hub repo inventory root declared; remote fetch disabled.",
            )
        ]
    base_url = str(source.get("base_url") or "").rstrip("/")
    evidence: list[dict[str, Any]] = []
    for api_path in source.get("api_paths") or ["/managed-repos/"]:
        url = f"{base_url}{api_path}"
        try:
            with urllib.request.urlopen(url, timeout=5) as response:
                payload = json.loads(response.read())
        except (OSError, ValueError) as exc:
            # OSError covers URLError/HTTPError, timeouts and connection errors; ValueError
            # covers JSON/Unicode decoding errors and malformed URLs (e.g. empty base_url).
            # Handling them here keeps one bad endpoint from aborting the remaining paths.
            evidence.append(_declared_evidence(root, "state_hub_fetch_failed", "unavailable", f"{url}: {exc}"))
            continue

        if isinstance(payload, list):
            count = len(payload)
        elif isinstance(payload, dict):
            items = payload.get("items", [])
            # A non-sized "items" value (null, number) counts as zero instead of raising.
            count = len(items) if isinstance(items, (list, dict, str)) else 0
        else:
            count = 0
        evidence.append(
            _evidence_item(
                root,
                evidence_type="state_hub_repo_inventory",
                state="observed",
                source={"url": url},
                summary=f"Fetched State Hub repository inventory from {url}.",
                attributes={"item_count": count, "payload_fingerprint": short_fingerprint(payload)},
            )
        )
    return evidence


def _metadata_root_evidence(root: dict[str, Any]) -> list[dict[str, Any]]:
    """Emit file evidence when the root's ``path`` exists, else a metadata-only declaration."""
    source = _source(root)
    path = source.get("path")
    if path:
        resolved = _resolve_path(path)
        if resolved.exists():
            return [_file_evidence(root, resolved, str(root.get("type") or "metadata_root"))]
    return [
        _declared_evidence(
            root,
            str(root.get("type") or "metadata_root"),
            "planned" if root.get("status") == "planned" else "declared",
            "Metadata-only root declared.",
        )
    ]


def _file_evidence(
    root: dict[str, Any], path: Path, evidence_type: str, *, summary: str | None = None
) -> dict[str, Any]:
    """Build an ``observed`` evidence item for ``path`` with its file attributes attached."""
    return _evidence_item(
        root,
        evidence_type=evidence_type,
        state="observed",
        source={"path": _display_path(path)},
        summary=summary or f"Observed {evidence_type} file at {_display_path(path)}.",
        attributes=_file_attributes(path),
    )


def _declared_evidence(root: dict[str, Any], evidence_type: str, state: str, summary: str) -> dict[str, Any]:
    """Build an evidence item from the root's own declaration, without inspecting anything.

    The root's ``source`` mapping is copied as the item source, except ``safe_discovery``
    which is reported as an attribute (default ``"metadata_only"``).
    """
    source = _source(root)
    return _evidence_item(
        root,
        evidence_type=evidence_type,
        state=state,
        source={key: value for key, value in source.items() if key != "safe_discovery"},
        summary=summary,
        attributes={"safe_discovery": source.get("safe_discovery", "metadata_only")},
    )


def _evidence_item(
    root: dict[str, Any],
    *,
    evidence_type: str,
    state: str,
    source: dict[str, Any],
    summary: str,
    attributes: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Assemble one evidence record for ``root``.

    The record's ``fingerprint`` (and the suffix of its ``id``) is a 16-character
    ``short_fingerprint`` of the root id, type, state, source, summary and attributes.
    """
    payload = {
        "root_id": root.get("id", ""),
        "evidence_type": evidence_type,
        "state": state,
        "source": source,
        "summary": summary,
        "attributes": attributes or {},
    }
    fingerprint = short_fingerprint(payload, length=16)
    return {
        "id": f"evidence:{root.get('id', 'root')}:{fingerprint}",
        "root_id": root.get("id", ""),
        "evidence_type": evidence_type,
        "state": state,
        "durable": True,
        "live_telemetry": False,
        "source": source,
        "provenance": {
            "extractor_id": "accountability-root-adapter",
            "extractor_version": EXTRACTOR_VERSION,
            "method": "deterministic",
            "origin": "deterministic",
        },
        "fingerprint": fingerprint,
        "summary": summary,
        "attributes": attributes or {},
    }


def _review_artifact(root: dict[str, Any], artifact_type: str, severity: str, message: str) -> dict[str, Any]:
    """Build a review-artifact record tied to ``root`` and carrying its source mapping."""
    return {
        "root_id": root.get("id", ""),
        "artifact_type": artifact_type,
        "severity": severity,
        "message": message,
        "source": _source(root),
    }


def _source(root: dict[str, Any]) -> dict[str, Any]:
    """Return the root's ``source`` mapping, or an empty dict when it is missing or not a mapping."""
    source = root.get("source")
    return source if isinstance(source, dict) else {}


def _resolve_path(value: object) -> Path:
    """Turn ``value`` into a path; relative paths (and empty values, as ``.``) are joined to ``repo_root()``."""
    path = Path(str(value or "."))
    return path if path.is_absolute() else repo_root() / path


def _display_path(path: Path) -> str:
    """Return ``path`` as a POSIX path relative to ``repo_root()``, or absolute when outside it."""
    try:
        return path.resolve().relative_to(repo_root()).as_posix()
    except ValueError:
        # relative_to raises ValueError when the path is not under the repository root.
        return str(path.resolve())


def _file_attributes(path: Path) -> dict[str, Any]:
    """Describe ``path``: its type and existence, plus size and SHA-256 for regular files."""
    attributes: dict[str, Any] = {
        "path_type": "directory" if path.is_dir() else "file",
        "exists": path.exists(),
    }
    if path.is_file():
        attributes["size_bytes"] = path.stat().st_size
        attributes["sha256"] = _file_sha256(path)
    return attributes


def _file_sha256(path: Path) -> str | None:
    """Return the hex SHA-256 of a regular file, read in 1 MiB chunks; ``None`` for anything else."""
    if not path.is_file():
        return None
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()


def _git_value(repo_path: Path, *args: str) -> str | None:
    """Run ``git <args>`` in ``repo_path`` and return its stripped stdout.

    Returns ``None`` when git cannot be started, times out (5 s), exits with a non-zero
    status, or prints nothing.
    """
    try:
        result = subprocess.run(
            ["git", *args],
            cwd=repo_path,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            check=False,
            timeout=5,
        )
    except (OSError, subprocess.SubprocessError):
        return None
    # A failing command can still write to stdout (e.g. "git rev-parse HEAD" echoes "HEAD"
    # in a repository without commits); such output is not a real value.
    if result.returncode != 0:
        return None
    value = result.stdout.strip()
    return value or None


def _unique_strings(values: list[object]) -> list[str]:
    """Return the stripped, non-empty string forms of ``values`` in first-seen order, without repeats."""
    result: list[str] = []
    seen: set[str] = set()
    for value in values:
        text = str(value or "").strip()
        if not text or text in seen:
            continue
        result.append(text)
        seen.add(text)
    return result


def _utc_now() -> str:
    """Return the current UTC time to whole seconds as ISO 8601 with a ``Z`` suffix."""
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")