Files
hub-core/tests/test_imports.py
tegwick 986ac4d40b Add hub-core package, docs, and State Hub integration scaffold
Extract the first reusable slice (models, schemas, routers, MCP, migrations)
from state-hub with INTENT/SCOPE, agent instructions, workplan, and aligned
inter_hub capability registry index.
2026-06-16 02:39:36 +02:00

392 lines
12 KiB
Python

import uuid
from datetime import datetime, timezone
from hub_core.events import ALERT_EVENT_TYPES, FOS10_EVENT_TYPES, RISK_EVENT_TYPES
from hub_core.models import Base
from hub_core.models.agent_message import AgentMessage
from hub_core.models.capability_catalog import CapabilityCatalog
from hub_core.models.capability_request import CapabilityRequest
from hub_core.models.domain import Domain
from hub_core.models.managed_repo import ManagedRepo
from hub_core.models.progress_event import ProgressEvent
from hub_core.models.tpsc import TPSCCatalog, TPSCEntry, TPSCSnapshot
from hub_core.routers import (
create_capabilities_router,
create_capability_catalog_router,
create_capability_request_read_router,
create_domains_router,
create_messages_router,
create_policy_router,
create_progress_router,
create_repos_router,
create_tpsc_router,
)
from hub_core.schemas.capability import (
CapabilityRequestRead,
CatalogCreate,
CatalogPatch,
CatalogRead,
)
from hub_core.schemas.domain import (
DomainCreate,
DomainDetail,
DomainRead,
DomainRename,
DomainUpdate,
RepoStub,
)
from hub_core.schemas.doi import DoICriterion, DoIReport, DoISummaryEntry
from hub_core.schemas.managed_repo import (
RepoCreate,
RepoPathRegister,
RepoRead,
RepoUpdate,
)
from hub_core.schemas.policy import PolicyRead
from hub_core.schemas.progress_event import ProgressEventCreate, ProgressEventRead
from hub_core.schemas.tpsc import TPSCCatalogRead, TPSCGDPRReport, TPSCGDPRWarning
def test_core_tables_are_registered() -> None:
assert set(Base.metadata.tables) == {
"agent_messages",
"capability_catalog",
"capability_requests",
"domains",
"managed_repos",
"progress_events",
"tpsc_catalog",
"tpsc_entries",
"tpsc_snapshots",
}
def test_model_table_names() -> None:
assert AgentMessage.__tablename__ == "agent_messages"
assert CapabilityCatalog.__tablename__ == "capability_catalog"
assert CapabilityRequest.__tablename__ == "capability_requests"
assert Domain.__tablename__ == "domains"
assert ManagedRepo.__tablename__ == "managed_repos"
assert ProgressEvent.__tablename__ == "progress_events"
assert TPSCCatalog.__tablename__ == "tpsc_catalog"
assert TPSCEntry.__tablename__ == "tpsc_entries"
assert TPSCSnapshot.__tablename__ == "tpsc_snapshots"
def test_doi_schemas_are_available() -> None:
criterion = DoICriterion(id="C1", label="Canonical files", tier="core", status="pass")
report = DoIReport(
repo_slug="example",
tier="core",
core_pass=True,
standard_pass=False,
full_pass=False,
criteria=[criterion],
checked_at="2026-06-07T00:00:00+00:00",
)
summary = DoISummaryEntry(
repo_slug="example",
domain_slug="custodian",
tier="core",
core_pass=True,
standard_pass=False,
full_pass=False,
checked_at=report.checked_at,
)
assert report.criteria[0].id == "C1"
assert summary.domain_slug == "custodian"
def test_tpsc_schemas_match_state_hub_contract() -> None:
now = datetime.now(tz=timezone.utc)
catalog_entry = TPSCCatalogRead(
id=uuid.uuid4(),
slug="example-service",
name="Example Service",
provider="Example",
category="ops",
website_url=None,
pricing_model="paid",
gdpr_maturity="unknown",
gdpr_notes=None,
dpa_available=False,
tos_url=None,
privacy_policy_url=None,
data_processing_regions=None,
data_retention_notes=None,
status="active",
created_at=now,
updated_at=now,
)
warning = TPSCGDPRWarning(
repo_slug="state-hub",
service_slug="example-service",
gdpr_maturity="unknown",
purpose="testing",
pricing_model="paid",
)
report = TPSCGDPRReport(
generated_at=now,
total_services=1,
warning_count=1,
warnings=[warning],
by_maturity={"unknown": 1},
)
assert catalog_entry.gdpr_warning is True
assert report.warning_count == len(report.warnings)
assert report.by_maturity["unknown"] == 1
def test_router_factories_register_expected_prefixes() -> None:
async def get_session():
raise AssertionError("router construction should not resolve sessions")
routers = [
create_capabilities_router(get_session),
create_domains_router(get_session),
create_messages_router(get_session),
create_repos_router(get_session),
create_progress_router(get_session),
create_tpsc_router(get_session),
create_policy_router(lambda name: None),
]
assert [router.prefix for router in routers] == [
"",
"/domains",
"/messages",
"/repos",
"/progress",
"/tpsc",
"/policy",
]
assert all(router.routes for router in routers)
def test_messages_router_accepts_host_model_injection() -> None:
async def get_session():
raise AssertionError("router construction should not resolve sessions")
router = create_messages_router(get_session, message_model=AgentMessage)
assert router.prefix == "/messages"
assert router.routes
def test_domains_router_accepts_host_callbacks_and_schema_injection() -> None:
async def get_session():
raise AssertionError("router construction should not resolve sessions")
async def detail_builder(domain, session):
raise AssertionError("router construction should not build details")
async def before_archive(domain, session):
raise AssertionError("router construction should not validate archive")
router = create_domains_router(
get_session,
domain_model=Domain,
repo_model=ManagedRepo,
domain_create_schema=DomainCreate,
domain_detail_schema=DomainDetail,
domain_read_schema=DomainRead,
domain_rename_schema=DomainRename,
domain_update_schema=DomainUpdate,
repo_stub_schema=RepoStub,
detail_builder=detail_builder,
before_archive=before_archive,
include_update_route=False,
)
method_paths = {
(method, route.path)
for route in router.routes
for method in getattr(route, "methods", set())
}
assert router.prefix == "/domains"
assert ("PATCH", "/domains/{slug}") not in method_paths
assert ("PATCH", "/domains/{slug}/rename") in method_paths
assert ("PATCH", "/domains/{slug}/archive") in method_paths
def test_repos_router_accepts_host_model_and_schema_injection() -> None:
async def get_session():
raise AssertionError("router construction should not resolve sessions")
def after_register(repo, body, domain):
raise AssertionError("router construction should not call hooks")
router = create_repos_router(
get_session,
prefix="",
domain_model=Domain,
repo_model=ManagedRepo,
repo_create_schema=RepoCreate,
repo_update_schema=RepoUpdate,
repo_read_schema=RepoRead,
repo_path_register_schema=RepoPathRegister,
list_noload_fields=("domain",),
create_extension_fields=("topic_id",),
after_register=after_register,
include_slug_routes=False,
)
method_paths = {
(method, route.path)
for route in router.routes
for method in getattr(route, "methods", set())
}
assert router.prefix == ""
assert method_paths == {
("GET", "/"),
("POST", "/"),
("GET", "/by-fingerprint"),
("GET", "/by-remote"),
}
def test_repos_router_can_register_only_slug_routes() -> None:
async def get_session():
raise AssertionError("router construction should not resolve sessions")
router = create_repos_router(
get_session,
prefix="",
include_collection_routes=False,
include_lookup_routes=False,
)
method_paths = {
(method, route.path)
for route in router.routes
for method in getattr(route, "methods", set())
}
assert method_paths == {
("GET", "/{slug}"),
("PATCH", "/{slug}"),
("POST", "/{slug}/paths"),
}
def test_capability_catalog_router_accepts_host_model_injection() -> None:
async def get_session():
raise AssertionError("router construction should not resolve sessions")
router = create_capability_catalog_router(
get_session,
domain_model=Domain,
repo_model=ManagedRepo,
catalog_model=CapabilityCatalog,
catalog_create_schema=CatalogCreate,
catalog_patch_schema=CatalogPatch,
catalog_read_schema=CatalogRead,
)
method_paths = {
(method, route.path)
for route in router.routes
for method in getattr(route, "methods", set())
}
assert router.prefix == ""
assert method_paths == {
("GET", "/capability-catalog/"),
("POST", "/capability-catalog/"),
("PATCH", "/capability-catalog/{entry_id}"),
}
def test_capability_request_read_router_accepts_host_model_injection() -> None:
async def get_session():
raise AssertionError("router construction should not resolve sessions")
router = create_capability_request_read_router(
get_session,
domain_model=Domain,
request_model=CapabilityRequest,
request_read_schema=CapabilityRequestRead,
)
method_paths = {
(method, route.path)
for route in router.routes
for method in getattr(route, "methods", set())
}
assert router.prefix == ""
assert method_paths == {
("GET", "/capability-requests/"),
("GET", "/capability-requests/{request_id}"),
}
def test_tpsc_router_accepts_host_model_injection() -> None:
async def get_session():
raise AssertionError("router construction should not resolve sessions")
router = create_tpsc_router(
get_session,
repo_model=ManagedRepo,
catalog_model=TPSCCatalog,
snapshot_model=TPSCSnapshot,
entry_model=TPSCEntry,
)
assert router.prefix == "/tpsc"
assert router.routes
def test_progress_router_accepts_host_model_and_schema_injection() -> None:
async def get_session():
raise AssertionError("router construction should not resolve sessions")
router = create_progress_router(
get_session,
progress_model=ProgressEvent,
progress_create_schema=ProgressEventCreate,
progress_read_schema=ProgressEventRead,
)
assert router.prefix == "/progress"
assert router.routes
def test_policy_router_can_register_update_route() -> None:
router = create_policy_router(
lambda name: None,
update_policy=lambda name, content: PolicyRead(name=name, content=content),
)
method_paths = {
(method, route.path)
for route in router.routes
for method in getattr(route, "methods", set())
}
assert ("GET", "/policy/{name}") in method_paths
assert ("PUT", "/policy/{name}") in method_paths
def test_fos10_event_contract() -> None:
assert RISK_EVENT_TYPES == {
"risk_surfaced",
"risk_mitigated",
"risk_escalated",
}
assert ALERT_EVENT_TYPES == {
"alert_raised",
"alert_acknowledged",
"alert_resolved",
}
assert FOS10_EVENT_TYPES == RISK_EVENT_TYPES | ALERT_EVENT_TYPES
def test_progress_router_registers_fos10_views() -> None:
async def get_session():
raise AssertionError("router construction should not resolve sessions")
router = create_progress_router(get_session)
paths = {route.path for route in router.routes}
assert "/progress/risks" in paths
assert "/progress/alerts" in paths