generated from coulomb/repo-seed
506 lines
17 KiB
Python
506 lines
17 KiB
Python
import json
|
|
from pathlib import Path
|
|
|
|
from railiance_fabric.accountability_roots import (
|
|
AccountabilityEvidenceStore,
|
|
build_identity_projection,
|
|
build_ownership_review,
|
|
build_update_delta,
|
|
collect_accountability_root_evidence,
|
|
load_accountability_root_manifest,
|
|
)
|
|
from railiance_fabric.cli import main as cli_main
|
|
from railiance_fabric.schema_validation import draft202012_validator
|
|
|
|
|
|
def test_collect_accountability_root_evidence_from_manifest(tmp_path: Path) -> None:
|
|
manifest = _fixture_manifest(tmp_path)
|
|
evidence_run = collect_accountability_root_evidence(manifest, max_items_per_root=20)
|
|
|
|
validator = draft202012_validator(Path("schemas/accountability-root-evidence.schema.yaml"))
|
|
assert list(validator.iter_errors(evidence_run)) == []
|
|
assert evidence_run["kind"] == "AccountabilityRootEvidenceRun"
|
|
assert evidence_run["manifest"]["id"] == "fixture.accountability-roots"
|
|
|
|
evidence = [
|
|
item
|
|
for root in evidence_run["roots"]
|
|
for item in root["evidence"]
|
|
]
|
|
evidence_types = {item["evidence_type"] for item in evidence}
|
|
assert {"registered_repository", "repository_checkout", "deployment_automation", "secret_root"} <= evidence_types
|
|
assert all(item["durable"] is True for item in evidence)
|
|
assert all(item["live_telemetry"] is False for item in evidence)
|
|
|
|
registered_repo = next(item for item in evidence if item["evidence_type"] == "registered_repository")
|
|
assert registered_repo["attributes"]["state_hub_repo_id"] == "fixture-state-hub-id"
|
|
|
|
secret_root = next(item for item in evidence if item["evidence_type"] == "secret_root")
|
|
assert "secret-value" not in json.dumps(secret_root)
|
|
|
|
|
|
def test_identity_projection_is_stable_and_reviewable(tmp_path: Path) -> None:
|
|
manifest_path = _fixture_manifest(tmp_path)
|
|
manifest = load_accountability_root_manifest(manifest_path)
|
|
|
|
first = build_identity_projection(collect_accountability_root_evidence(manifest_path), manifest)
|
|
second = build_identity_projection(collect_accountability_root_evidence(manifest_path), manifest)
|
|
|
|
validator = draft202012_validator(Path("schemas/accountability-identity-projection.schema.yaml"))
|
|
assert list(validator.iter_errors(first)) == []
|
|
|
|
first_keys = {candidate["stable_key"] for candidate in first["identity_candidates"]}
|
|
second_keys = {candidate["stable_key"] for candidate in second["identity_candidates"]}
|
|
assert first_keys == second_keys
|
|
assert {
|
|
"Actor",
|
|
"Fabric",
|
|
"Repository",
|
|
"Deployable",
|
|
"SecretRoot",
|
|
} <= {candidate["identity_type"] for candidate in first["identity_candidates"]}
|
|
assert first["candidate_graph"]["nodes"]
|
|
assert first["candidate_graph"]["edges"]
|
|
|
|
|
|
def test_deployable_identity_ignores_generic_filename_alias_ambiguity(tmp_path: Path) -> None:
|
|
workspace = tmp_path / "workspace"
|
|
for name in ("service-a", "service-b"):
|
|
service = workspace / name
|
|
service.mkdir(parents=True)
|
|
(service / "Dockerfile").write_text("FROM python:3.12-slim\n", encoding="utf-8")
|
|
manifest_path = _minimal_manifest(
|
|
tmp_path,
|
|
"""
|
|
- id: root.fixture.deployables
|
|
type: deployment_automation
|
|
status: active
|
|
fabric_id: fabric.fixture.primary
|
|
owner_actor_id: actor.fixture.lord
|
|
source:
|
|
path: {workspace}
|
|
patterns:
|
|
- "*/Dockerfile"
|
|
safe_discovery: local_files
|
|
evidence_scope:
|
|
- deployment_topology
|
|
""".format(workspace=workspace),
|
|
)
|
|
manifest = load_accountability_root_manifest(manifest_path)
|
|
|
|
projection = build_identity_projection(collect_accountability_root_evidence(manifest_path), manifest)
|
|
|
|
deployables = [
|
|
candidate
|
|
for candidate in projection["identity_candidates"]
|
|
if candidate["identity_type"] == "Deployable"
|
|
]
|
|
assert len(deployables) == 2
|
|
assert all("ambiguous_aliases" not in candidate.get("attributes", {}) for candidate in deployables)
|
|
|
|
|
|
def test_deployment_evidence_skips_dependency_cache_noise(tmp_path: Path) -> None:
|
|
workspace = tmp_path / "workspace"
|
|
(workspace / "service").mkdir(parents=True)
|
|
(workspace / "service" / "Dockerfile").write_text("FROM python:3.12-slim\n", encoding="utf-8")
|
|
(workspace / ".venv" / "lib" / "python3.12" / "site-packages" / "pkg").mkdir(parents=True)
|
|
(workspace / ".venv" / "lib" / "python3.12" / "site-packages" / "pkg" / "Dockerfile").write_text(
|
|
"FROM ignored\n",
|
|
encoding="utf-8",
|
|
)
|
|
(workspace / "go" / "pkg" / "mod" / "example").mkdir(parents=True)
|
|
(workspace / "go" / "pkg" / "mod" / "example" / "Dockerfile").write_text(
|
|
"FROM ignored\n",
|
|
encoding="utf-8",
|
|
)
|
|
manifest_path = _minimal_manifest(
|
|
tmp_path,
|
|
"""
|
|
- id: root.fixture.deployables
|
|
type: deployment_automation
|
|
status: active
|
|
fabric_id: fabric.fixture.primary
|
|
owner_actor_id: actor.fixture.lord
|
|
source:
|
|
path: {workspace}
|
|
patterns:
|
|
- "**/Dockerfile"
|
|
safe_discovery: local_files
|
|
evidence_scope:
|
|
- deployment_topology
|
|
""".format(workspace=workspace),
|
|
)
|
|
|
|
evidence = collect_accountability_root_evidence(manifest_path, max_items_per_root=20)
|
|
paths = [
|
|
item["source"]["path"]
|
|
for root in evidence["roots"]
|
|
for item in root["evidence"]
|
|
if item["evidence_type"] == "deployment_automation"
|
|
]
|
|
|
|
assert paths == [str(workspace / "service" / "Dockerfile")]
|
|
|
|
|
|
def test_evidence_store_persists_runs_items_and_identities(tmp_path: Path) -> None:
|
|
manifest_path = _fixture_manifest(tmp_path)
|
|
manifest = load_accountability_root_manifest(manifest_path)
|
|
evidence_run = collect_accountability_root_evidence(manifest_path)
|
|
projection = build_identity_projection(evidence_run, manifest)
|
|
store = AccountabilityEvidenceStore(tmp_path / "evidence.sqlite3")
|
|
|
|
stored = store.add_evidence_run(evidence_run, projection)
|
|
latest = store.latest_run()
|
|
|
|
assert latest is not None
|
|
assert latest["id"] == stored["run_id"]
|
|
assert stored["evidence_count"] == len(store.list_evidence(stored["run_id"]))
|
|
assert stored["identity_candidate_count"] == len(store.list_identity_candidates(stored["run_id"]))
|
|
|
|
|
|
def test_ownership_review_flags_ambiguity_and_applies_review_decisions(tmp_path: Path) -> None:
|
|
manifest_path = _fixture_manifest(tmp_path)
|
|
manifest = load_accountability_root_manifest(manifest_path)
|
|
projection = build_identity_projection(collect_accountability_root_evidence(manifest_path), manifest)
|
|
review = build_ownership_review(projection, manifest)
|
|
|
|
validator = draft202012_validator(Path("schemas/accountability-ownership-review.schema.yaml"))
|
|
assert list(validator.iter_errors(review)) == []
|
|
|
|
repo_key = "identity:repository:fixture-repo"
|
|
repo_item = next(item for item in review["items"] if item["stable_key"] == repo_key)
|
|
assert repo_item["review_state"] == "needs_review"
|
|
assert "ambiguous_ownership" in repo_item["blockers"]
|
|
|
|
store = AccountabilityEvidenceStore(tmp_path / "evidence.sqlite3")
|
|
store.add_review_decision(
|
|
stable_key=repo_key,
|
|
decision="accept",
|
|
reviewer="tester",
|
|
owner_actor_id="actor.fixture.lord",
|
|
fabric_id="fabric.fixture.primary",
|
|
note="fixture checkout owner wins over registry root",
|
|
)
|
|
accepted_review = build_ownership_review(
|
|
projection,
|
|
manifest,
|
|
review_decisions=store.latest_review_decisions(),
|
|
)
|
|
accepted_item = next(item for item in accepted_review["items"] if item["stable_key"] == repo_key)
|
|
assert accepted_item["review_state"] == "accepted"
|
|
assert accepted_item["ownership"]["resolution"] == "review_decision"
|
|
assert accepted_item["ownership"]["owner_actor_id"] == "actor.fixture.lord"
|
|
|
|
|
|
def test_discover_roots_cli_prints_evidence_json(tmp_path: Path, capsys) -> None:
|
|
manifest = _fixture_manifest(tmp_path)
|
|
|
|
assert cli_main(["discover-roots", "--manifest", str(manifest), "--max-items-per-root", "20"]) == 0
|
|
|
|
payload = json.loads(capsys.readouterr().out)
|
|
assert payload["kind"] == "AccountabilityRootEvidenceRun"
|
|
assert payload["roots"]
|
|
|
|
|
|
def test_discover_roots_cli_can_print_identities_and_store(tmp_path: Path, capsys) -> None:
|
|
manifest = _fixture_manifest(tmp_path)
|
|
store_path = tmp_path / "evidence.sqlite3"
|
|
|
|
assert (
|
|
cli_main(
|
|
[
|
|
"discover-roots",
|
|
"--manifest",
|
|
str(manifest),
|
|
"--identity-projection",
|
|
"--store-db",
|
|
str(store_path),
|
|
]
|
|
)
|
|
== 0
|
|
)
|
|
|
|
payload = json.loads(capsys.readouterr().out)
|
|
assert payload["kind"] == "AccountabilityIdentityProjection"
|
|
assert AccountabilityEvidenceStore(store_path).latest_run() is not None
|
|
|
|
|
|
def test_review_identity_cli_persists_decision_for_ownership_review(tmp_path: Path, capsys) -> None:
|
|
manifest = _fixture_manifest(tmp_path)
|
|
store_path = tmp_path / "evidence.sqlite3"
|
|
repo_key = "identity:repository:fixture-repo"
|
|
|
|
assert (
|
|
cli_main(
|
|
[
|
|
"review-identity",
|
|
repo_key,
|
|
"--store-db",
|
|
str(store_path),
|
|
"--decision",
|
|
"accept",
|
|
"--owner-actor-id",
|
|
"actor.fixture.lord",
|
|
"--fabric-id",
|
|
"fabric.fixture.primary",
|
|
"--reviewer",
|
|
"tester",
|
|
]
|
|
)
|
|
== 0
|
|
)
|
|
decision_payload = json.loads(capsys.readouterr().out)
|
|
assert decision_payload["stable_key"] == repo_key
|
|
|
|
assert (
|
|
cli_main(
|
|
[
|
|
"discover-roots",
|
|
"--manifest",
|
|
str(manifest),
|
|
"--ownership-review",
|
|
"--store-db",
|
|
str(store_path),
|
|
]
|
|
)
|
|
== 0
|
|
)
|
|
review_payload = json.loads(capsys.readouterr().out)
|
|
repo_item = next(item for item in review_payload["items"] if item["stable_key"] == repo_key)
|
|
assert repo_item["review_state"] == "accepted"
|
|
assert repo_item["decision"]["reviewer"] == "tester"
|
|
|
|
|
|
def test_update_delta_detects_ownership_changes_and_unchanged_runs(tmp_path: Path) -> None:
|
|
manifest_path = _fixture_manifest(tmp_path)
|
|
manifest = load_accountability_root_manifest(manifest_path)
|
|
projection = build_identity_projection(collect_accountability_root_evidence(manifest_path), manifest)
|
|
review = build_ownership_review(projection, manifest)
|
|
|
|
unchanged = build_update_delta(
|
|
projection,
|
|
review,
|
|
previous_identity_projection=projection,
|
|
previous_ownership_review=review,
|
|
)
|
|
validator = draft202012_validator(Path("schemas/accountability-update-delta.schema.yaml"))
|
|
assert list(validator.iter_errors(unchanged)) == []
|
|
assert unchanged["summary"]["promotion_needed"] is False
|
|
assert unchanged["node_delta"]["unchanged"]
|
|
|
|
store = AccountabilityEvidenceStore(tmp_path / "evidence.sqlite3")
|
|
store.add_review_decision(
|
|
stable_key="identity:repository:fixture-repo",
|
|
decision="accept",
|
|
reviewer="tester",
|
|
owner_actor_id="actor.fixture.lord",
|
|
fabric_id="fabric.fixture.primary",
|
|
)
|
|
accepted_review = build_ownership_review(
|
|
projection,
|
|
manifest,
|
|
review_decisions=store.latest_review_decisions(),
|
|
)
|
|
changed = build_update_delta(
|
|
projection,
|
|
accepted_review,
|
|
previous_identity_projection=projection,
|
|
previous_ownership_review=review,
|
|
)
|
|
assert changed["summary"]["promotion_needed"] is True
|
|
assert "identity:repository:fixture-repo" in changed["change_sets"]["ownership"]
|
|
assert "identity:repository:fixture-repo" in changed["change_sets"]["review_state"]
|
|
|
|
|
|
def test_discover_roots_cli_can_emit_delta(tmp_path: Path, capsys) -> None:
|
|
manifest = _fixture_manifest(tmp_path)
|
|
manifest_data = load_accountability_root_manifest(manifest)
|
|
projection = build_identity_projection(collect_accountability_root_evidence(manifest), manifest_data)
|
|
review = build_ownership_review(projection, manifest_data)
|
|
projection_path = tmp_path / "previous-identities.json"
|
|
review_path = tmp_path / "previous-ownership.json"
|
|
projection_path.write_text(json.dumps(projection), encoding="utf-8")
|
|
review_path.write_text(json.dumps(review), encoding="utf-8")
|
|
|
|
assert (
|
|
cli_main(
|
|
[
|
|
"discover-roots",
|
|
"--manifest",
|
|
str(manifest),
|
|
"--delta",
|
|
"--previous-identity-projection",
|
|
str(projection_path),
|
|
"--previous-ownership-review",
|
|
str(review_path),
|
|
]
|
|
)
|
|
== 0
|
|
)
|
|
|
|
payload = json.loads(capsys.readouterr().out)
|
|
assert payload["kind"] == "AccountabilityUpdateDelta"
|
|
assert payload["summary"]["promotion_needed"] is False
|
|
|
|
|
|
def _fixture_manifest(tmp_path: Path) -> Path:
|
|
workspace = tmp_path / "workspace"
|
|
repo = workspace / "fixture-repo"
|
|
repo.mkdir(parents=True)
|
|
(repo / ".git").mkdir()
|
|
(repo / "fabric").mkdir()
|
|
(repo / "Dockerfile").write_text("FROM python:3.12-slim\n", encoding="utf-8")
|
|
(repo / "compose.yaml").write_text("services:\n api:\n image: fixture/api\n", encoding="utf-8")
|
|
secret_metadata = repo / "secret-root.txt"
|
|
secret_metadata.write_text("secret-value-never-promote\n", encoding="utf-8")
|
|
|
|
registry_manifest = tmp_path / "local-repos.yaml"
|
|
registry_manifest.write_text(
|
|
"""
|
|
apiVersion: railiance.fabric/v1alpha1
|
|
kind: RegistryOnboardingManifest
|
|
repositories:
|
|
- slug: fixture-repo
|
|
name: Fixture Repo
|
|
domain: testing
|
|
path: {repo}
|
|
default_branch: main
|
|
state_hub_repo_id: fixture-state-hub-id
|
|
remote_url: gitea-remote:coulomb/fixture-repo.git
|
|
""".format(repo=repo),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
manifest = tmp_path / "accountability-roots.yaml"
|
|
manifest.write_text(
|
|
"""
|
|
apiVersion: railiance.fabric/v1alpha2
|
|
kind: AccountabilityRootManifest
|
|
metadata:
|
|
id: fixture.accountability-roots
|
|
name: Fixture Accountability Roots
|
|
netkingdom:
|
|
id: fixture.netkingdom
|
|
name: Fixture Netkingdom
|
|
king_actor_id: actor.fixture.king
|
|
actors:
|
|
- id: actor.fixture.king
|
|
role: king
|
|
name: Fixture King
|
|
- id: actor.fixture.lord
|
|
role: lord
|
|
name: Fixture Lord
|
|
fabrics:
|
|
- id: fabric.fixture.primary
|
|
kind: Fabric
|
|
name: Fixture Primary Fabric
|
|
netkingdom_id: fixture.netkingdom
|
|
lord_actor_id: actor.fixture.lord
|
|
parent_fabric_id: null
|
|
status: active
|
|
boundary:
|
|
boundary_type: fabric
|
|
criterion: financial_and_operational_accountability
|
|
discovery_roots:
|
|
- id: root.fixture.registry
|
|
type: registry_manifest
|
|
status: active
|
|
fabric_id: fabric.fixture.primary
|
|
owner_actor_id: actor.fixture.king
|
|
source:
|
|
manifest_path: {registry_manifest}
|
|
safe_discovery: local_files
|
|
evidence_scope:
|
|
- repo_inventory
|
|
- repository_identity
|
|
- id: root.fixture.checkout
|
|
type: repository_checkout
|
|
status: active
|
|
fabric_id: fabric.fixture.primary
|
|
owner_actor_id: actor.fixture.lord
|
|
source:
|
|
repo_slug: fixture-repo
|
|
path: {repo}
|
|
safe_discovery: local_files
|
|
evidence_scope:
|
|
- repository_identity
|
|
- local_checkout
|
|
- id: root.fixture.deployment
|
|
type: deployment_automation
|
|
status: active
|
|
fabric_id: fabric.fixture.primary
|
|
owner_actor_id: actor.fixture.lord
|
|
source:
|
|
path: {repo}
|
|
patterns:
|
|
- Dockerfile
|
|
- compose.yaml
|
|
safe_discovery: local_files
|
|
evidence_scope:
|
|
- deployment_topology
|
|
- id: root.fixture.secret
|
|
type: secret_root
|
|
status: planned
|
|
fabric_id: fabric.fixture.primary
|
|
owner_actor_id: actor.fixture.king
|
|
source:
|
|
path: {secret_metadata}
|
|
safe_discovery: metadata_only
|
|
evidence_scope:
|
|
- secret_metadata
|
|
refresh:
|
|
cadence: manual
|
|
triggers:
|
|
- operator_request
|
|
""".format(
|
|
registry_manifest=registry_manifest,
|
|
repo=repo,
|
|
secret_metadata=secret_metadata,
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
return manifest
|
|
|
|
|
|
def _minimal_manifest(tmp_path: Path, discovery_roots: str) -> Path:
|
|
manifest = tmp_path / "minimal-accountability-roots.yaml"
|
|
manifest.write_text(
|
|
"""
|
|
apiVersion: railiance.fabric/v1alpha2
|
|
kind: AccountabilityRootManifest
|
|
metadata:
|
|
id: fixture.minimal-accountability-roots
|
|
name: Fixture Minimal Accountability Roots
|
|
netkingdom:
|
|
id: fixture.netkingdom
|
|
name: Fixture Netkingdom
|
|
king_actor_id: actor.fixture.king
|
|
actors:
|
|
- id: actor.fixture.king
|
|
role: king
|
|
name: Fixture King
|
|
- id: actor.fixture.lord
|
|
role: lord
|
|
name: Fixture Lord
|
|
fabrics:
|
|
- id: fabric.fixture.primary
|
|
kind: Fabric
|
|
name: Fixture Primary Fabric
|
|
netkingdom_id: fixture.netkingdom
|
|
lord_actor_id: actor.fixture.lord
|
|
parent_fabric_id: null
|
|
status: active
|
|
boundary:
|
|
boundary_type: fabric
|
|
criterion: financial_and_operational_accountability
|
|
discovery_roots:
|
|
{discovery_roots}
|
|
refresh:
|
|
cadence: manual
|
|
triggers:
|
|
- operator_request
|
|
""".format(discovery_roots=discovery_roots.rstrip()),
|
|
encoding="utf-8",
|
|
)
|
|
return manifest
|