generated from coulomb/repo-seed
150 lines
5.1 KiB
Python
150 lines
5.1 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
|
|
from railiance_fabric.cli import main as cli_main
|
|
from railiance_fabric.connectors import CONNECTOR_TYPES, ConnectorConfig
|
|
from railiance_fabric.scanner import ScanOptions, scan_repo
|
|
from railiance_fabric.schema_validation import draft202012_validator
|
|
|
|
|
|
def test_connector_interfaces_cover_followup_catalog_types() -> None:
|
|
assert set(CONNECTOR_TYPES) >= {
|
|
"package_registry",
|
|
"container_registry",
|
|
"api_catalog",
|
|
"service_catalog",
|
|
"deployment_inventory",
|
|
"fabric_registry",
|
|
}
|
|
|
|
|
|
def test_local_fabric_registry_connector_adds_separate_registry_evidence(tmp_path: Path) -> None:
|
|
repo = _minimal_repo(tmp_path)
|
|
manifest = _manifest(tmp_path)
|
|
|
|
snapshot = scan_repo(
|
|
ScanOptions(
|
|
repo_path=repo,
|
|
repo_slug="fixture-repo",
|
|
repo_name="Fixture Repo",
|
|
commit="abc123",
|
|
connectors=[
|
|
ConnectorConfig(
|
|
connector_id="local-fabric-registry",
|
|
connector_type="fabric_registry",
|
|
source_path=str(manifest),
|
|
)
|
|
],
|
|
)
|
|
)
|
|
|
|
_validate_schema("discovery-snapshot.schema.yaml", snapshot)
|
|
assert snapshot["connector_runs"][0]["status"] == "success"
|
|
assert snapshot["connector_runs"][0]["candidate_counts"] == {"nodes": 1, "edges": 1, "attributes": 8}
|
|
assert any(scope["source_kind"] == "fabric_registry" for scope in snapshot["replacement_scopes"])
|
|
|
|
registry_node = next(node for node in snapshot["candidates"]["nodes"] if node["kind"] == "FabricRegistryEntry")
|
|
assert registry_node["origin"] == "registry"
|
|
assert registry_node["review_state"] == "candidate"
|
|
assert registry_node["replacement_scope"].startswith("scope:fixture-repo:local-fabric-registry:fabric_registry")
|
|
assert registry_node["attributes"]["state_hub_repo_id"] == "state-hub-id"
|
|
assert any(edge["edge_type"] == "cataloged_as" for edge in snapshot["candidates"]["edges"])
|
|
registry_attributes = {
|
|
attribute["name"]: attribute
|
|
for attribute in snapshot["candidates"]["attributes"]
|
|
if attribute["origin"] == "registry"
|
|
}
|
|
assert registry_attributes["registry_remote_url"]["value"] == "gitea-remote:coulomb/fixture-repo.git"
|
|
assert registry_attributes["registry_default_branch"]["source_anchors"][0]["source_kind"] == "fabric_registry"
|
|
|
|
|
|
def test_connector_failure_becomes_review_artifact_without_corrupting_scan(tmp_path: Path) -> None:
|
|
repo = _minimal_repo(tmp_path)
|
|
snapshot = scan_repo(
|
|
ScanOptions(
|
|
repo_path=repo,
|
|
repo_slug="fixture-repo",
|
|
repo_name="Fixture Repo",
|
|
commit="abc123",
|
|
connectors=[
|
|
ConnectorConfig(
|
|
connector_id="local-fabric-registry",
|
|
connector_type="fabric_registry",
|
|
source_path=str(tmp_path / "missing.yaml"),
|
|
)
|
|
],
|
|
)
|
|
)
|
|
|
|
_validate_schema("discovery-snapshot.schema.yaml", snapshot)
|
|
assert snapshot["connector_runs"][0]["status"] == "unavailable"
|
|
assert snapshot["review_artifacts"][0]["artifact_type"] == "connector_unavailable"
|
|
assert any(node["kind"] == "Repository" for node in snapshot["candidates"]["nodes"])
|
|
|
|
|
|
def test_scan_cli_can_enable_local_fabric_registry_connector(tmp_path: Path, capsys) -> None:
|
|
repo = _minimal_repo(tmp_path)
|
|
manifest = _manifest(tmp_path)
|
|
output = tmp_path / "snapshot.json"
|
|
|
|
assert cli_main(
|
|
[
|
|
"scan",
|
|
str(repo),
|
|
"--repo-slug",
|
|
"fixture-repo",
|
|
"--repo-name",
|
|
"Fixture Repo",
|
|
"--commit",
|
|
"abc123",
|
|
"--connector",
|
|
"local-fabric-registry",
|
|
"--connector-manifest",
|
|
str(manifest),
|
|
"--output",
|
|
str(output),
|
|
]
|
|
) == 0
|
|
|
|
summary = capsys.readouterr().out
|
|
assert "1 connector run(s)" in summary
|
|
payload = json.loads(output.read_text(encoding="utf-8"))
|
|
_validate_schema("discovery-snapshot.schema.yaml", payload)
|
|
assert payload["connector_runs"][0]["status"] == "success"
|
|
|
|
|
|
def _minimal_repo(tmp_path: Path) -> Path:
|
|
repo = tmp_path / "fixture-repo"
|
|
repo.mkdir()
|
|
(repo / "README.md").write_text("# Fixture Repo\n", encoding="utf-8")
|
|
return repo
|
|
|
|
|
|
def _manifest(tmp_path: Path) -> Path:
|
|
manifest = tmp_path / "local-repos.yaml"
|
|
manifest.write_text(
|
|
"""
|
|
apiVersion: railiance.fabric/v1alpha1
|
|
kind: RegistryOnboardingManifest
|
|
repositories:
|
|
- slug: fixture-repo
|
|
name: Fixture Repo
|
|
domain: testing
|
|
path: /tmp/fixture-repo
|
|
default_branch: main
|
|
state_hub_repo_id: state-hub-id
|
|
remote_url: gitea-remote:coulomb/fixture-repo.git
|
|
declaration_paths:
|
|
- /tmp/fixture-repo
|
|
""".lstrip(),
|
|
encoding="utf-8",
|
|
)
|
|
return manifest
|
|
|
|
|
|
def _validate_schema(schema_name: str, payload: dict[str, object]) -> None:
|
|
validator = draft202012_validator(Path("schemas") / schema_name)
|
|
validator.validate(payload)
|