Files
railiance-fabric/tests/test_connectors.py

150 lines
5.1 KiB
Python

from __future__ import annotations
import json
from pathlib import Path
from railiance_fabric.cli import main as cli_main
from railiance_fabric.connectors import CONNECTOR_TYPES, ConnectorConfig
from railiance_fabric.scanner import ScanOptions, scan_repo
from railiance_fabric.schema_validation import draft202012_validator
def test_connector_interfaces_cover_followup_catalog_types() -> None:
assert set(CONNECTOR_TYPES) >= {
"package_registry",
"container_registry",
"api_catalog",
"service_catalog",
"deployment_inventory",
"fabric_registry",
}
def test_local_fabric_registry_connector_adds_separate_registry_evidence(tmp_path: Path) -> None:
repo = _minimal_repo(tmp_path)
manifest = _manifest(tmp_path)
snapshot = scan_repo(
ScanOptions(
repo_path=repo,
repo_slug="fixture-repo",
repo_name="Fixture Repo",
commit="abc123",
connectors=[
ConnectorConfig(
connector_id="local-fabric-registry",
connector_type="fabric_registry",
source_path=str(manifest),
)
],
)
)
_validate_schema("discovery-snapshot.schema.yaml", snapshot)
assert snapshot["connector_runs"][0]["status"] == "success"
assert snapshot["connector_runs"][0]["candidate_counts"] == {"nodes": 1, "edges": 1, "attributes": 8}
assert any(scope["source_kind"] == "fabric_registry" for scope in snapshot["replacement_scopes"])
registry_node = next(node for node in snapshot["candidates"]["nodes"] if node["kind"] == "FabricRegistryEntry")
assert registry_node["origin"] == "registry"
assert registry_node["review_state"] == "candidate"
assert registry_node["replacement_scope"].startswith("scope:fixture-repo:local-fabric-registry:fabric_registry")
assert registry_node["attributes"]["state_hub_repo_id"] == "state-hub-id"
assert any(edge["edge_type"] == "cataloged_as" for edge in snapshot["candidates"]["edges"])
registry_attributes = {
attribute["name"]: attribute
for attribute in snapshot["candidates"]["attributes"]
if attribute["origin"] == "registry"
}
assert registry_attributes["registry_remote_url"]["value"] == "gitea-remote:coulomb/fixture-repo.git"
assert registry_attributes["registry_default_branch"]["source_anchors"][0]["source_kind"] == "fabric_registry"
def test_connector_failure_becomes_review_artifact_without_corrupting_scan(tmp_path: Path) -> None:
repo = _minimal_repo(tmp_path)
snapshot = scan_repo(
ScanOptions(
repo_path=repo,
repo_slug="fixture-repo",
repo_name="Fixture Repo",
commit="abc123",
connectors=[
ConnectorConfig(
connector_id="local-fabric-registry",
connector_type="fabric_registry",
source_path=str(tmp_path / "missing.yaml"),
)
],
)
)
_validate_schema("discovery-snapshot.schema.yaml", snapshot)
assert snapshot["connector_runs"][0]["status"] == "unavailable"
assert snapshot["review_artifacts"][0]["artifact_type"] == "connector_unavailable"
assert any(node["kind"] == "Repository" for node in snapshot["candidates"]["nodes"])
def test_scan_cli_can_enable_local_fabric_registry_connector(tmp_path: Path, capsys) -> None:
repo = _minimal_repo(tmp_path)
manifest = _manifest(tmp_path)
output = tmp_path / "snapshot.json"
assert cli_main(
[
"scan",
str(repo),
"--repo-slug",
"fixture-repo",
"--repo-name",
"Fixture Repo",
"--commit",
"abc123",
"--connector",
"local-fabric-registry",
"--connector-manifest",
str(manifest),
"--output",
str(output),
]
) == 0
summary = capsys.readouterr().out
assert "1 connector run(s)" in summary
payload = json.loads(output.read_text(encoding="utf-8"))
_validate_schema("discovery-snapshot.schema.yaml", payload)
assert payload["connector_runs"][0]["status"] == "success"
def _minimal_repo(tmp_path: Path) -> Path:
repo = tmp_path / "fixture-repo"
repo.mkdir()
(repo / "README.md").write_text("# Fixture Repo\n", encoding="utf-8")
return repo
def _manifest(tmp_path: Path) -> Path:
manifest = tmp_path / "local-repos.yaml"
manifest.write_text(
"""
apiVersion: railiance.fabric/v1alpha1
kind: RegistryOnboardingManifest
repositories:
- slug: fixture-repo
name: Fixture Repo
domain: testing
path: /tmp/fixture-repo
default_branch: main
state_hub_repo_id: state-hub-id
remote_url: gitea-remote:coulomb/fixture-repo.git
declaration_paths:
- /tmp/fixture-repo
""".lstrip(),
encoding="utf-8",
)
return manifest
def _validate_schema(schema_name: str, payload: dict[str, object]) -> None:
validator = draft202012_validator(Path("schemas") / schema_name)
validator.validate(payload)