Files
railiance-fabric/railiance_fabric/accountability_roots.py

906 lines
37 KiB
Python

from __future__ import annotations

import contextlib
import hashlib
import json
import sqlite3
import subprocess
import urllib.error
import urllib.request
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from .discovery import normalize_identity_part, short_fingerprint
from .loader import load_yaml, repo_root
from .schema_validation import draft202012_validator
# Version string recorded in the provenance block of every evidence item.
EXTRACTOR_VERSION = "0.1.0"
# Manifest location used whenever a caller does not supply an explicit path.
DEFAULT_ROOT_MANIFEST_PATH = repo_root() / "fabric" / "discovery" / "railiance-accountability-roots.yaml"
def load_accountability_root_manifest(path: Path | None = None, *, validate: bool = True) -> dict[str, Any]:
    """Read the accountability root manifest and optionally schema-check it.

    Args:
        path: Manifest file to read; the module default is used when omitted.
        validate: When true, validate the document against the
            ``accountability-root-manifest`` schema.

    Returns:
        The parsed manifest mapping.

    Raises:
        ValueError: If the document is not a mapping, or if validation is
            enabled and the document violates the schema (only the first
            error, ordered by path, is reported).
    """
    manifest_path = path or DEFAULT_ROOT_MANIFEST_PATH
    document = load_yaml(manifest_path)
    if not isinstance(document, dict):
        raise ValueError(f"accountability root manifest must be a mapping: {manifest_path}")
    if not validate:
        return document

    schema_path = repo_root() / "schemas" / "accountability-root-manifest.schema.yaml"
    problems = sorted(
        draft202012_validator(schema_path).iter_errors(document),
        key=lambda error: list(error.path),
    )
    if problems:
        first = problems[0]
        location = ".".join(str(part) for part in first.path) or "<root>"
        raise ValueError(f"invalid accountability root manifest at {location}: {first.message}")
    return document
def collect_accountability_root_evidence(
    manifest_path: Path | None = None,
    *,
    include_remote: bool = False,
    max_items_per_root: int = 200,
) -> dict[str, Any]:
    """Collect evidence for every discovery root declared in the manifest.

    Args:
        manifest_path: Manifest to read; the module default is used when omitted.
        include_remote: Forwarded to the per-root collectors to allow remote fetches.
        max_items_per_root: Upper bound on items gathered for a single root.

    Returns:
        An ``AccountabilityRootEvidenceRun`` document containing one record per
        root plus review artifacts for roots whose collector raised.

    Raises:
        ValueError: If the assembled document violates the evidence schema.
    """
    manifest_path = manifest_path or DEFAULT_ROOT_MANIFEST_PATH
    manifest = load_accountability_root_manifest(manifest_path)
    generated_at = _utc_now()

    root_records: list[dict[str, Any]] = []
    review_artifacts: list[dict[str, Any]] = []
    for root in manifest.get("discovery_roots", []):
        if not isinstance(root, dict):
            continue
        record: dict[str, Any] = {
            "root_id": root.get("id", ""),
            "root_type": root.get("type", ""),
            "status": root.get("status", "planned"),
            "fabric_id": root.get("fabric_id", ""),
            "owner_actor_id": root.get("owner_actor_id", ""),
            "safe_discovery": _source(root).get("safe_discovery", "metadata_only"),
            "evidence": [],
        }
        if root.get("subfabric_id"):
            record["subfabric_id"] = root["subfabric_id"]
        try:
            collected = _collect_root_evidence(
                root,
                include_remote=include_remote,
                max_items=max_items_per_root,
            )
        except Exception as exc:  # pragma: no cover - defensive boundary for operator runs
            # A failing adapter must not abort the run; it is surfaced for review instead.
            failure = _review_artifact(root, "adapter_failed", "error", f"{type(exc).__name__}: {exc}")
            review_artifacts.append(failure)
        else:
            record["evidence"] = collected
        root_records.append(record)

    run_document = {
        "apiVersion": "railiance.fabric/v1alpha2",
        "kind": "AccountabilityRootEvidenceRun",
        "generated_at": generated_at,
        "manifest": {
            "id": manifest.get("metadata", {}).get("id", ""),
            "path": _display_path(manifest_path),
            # Prefer the on-disk hash; fall back to a fingerprint of the parsed content.
            "fingerprint": _file_sha256(manifest_path) or short_fingerprint(manifest),
        },
        "roots": root_records,
        "review_artifacts": review_artifacts,
    }

    schema_path = repo_root() / "schemas" / "accountability-root-evidence.schema.yaml"
    problems = sorted(
        draft202012_validator(schema_path).iter_errors(run_document),
        key=lambda error: list(error.path),
    )
    if problems:
        first = problems[0]
        location = ".".join(str(part) for part in first.path) or "<root>"
        raise ValueError(f"invalid accountability root evidence at {location}: {first.message}")
    return run_document
def build_identity_projection(
    evidence_run: dict[str, Any],
    manifest: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Derive identity candidates and a candidate graph from an evidence run.

    Candidates are seeded from the manifest (netkingdom, actors, fabrics) and
    then extended with identities derived from each evidence item.

    Args:
        evidence_run: Document produced by the evidence collector.
        manifest: Parsed manifest; when omitted it is loaded from the path
            recorded in ``evidence_run["manifest"]["path"]``.

    Returns:
        An ``AccountabilityIdentityProjection`` document.

    Raises:
        ValueError: If the projection violates its schema.
    """
    if manifest is None:
        recorded_path = evidence_run.get("manifest", {}).get("path")
        manifest = load_accountability_root_manifest(_resolve_path(recorded_path), validate=True)

    candidates: dict[str, dict[str, Any]] = {}

    raw_netkingdom = manifest.get("netkingdom")
    netkingdom = raw_netkingdom if isinstance(raw_netkingdom, dict) else {}
    if netkingdom:
        _add_identity_candidate(
            candidates,
            identity_type="Netkingdom",
            label=str(netkingdom.get("name") or netkingdom.get("id")),
            graph_id=str(netkingdom.get("id")),
            fabric_id=None,
            owner_actor_id=str(netkingdom.get("king_actor_id") or ""),
            evidence_ids=[],
            aliases=[str(netkingdom.get("id") or "")],
            attributes={"king_actor_id": netkingdom.get("king_actor_id", "")},
            confidence=1.0,
        )

    for actor in manifest.get("actors", []):
        if isinstance(actor, dict):
            _add_identity_candidate(
                candidates,
                identity_type="Actor",
                label=str(actor.get("name") or actor.get("id")),
                graph_id=str(actor.get("id")),
                fabric_id=None,
                owner_actor_id=str(actor.get("id") or ""),
                evidence_ids=[],
                aliases=[str(actor.get("id") or ""), str(actor.get("role") or "")],
                attributes={"role": actor.get("role", "")},
                confidence=1.0,
            )

    for fabric in manifest.get("fabrics", []):
        if not isinstance(fabric, dict):
            continue
        is_subfabric = fabric.get("kind") == "Subfabric"
        _add_identity_candidate(
            candidates,
            identity_type=str(fabric.get("kind") or "Fabric"),
            label=str(fabric.get("name") or fabric.get("id")),
            graph_id=str(fabric.get("id")),
            fabric_id=str(fabric.get("id") or ""),
            subfabric_id=str(fabric.get("id")) if is_subfabric else None,
            # The tenant takes precedence over the lord as the owning actor.
            owner_actor_id=str(fabric.get("tenant_actor_id") or fabric.get("lord_actor_id") or ""),
            evidence_ids=[],
            aliases=[str(fabric.get("id") or ""), str(fabric.get("parent_fabric_id") or "")],
            attributes={
                "status": fabric.get("status", ""),
                "netkingdom_id": fabric.get("netkingdom_id", ""),
                "parent_fabric_id": fabric.get("parent_fabric_id", ""),
                "boundary": fabric.get("boundary", {}),
            },
            confidence=1.0,
        )

    for root in evidence_run.get("roots", []):
        if not isinstance(root, dict):
            continue
        for item in root.get("evidence", []):
            if isinstance(item, dict) and (identity := _identity_from_evidence(root, item)) is not None:
                _add_identity_candidate(candidates, **identity)

    candidate_list = _mark_ambiguous_identities(list(candidates.values()))
    candidate_graph = _candidate_graph(candidate_list, manifest)
    projection = {
        "apiVersion": "railiance.fabric/v1alpha2",
        "kind": "AccountabilityIdentityProjection",
        "generated_at": _utc_now(),
        "evidence_run": {
            "manifest_id": evidence_run.get("manifest", {}).get("id", ""),
            "manifest_fingerprint": evidence_run.get("manifest", {}).get("fingerprint", ""),
            "generated_at": evidence_run.get("generated_at", ""),
        },
        "identity_candidates": sorted(candidate_list, key=lambda entry: entry["stable_key"]),
        "candidate_graph": candidate_graph,
    }

    schema_path = repo_root() / "schemas" / "accountability-identity-projection.schema.yaml"
    problems = sorted(
        draft202012_validator(schema_path).iter_errors(projection),
        key=lambda error: list(error.path),
    )
    if problems:
        first = problems[0]
        location = ".".join(str(part) for part in first.path) or "<root>"
        raise ValueError(f"invalid accountability identity projection at {location}: {first.message}")
    return projection
@dataclass(frozen=True)
class AccountabilityEvidenceStore:
    """SQLite-backed store for accountability evidence runs.

    A run is one row in ``accountability_evidence_runs`` plus one row per
    evidence item and, optionally, one row per identity candidate. Every
    method opens its own connection and closes it before returning; the
    ``sqlite3`` connection context manager alone only commits or rolls back
    and would leave the connection open.
    """

    # Location of the SQLite database file.
    path: Path

    def init_schema(self) -> None:
        """Create the tables and indexes if they do not exist yet.

        The parent directory of ``path`` is created first unless the path is
        the literal ``:memory:``.
        """
        # NOTE(review): each _connect() call opens a new connection, so a
        # ":memory:" path presumably yields a fresh empty database every time;
        # confirm whether in-memory use is expected to work beyond this method.
        if str(self.path) != ":memory:":
            self.path.parent.mkdir(parents=True, exist_ok=True)
        with contextlib.closing(self._connect()) as db, db:
            db.executescript(
                """
                create table if not exists accountability_evidence_runs (
                    id integer primary key autoincrement,
                    manifest_id text not null,
                    manifest_path text not null,
                    manifest_fingerprint text not null,
                    generated_at text not null,
                    payload_json text not null,
                    created_at text not null
                );
                create table if not exists accountability_evidence_items (
                    id text not null,
                    run_id integer not null references accountability_evidence_runs(id),
                    root_id text not null,
                    evidence_type text not null,
                    state text not null,
                    durable integer not null,
                    live_telemetry integer not null,
                    fingerprint text not null,
                    summary text not null,
                    source_json text not null,
                    attributes_json text not null,
                    payload_json text not null,
                    primary key (id, run_id)
                );
                create index if not exists idx_accountability_evidence_items_run
                    on accountability_evidence_items(run_id);
                create table if not exists accountability_identity_candidates (
                    stable_key text not null,
                    run_id integer not null references accountability_evidence_runs(id),
                    identity_type text not null,
                    label text not null,
                    fabric_id text,
                    subfabric_id text,
                    owner_actor_id text,
                    review_state text not null,
                    confidence real not null,
                    aliases_json text not null,
                    evidence_ids_json text not null,
                    attributes_json text not null,
                    payload_json text not null,
                    primary key (stable_key, run_id)
                );
                create index if not exists idx_accountability_identity_candidates_run
                    on accountability_identity_candidates(run_id);
                """
            )

    def add_evidence_run(
        self,
        evidence_run: dict[str, Any],
        identity_projection: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Persist an evidence run and, optionally, its identity projection.

        All rows are written in one transaction: either the whole run is
        stored or nothing is.

        Args:
            evidence_run: Evidence run document to store.
            identity_projection: Projection whose ``identity_candidates`` are
                stored alongside the run, if given.

        Returns:
            A mapping with the new ``run_id`` and the number of evidence items
            and identity candidates written.
        """
        self.init_schema()
        created_at = _utc_now()
        manifest = evidence_run.get("manifest", {})
        # Materialised once: used for both the inserts and the returned count.
        items = _iter_evidence_items(evidence_run)
        candidates = (
            identity_projection.get("identity_candidates", [])
            if identity_projection is not None
            else []
        )
        with contextlib.closing(self._connect()) as db, db:
            cursor = db.execute(
                """
                insert into accountability_evidence_runs (
                    manifest_id, manifest_path, manifest_fingerprint, generated_at,
                    payload_json, created_at
                ) values (?, ?, ?, ?, ?, ?)
                """,
                (
                    manifest.get("id", ""),
                    manifest.get("path", ""),
                    manifest.get("fingerprint", ""),
                    evidence_run.get("generated_at", ""),
                    json.dumps(evidence_run, sort_keys=True),
                    created_at,
                ),
            )
            run_id = int(cursor.lastrowid)
            db.executemany(
                """
                insert into accountability_evidence_items (
                    id, run_id, root_id, evidence_type, state, durable, live_telemetry,
                    fingerprint, summary, source_json, attributes_json, payload_json
                ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                [
                    (
                        item.get("id", ""),
                        run_id,
                        item.get("root_id", ""),
                        item.get("evidence_type", ""),
                        item.get("state", ""),
                        1 if item.get("durable") else 0,
                        1 if item.get("live_telemetry") else 0,
                        item.get("fingerprint", ""),
                        item.get("summary", ""),
                        json.dumps(item.get("source", {}), sort_keys=True),
                        json.dumps(item.get("attributes", {}), sort_keys=True),
                        json.dumps(item, sort_keys=True),
                    )
                    for item in items
                ],
            )
            db.executemany(
                """
                insert into accountability_identity_candidates (
                    stable_key, run_id, identity_type, label, fabric_id, subfabric_id,
                    owner_actor_id, review_state, confidence, aliases_json,
                    evidence_ids_json, attributes_json, payload_json
                ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                [
                    (
                        candidate.get("stable_key", ""),
                        run_id,
                        candidate.get("identity_type", ""),
                        candidate.get("label", ""),
                        candidate.get("fabric_id", ""),
                        candidate.get("subfabric_id", ""),
                        candidate.get("owner_actor_id", ""),
                        candidate.get("review_state", ""),
                        float(candidate.get("confidence") or 0),
                        json.dumps(candidate.get("aliases", []), sort_keys=True),
                        json.dumps(candidate.get("evidence_ids", []), sort_keys=True),
                        json.dumps(candidate.get("attributes", {}), sort_keys=True),
                        json.dumps(candidate, sort_keys=True),
                    )
                    for candidate in candidates
                ],
            )
        return {
            "run_id": run_id,
            "evidence_count": len(items),
            "identity_candidate_count": len(candidates),
        }

    def latest_run(self) -> dict[str, Any] | None:
        """Return the header columns of the most recent run, or ``None`` if there is none."""
        with contextlib.closing(self._connect()) as db:
            row = db.execute(
                """
                select id, manifest_id, manifest_path, manifest_fingerprint, generated_at, created_at
                from accountability_evidence_runs
                order by id desc
                limit 1
                """
            ).fetchone()
        return dict(row) if row else None

    def list_evidence(self, run_id: int) -> list[dict[str, Any]]:
        """Return the stored evidence items of ``run_id``, ordered by root, type and id."""
        with contextlib.closing(self._connect()) as db:
            rows = db.execute(
                """
                select payload_json
                from accountability_evidence_items
                where run_id = ?
                order by root_id, evidence_type, id
                """,
                (run_id,),
            ).fetchall()
        return [json.loads(row["payload_json"]) for row in rows]

    def list_identity_candidates(self, run_id: int) -> list[dict[str, Any]]:
        """Return the stored identity candidates of ``run_id``, ordered by stable key."""
        with contextlib.closing(self._connect()) as db:
            rows = db.execute(
                """
                select payload_json
                from accountability_identity_candidates
                where run_id = ?
                order by stable_key
                """,
                (run_id,),
            ).fetchall()
        return [json.loads(row["payload_json"]) for row in rows]

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection whose rows can be read by column name.

        The caller is responsible for closing the returned connection.
        """
        db = sqlite3.connect(self.path)
        db.row_factory = sqlite3.Row
        return db
def _identity_from_evidence(root: dict[str, Any], item: dict[str, Any]) -> dict[str, Any] | None:
evidence_type = str(item.get("evidence_type") or "")
source = item.get("source") if isinstance(item.get("source"), dict) else {}
attributes = item.get("attributes") if isinstance(item.get("attributes"), dict) else {}
evidence_ids = [str(item.get("id", ""))]
fabric_id = str(root.get("fabric_id") or "")
subfabric_id = str(root.get("subfabric_id") or "") or None
owner_actor_id = str(root.get("owner_actor_id") or "")
if evidence_type in {"registered_repository", "repository_checkout"}:
label = str(source.get("repo_slug") or attributes.get("repo_slug") or Path(str(source.get("path") or "")).name)
return {
"identity_type": "Repository",
"label": label,
"graph_id": label,
"fabric_id": fabric_id,
"subfabric_id": subfabric_id,
"owner_actor_id": owner_actor_id,
"evidence_ids": evidence_ids,
"aliases": [label, str(source.get("path") or ""), str(source.get("remote_url") or "")],
"attributes": {**attributes, "source_evidence_type": evidence_type},
"confidence": 0.9 if evidence_type == "repository_checkout" else 0.85,
}
if evidence_type in {"deployment_automation", "infrastructure_manifest"}:
path = str(source.get("path") or "")
return {
"identity_type": "Deployable",
"label": Path(path).name or evidence_type,
"graph_id": path,
"fabric_id": fabric_id,
"subfabric_id": subfabric_id,
"owner_actor_id": owner_actor_id,
"evidence_ids": evidence_ids,
"aliases": [path, Path(path).stem],
"attributes": {**attributes, "source_evidence_type": evidence_type},
"confidence": 0.75,
}
if evidence_type == "service_config":
path = str(source.get("path") or "")
return {
"identity_type": "ServiceConfig",
"label": Path(path).name or "service-config",
"graph_id": path,
"fabric_id": fabric_id,
"subfabric_id": subfabric_id,
"owner_actor_id": owner_actor_id,
"evidence_ids": evidence_ids,
"aliases": [path],
"attributes": {**attributes, "source_evidence_type": evidence_type},
"confidence": 0.7,
}
if evidence_type == "endpoint_contract":
path = str(source.get("path") or "")
return {
"identity_type": "Endpoint",
"label": Path(path).name or "endpoint-contract",
"graph_id": path,
"fabric_id": fabric_id,
"subfabric_id": subfabric_id,
"owner_actor_id": owner_actor_id,
"evidence_ids": evidence_ids,
"aliases": [path],
"attributes": {**attributes, "source_evidence_type": evidence_type},
"confidence": 0.75,
}
if evidence_type == "host_path_match":
path = str(source.get("path") or "")
return {
"identity_type": "HostPath",
"label": path or "host-path",
"graph_id": path,
"fabric_id": fabric_id,
"subfabric_id": subfabric_id,
"owner_actor_id": owner_actor_id,
"evidence_ids": evidence_ids,
"aliases": [path],
"attributes": {**attributes, "source_evidence_type": evidence_type},
"confidence": 0.65,
}
if evidence_type in {"secret_root", "backup_recovery"}:
path = str(source.get("path") or "")
return {
"identity_type": "SecretRoot" if evidence_type == "secret_root" else "BackupRecoveryRoot",
"label": Path(path).name or evidence_type,
"graph_id": path or evidence_type,
"fabric_id": fabric_id,
"subfabric_id": subfabric_id,
"owner_actor_id": owner_actor_id,
"evidence_ids": evidence_ids,
"aliases": [path],
"attributes": {**attributes, "source_evidence_type": evidence_type},
"confidence": 0.65,
}
if evidence_type in {"state_hub_repo_inventory", "gitea_organization", "gitea_repository", "registry_manifest"}:
return {
"identity_type": "CatalogRoot",
"label": str(source.get("url") or source.get("manifest_path") or root.get("id")),
"graph_id": str(root.get("id") or evidence_type),
"fabric_id": fabric_id,
"subfabric_id": subfabric_id,
"owner_actor_id": owner_actor_id,
"evidence_ids": evidence_ids,
"aliases": [str(source.get("url") or ""), str(source.get("manifest_path") or "")],
"attributes": {**attributes, "source_evidence_type": evidence_type},
"confidence": 0.6,
}
return None
def _add_identity_candidate(
    candidates: dict[str, dict[str, Any]],
    *,
    identity_type: str,
    label: str,
    graph_id: str | None = None,
    fabric_id: str | None = None,
    subfabric_id: str | None = None,
    owner_actor_id: str | None = None,
    evidence_ids: list[str],
    aliases: list[str],
    attributes: dict[str, Any],
    confidence: float,
) -> None:
    """Insert an identity candidate into ``candidates`` or merge it into an existing one.

    The stable key is built from the normalized identity type and the
    normalized ``graph_id`` (falling back to ``label``). On a key collision
    the higher confidence wins, aliases and evidence ids are unioned,
    attributes are overlaid, and scope fields are filled in only where the
    existing candidate has none.
    """
    type_part = normalize_identity_part(identity_type)
    identity_key = graph_id or label
    stable_key = f"identity:{type_part}:{normalize_identity_part(identity_key)}"

    empty_values = ("", None, [], {})
    incoming: dict[str, Any] = {
        "stable_key": stable_key,
        "identity_type": identity_type,
        "label": label or identity_key,
        "review_state": "candidate",
        "confidence": confidence,
        "aliases": _unique_strings([identity_key, *aliases]),
        "evidence_ids": _unique_strings(evidence_ids),
        "attributes": {name: value for name, value in attributes.items() if value not in empty_values},
    }
    optional_fields = {
        "graph_id": graph_id,
        "fabric_id": fabric_id,
        "subfabric_id": subfabric_id,
        "owner_actor_id": owner_actor_id,
    }
    incoming.update({name: value for name, value in optional_fields.items() if value})

    current = candidates.get(stable_key)
    if current is None:
        candidates[stable_key] = incoming
        return

    current["confidence"] = max(float(current.get("confidence", 0)), confidence)
    current["aliases"] = _unique_strings([*current.get("aliases", []), *incoming["aliases"]])
    current["evidence_ids"] = _unique_strings([*current.get("evidence_ids", []), *incoming["evidence_ids"]])
    current["attributes"] = {**current.get("attributes", {}), **incoming["attributes"]}
    # Scope fields never overwrite a value the existing candidate already has.
    for name in ("fabric_id", "subfabric_id", "owner_actor_id", "graph_id"):
        if not current.get(name) and incoming.get(name):
            current[name] = incoming[name]
def _mark_ambiguous_identities(candidates: list[dict[str, Any]]) -> list[dict[str, Any]]:
alias_index: dict[tuple[str, str], list[str]] = {}
for candidate in candidates:
for alias in candidate.get("aliases", []):
key = (str(candidate.get("identity_type")), normalize_identity_part(alias))
alias_index.setdefault(key, []).append(candidate["stable_key"])
ambiguous: dict[str, list[str]] = {}
for (_identity_type, alias), keys in alias_index.items():
unique_keys = sorted(set(keys))
if len(unique_keys) > 1:
for stable_key in unique_keys:
ambiguous.setdefault(stable_key, []).append(alias)
for candidate in candidates:
aliases = ambiguous.get(candidate["stable_key"])
if aliases:
candidate["review_state"] = "needs_review"
candidate.setdefault("attributes", {})["ambiguous_aliases"] = sorted(aliases)
return candidates
def _candidate_graph(candidates: list[dict[str, Any]], manifest: dict[str, Any]) -> dict[str, Any]:
nodes = [
{
"id": candidate["stable_key"],
"kind": candidate["identity_type"],
"label": candidate["label"],
"review_state": candidate["review_state"],
"fabric_id": candidate.get("fabric_id", ""),
"subfabric_id": candidate.get("subfabric_id", ""),
"owner_actor_id": candidate.get("owner_actor_id", ""),
}
for candidate in sorted(candidates, key=lambda item: item["stable_key"])
]
edges: list[dict[str, Any]] = []
for fabric in manifest.get("fabrics", []):
if not isinstance(fabric, dict):
continue
fabric_key = f"identity:{normalize_identity_part(fabric.get('kind') or 'Fabric')}:{normalize_identity_part(fabric.get('id'))}"
parent = fabric.get("parent_fabric_id") or manifest.get("netkingdom", {}).get("id")
parent_type = "Fabric" if fabric.get("parent_fabric_id") else "Netkingdom"
parent_key = f"identity:{normalize_identity_part(parent_type)}:{normalize_identity_part(parent)}"
edges.append(
{
"id": f"candidate-edge:{short_fingerprint([parent_key, 'contains', fabric_key], length=16)}",
"from": parent_key,
"to": fabric_key,
"type": "contains",
"review_state": "candidate",
}
)
return {"nodes": nodes, "edges": edges}
def _iter_evidence_items(evidence_run: dict[str, Any]) -> list[dict[str, Any]]:
return [
item
for root in evidence_run.get("roots", [])
if isinstance(root, dict)
for item in root.get("evidence", [])
if isinstance(item, dict)
]
def _collect_root_evidence(root: dict[str, Any], *, include_remote: bool, max_items: int) -> list[dict[str, Any]]:
    """Dispatch a discovery root to the collector that matches its ``type``.

    Disabled roots yield a single ``root_disabled`` item; unrecognised types
    yield a single declared item.
    """
    if root.get("status") == "disabled":
        return [_declared_evidence(root, "root_disabled", "skipped", "Discovery root is disabled.")]

    root_type = str(root.get("type") or "")
    # Root types collected by globbing, mapped to the evidence type they emit.
    glob_evidence_types = {
        "host_path": "host_path_match",
        "deployment_automation": "deployment_automation",
        "infrastructure_manifest": "infrastructure_manifest",
        "service_config": "service_config",
        "endpoint_contract": "endpoint_contract",
    }
    if root_type in glob_evidence_types:
        return _glob_root_evidence(root, glob_evidence_types[root_type], max_items=max_items)
    if root_type == "registry_manifest":
        return _registry_manifest_evidence(root, max_items=max_items)
    if root_type == "repository_checkout":
        return _repository_checkout_evidence(root)
    if root_type == "state_hub_repo_inventory":
        return _state_hub_evidence(root, include_remote=include_remote)
    if root_type in ("gitea_organization", "gitea_repository"):
        return [_declared_evidence(root, root_type, "declared", f"{root_type} root declared.")]
    if root_type in ("secret_root", "backup_recovery", "manual_review_queue"):
        return _metadata_root_evidence(root)
    return [_declared_evidence(root, root_type or "unknown_root", "declared", "Discovery root declared.")]
def _registry_manifest_evidence(root: dict[str, Any], *, max_items: int) -> list[dict[str, Any]]:
    """Collect evidence from a registry manifest: the file plus one item per repository.

    At most ``max_items`` repositories are read; a trailing ``skipped`` item
    reports how many were left out. A missing manifest or a manifest without a
    repositories list yields a single ``unavailable`` item.
    """
    source = _source(root)
    manifest_path = _resolve_path(source.get("manifest_path") or source.get("path"))
    if not manifest_path.exists():
        return [_declared_evidence(root, "registry_manifest_missing", "unavailable", f"Manifest missing: {manifest_path}")]

    document = load_yaml(manifest_path)
    repositories = document.get("repositories") if isinstance(document, dict) else []
    if not isinstance(repositories, list):
        return [_declared_evidence(root, "registry_manifest_invalid", "unavailable", "Manifest has no repositories list.")]

    total = len(repositories)
    evidence: list[dict[str, Any]] = [
        _file_evidence(root, manifest_path, "registry_manifest", summary=f"Registry manifest with {total} repositories.")
    ]
    for position, repo in enumerate(repositories[:max_items]):
        if not isinstance(repo, dict):
            continue
        candidate_attributes = {
            "name": repo.get("name", ""),
            "domain": repo.get("domain", ""),
            "default_branch": repo.get("default_branch", ""),
            "state_hub_repo_id": repo.get("state_hub_repo_id", ""),
            "has_local_path": bool(repo.get("path")),
            "has_remote_url": bool(repo.get("remote_url")),
        }
        item = _evidence_item(
            root,
            evidence_type="registered_repository",
            state="declared",
            source={
                "manifest_path": _display_path(manifest_path),
                "json_pointer": f"/repositories/{position}",
                "repo_slug": repo.get("slug", ""),
                "path": repo.get("path", ""),
                "remote_url": repo.get("remote_url", ""),
            },
            summary=f"Registered repository {repo.get('slug', '<unknown>')}.",
            attributes={name: value for name, value in candidate_attributes.items() if value not in ("", None)},
        )
        evidence.append(item)

    if total > max_items:
        skipped = total - max_items
        evidence.append(_declared_evidence(root, "registry_manifest_truncated", "skipped", f"Skipped {skipped} repositories beyond max_items_per_root."))
    return evidence
def _repository_checkout_evidence(root: dict[str, Any]) -> list[dict[str, Any]]:
    """Describe a local repository checkout, including basic git metadata.

    Returns a single ``observed`` item when the checkout path exists and a
    single ``unavailable`` item otherwise.
    """
    source = _source(root)
    checkout = _resolve_path(source.get("path"))
    if not checkout.exists():
        return [_declared_evidence(root, "repository_checkout_missing", "unavailable", f"Checkout missing: {checkout}")]

    observed = {
        "repo_slug": source.get("repo_slug", ""),
        "path_exists": True,
        "has_git_dir": (checkout / ".git").exists(),
        "has_fabric_dir": (checkout / "fabric").exists(),
        # Fall back to the declared remote when git reports no origin.
        "remote_origin": _git_value(checkout, "config", "--get", "remote.origin.url") or source.get("remote_url", ""),
        "head": _git_value(checkout, "rev-parse", "HEAD") or "",
        "branch": _git_value(checkout, "rev-parse", "--abbrev-ref", "HEAD") or "",
    }
    attributes = {name: value for name, value in observed.items() if value not in ("", None)}
    item = _evidence_item(
        root,
        evidence_type="repository_checkout",
        state="observed",
        source={"path": _display_path(checkout), "repo_slug": source.get("repo_slug", "")},
        summary=f"Repository checkout observed at {_display_path(checkout)}.",
        attributes=attributes,
    )
    return [item]
def _glob_root_evidence(root: dict[str, Any], evidence_type: str, *, max_items: int) -> list[dict[str, Any]]:
    """Collect one evidence item per filesystem path matching the root's glob patterns.

    Args:
        root: Discovery root; ``source.path`` is the base directory and
            ``source.patterns`` the glob patterns (default ``["*"]``).
        evidence_type: Evidence type assigned to every match.
        max_items: Maximum number of matches turned into evidence.

    Returns:
        Observed items for up to ``max_items`` distinct matches, an
        ``unavailable`` item when the base path is missing or nothing matched,
        and a trailing ``skipped`` item when matches were cut off.
    """
    source = _source(root)
    base = _resolve_path(source.get("path") or ".")
    patterns = source.get("patterns") if isinstance(source.get("patterns"), list) else ["*"]
    if not base.exists():
        return [_declared_evidence(root, f"{evidence_type}_missing", "unavailable", f"Root path missing: {base}")]

    # A path matched by several patterns must be reported once: identical
    # items share one evidence id, and duplicate ids would violate the
    # (id, run_id) primary key when the run is stored.
    matches: list[Path] = []
    seen: set[Path] = set()
    for pattern in patterns:
        for match in sorted(base.glob(str(pattern))):
            if match not in seen:
                seen.add(match)
                matches.append(match)
        # Stop evaluating further patterns once the budget is used up; the
        # skipped count below therefore only covers patterns evaluated so far.
        if len(matches) >= max_items:
            break

    evidence = [
        _evidence_item(
            root,
            evidence_type=evidence_type,
            state="observed",
            source={"path": _display_path(path)},
            summary=f"Observed {evidence_type} at {_display_path(path)}.",
            attributes=_file_attributes(path),
        )
        for path in matches[:max_items]
    ]
    if not evidence:
        evidence.append(_declared_evidence(root, f"{evidence_type}_empty", "unavailable", f"No files matched under {base}."))
    if len(matches) > max_items:
        evidence.append(_declared_evidence(root, f"{evidence_type}_truncated", "skipped", f"Skipped {len(matches) - max_items} matches beyond max_items_per_root."))
    return evidence
def _state_hub_evidence(root: dict[str, Any], *, include_remote: bool) -> list[dict[str, Any]]:
    """Collect State Hub repository inventory evidence.

    Args:
        root: Discovery root; ``source.base_url`` and ``source.api_paths``
            (default ``["/managed-repos/"]``) define the endpoints.
        include_remote: When false no request is made and a single declared
            item is returned.

    Returns:
        One item per API path: ``observed`` with the item count and payload
        fingerprint on success, ``unavailable`` when fetching or decoding
        that endpoint failed.
    """
    import http.client

    source = _source(root)
    if not include_remote:
        return [_declared_evidence(root, "state_hub_repo_inventory", "declared", "State Hub repo inventory root declared; remote fetch disabled.")]
    base_url = str(source.get("base_url") or "").rstrip("/")
    evidence: list[dict[str, Any]] = []
    for api_path in source.get("api_paths") or ["/managed-repos/"]:
        url = f"{base_url}{api_path}"
        try:
            with urllib.request.urlopen(url, timeout=5) as response:
                payload = json.loads(response.read())
        except (OSError, ValueError, http.client.HTTPException) as exc:
            # OSError covers URLError/HTTPError, timeouts and connection
            # resets; ValueError covers malformed URLs (e.g. empty base_url)
            # and undecodable or non-JSON bodies. One failing endpoint is
            # recorded as evidence instead of aborting the whole root.
            evidence.append(_declared_evidence(root, "state_hub_fetch_failed", "unavailable", f"{url}: {exc}"))
            continue
        if isinstance(payload, list):
            count = len(payload)
        elif isinstance(payload, dict):
            count = len(payload.get("items", []))
        else:
            count = 0
        evidence.append(
            _evidence_item(
                root,
                evidence_type="state_hub_repo_inventory",
                state="observed",
                source={"url": url},
                summary=f"Fetched State Hub repository inventory from {url}.",
                attributes={"item_count": count, "payload_fingerprint": short_fingerprint(payload)},
            )
        )
    return evidence
def _metadata_root_evidence(root: dict[str, Any]) -> list[dict[str, Any]]:
    """Return evidence for a metadata-only root.

    When the root declares a path that exists, the path itself is reported as
    observed; otherwise a single ``planned`` or ``declared`` item is returned.
    """
    evidence_type = str(root.get("type") or "metadata_root")
    declared_path = _source(root).get("path")
    if declared_path:
        resolved = _resolve_path(declared_path)
        if resolved.exists():
            return [_file_evidence(root, resolved, evidence_type)]
    state = "planned" if root.get("status") == "planned" else "declared"
    return [_declared_evidence(root, evidence_type, state, "Metadata-only root declared.")]
def _file_evidence(root: dict[str, Any], path: Path, evidence_type: str, *, summary: str | None = None) -> dict[str, Any]:
    """Build an ``observed`` evidence item for ``path`` with its file attributes.

    A default summary naming the evidence type and path is used when
    ``summary`` is empty or omitted.
    """
    shown_path = _display_path(path)
    text = summary or f"Observed {evidence_type} file at {_display_path(path)}."
    return _evidence_item(
        root,
        evidence_type=evidence_type,
        state="observed",
        source={"path": shown_path},
        summary=text,
        attributes=_file_attributes(path),
    )
def _declared_evidence(root: dict[str, Any], evidence_type: str, state: str, summary: str) -> dict[str, Any]:
    """Build an evidence item that only restates what the root declares.

    The item's source is the root's source without ``safe_discovery``; that
    value (default ``"metadata_only"``) is reported as an attribute instead.
    """
    declared = _source(root)
    item_source = {name: value for name, value in declared.items() if name != "safe_discovery"}
    item_attributes = {"safe_discovery": declared.get("safe_discovery", "metadata_only")}
    return _evidence_item(
        root,
        evidence_type=evidence_type,
        state=state,
        source=item_source,
        summary=summary,
        attributes=item_attributes,
    )
def _evidence_item(
    root: dict[str, Any],
    *,
    evidence_type: str,
    state: str,
    source: dict[str, Any],
    summary: str,
    attributes: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Assemble a complete evidence item for ``root``.

    The item's fingerprint is computed over its root id, type, state, source,
    summary and attributes, and is also embedded in the item id.
    """
    item_attributes = attributes or {}
    root_id = root.get("id", "")
    fingerprint = short_fingerprint(
        {
            "root_id": root_id,
            "evidence_type": evidence_type,
            "state": state,
            "source": source,
            "summary": summary,
            "attributes": item_attributes,
        },
        length=16,
    )
    provenance = {
        "extractor_id": "accountability-root-adapter",
        "extractor_version": EXTRACTOR_VERSION,
        "method": "deterministic",
        "origin": "deterministic",
    }
    return {
        # The id falls back to the literal "root" when the root has no id.
        "id": f"evidence:{root.get('id', 'root')}:{fingerprint}",
        "root_id": root_id,
        "evidence_type": evidence_type,
        "state": state,
        "durable": True,
        "live_telemetry": False,
        "source": source,
        "provenance": provenance,
        "fingerprint": fingerprint,
        "summary": summary,
        "attributes": item_attributes,
    }
def _review_artifact(root: dict[str, Any], artifact_type: str, severity: str, message: str) -> dict[str, Any]:
    """Build a review artifact record that points back at ``root`` and its source."""
    artifact: dict[str, Any] = {"root_id": root.get("id", "")}
    artifact["artifact_type"] = artifact_type
    artifact["severity"] = severity
    artifact["message"] = message
    artifact["source"] = _source(root)
    return artifact
def _source(root: dict[str, Any]) -> dict[str, Any]:
source = root.get("source")
return source if isinstance(source, dict) else {}
def _resolve_path(value: object) -> Path:
path = Path(str(value or "."))
return path if path.is_absolute() else repo_root() / path
def _display_path(path: Path) -> str:
    """Return ``path`` for display: repo-relative POSIX form when possible, else absolute."""
    absolute = path.resolve()
    try:
        relative = absolute.relative_to(repo_root())
    except ValueError:
        # The path lies outside the repository root.
        return str(absolute)
    return relative.as_posix()
def _file_attributes(path: Path) -> dict[str, Any]:
attributes: dict[str, Any] = {
"path_type": "directory" if path.is_dir() else "file",
"exists": path.exists(),
}
if path.is_file():
attributes["size_bytes"] = path.stat().st_size
attributes["sha256"] = _file_sha256(path)
return attributes
def _file_sha256(path: Path) -> str | None:
if not path.is_file():
return None
digest = hashlib.sha256()
with path.open("rb") as handle:
for chunk in iter(lambda: handle.read(1024 * 1024), b""):
digest.update(chunk)
return digest.hexdigest()
def _git_value(repo_path: Path, *args: str) -> str | None:
try:
result = subprocess.run(
["git", *args],
cwd=repo_path,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
check=False,
timeout=5,
)
except (OSError, subprocess.SubprocessError):
return None
value = result.stdout.strip()
return value or None
def _unique_strings(values: list[object]) -> list[str]:
result: list[str] = []
seen: set[str] = set()
for value in values:
text = str(value or "").strip()
if not text or text in seen:
continue
result.append(text)
seen.add(text)
return result
def _utc_now() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")