Seed a non-secret service inventory (environments, hosts, clusters, services, endpoints, access paths, evidence, gaps) with a JSON schema, a renderer, and a generated service-catalog view. Adds the `make ops-inventory-view` target, probe ActivityDefinition, and docs. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
217 lines
7.2 KiB
Python
217 lines
7.2 KiB
Python
#!/usr/bin/env python3
|
|
"""Render the ops service inventory into a compact Markdown now view."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
from collections import Counter
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
try:
|
|
import yaml
|
|
except ImportError as exc: # pragma: no cover - environment guard
|
|
raise SystemExit("PyYAML is required to render ops/service-inventory.yml") from exc
|
|
|
|
|
|
# Default inventory file read by main() when --input is not given.
DEFAULT_INPUT = Path("ops/service-inventory.yml")
# Default Markdown file written by main() when --output is not given.
DEFAULT_OUTPUT = Path("docs/ops-hub-service-catalog.md")
|
|
|
|
|
|
def text(value: Any, default: str = "-") -> str:
    """Return *value* as a display string, or *default* when it is blank.

    ``None`` and the empty string count as blank. Non-empty strings are
    returned as-is; every other value is converted with ``str()``.
    """
    is_string = isinstance(value, str)
    if value is None or (is_string and not value):
        return default
    return value if is_string else str(value)
|
|
|
|
|
|
def md(value: Any) -> str:
    """Return *value* as text that is safe inside a Markdown table cell.

    The value is first normalised with ``text()``; pipes are then
    backslash-escaped and newlines become ``<br>`` so the cell stays on a
    single table row.
    """
    cell = text(value)
    # Order matters only for readability here: the two substitutions touch
    # disjoint characters.
    for raw, replacement in (("|", "\\|"), ("\n", "<br>")):
        cell = cell.replace(raw, replacement)
    return cell
|
|
|
|
|
|
def joined(values: list[Any] | None, limit: int | None = None) -> str:
    """Render a list as ``<br>``-separated, cell-escaped entries.

    Returns ``"-"`` for a missing or empty list. When *limit* is given and
    the list is longer, only the first *limit* entries are shown, followed
    by a ``+N more`` marker.
    """
    if not values:
        return "-"

    labels = [text(entry) for entry in values]
    overflow = 0 if limit is None else len(labels) - limit
    if overflow > 0:
        labels = [*labels[:limit], f"+{overflow} more"]
    return "<br>".join(map(md, labels))
|
|
|
|
|
|
def endpoint_label(endpoint: dict[str, Any]) -> str:
    """Build the table-cell label for one endpoint entry.

    The label is the endpoint's ``url`` (falling back to ``id``, then
    ``"-"``). If ``expected_status`` and/or ``expected_signal`` are set, an
    ``Expected: ...`` line is appended after a ``<br>``.
    """
    target = endpoint.get("url") or endpoint.get("id") or "-"
    status = endpoint.get("expected_status")
    signal = endpoint.get("expected_signal")

    expectations: list[str] = []
    # A status of 0 is still a stated expectation, hence the explicit None test.
    if status is not None:
        expectations.append(f"status {status}")
    if signal:
        expectations.append(signal)

    if not expectations:
        return md(target)
    return md(f"{target}<br>Expected: {', '.join(expectations)}")
|
|
|
|
|
|
def primary_endpoint(service: dict[str, Any]) -> str:
    """Return the label of the service's first endpoint, or ``"-"`` if none."""
    candidates = service.get("endpoints")
    return endpoint_label(candidates[0]) if candidates else "-"
|
|
|
|
|
|
def runtime_label(service: dict[str, Any], envs: dict[str, dict[str, Any]]) -> str:
    """Describe where a service runs as a Markdown table cell.

    The first line is the environment's ``name`` looked up in *envs* by the
    service's ``environment`` id (falling back to the id itself, then
    ``"-"``). A second line, when any are present, lists the populated
    ``runtime`` fields and ports separated by ``"; "``.
    """
    env_key = service.get("environment")
    env_name = envs.get(env_key, {}).get("name")
    runtime = service.get("runtime") or {}

    facts = [
        f"{field}: {runtime[field]}"
        for field in ("type", "cluster", "namespace", "host", "public_endpoint")
        if runtime.get(field)
    ]
    ports = runtime.get("ports")
    if ports:
        facts.append("ports: " + ", ".join(map(str, ports)))

    cell_lines = [env_name or env_key or "-"]
    if facts:
        cell_lines.append("; ".join(facts))
    return "<br>".join(md(line) for line in cell_lines)
|
|
|
|
|
|
def access_label(service: dict[str, Any]) -> str:
    """Summarise a service's access paths as a Markdown table cell.

    Shows at most the first two paths as ``type: status (target)`` lines and
    appends ``+N more`` when further paths exist. Returns ``"-"`` when the
    service lists no access paths.
    """
    routes = service.get("access_paths") or []
    if not routes:
        return "-"

    labels = [
        f"{route.get('type', '-')}: {route.get('status', 'unknown')} "
        f"({route.get('target', '-')})"
        for route in routes[:2]
    ]
    hidden = len(routes) - 2
    if hidden > 0:
        labels.append(f"+{hidden} more")
    return "<br>".join(map(md, labels))
|
|
|
|
|
|
def latest_evidence(service: dict[str, Any]) -> str:
    """Return the most recent evidence entry as ``"<when>: <summary>"``.

    The entry with the greatest ``observed_at`` wins; if no entry carries
    ``observed_at``, the last listed entry is used and labelled
    ``undated``. The summary falls back to the entry's ``source``, then to
    ``"-"``. Returns ``"-"`` when the service has no evidence. The result
    is already escaped for a Markdown table cell.
    """
    evidence = service.get("evidence") or []
    if not evidence:
        return "-"

    dated = [item for item in evidence if item.get("observed_at")]
    if dated:
        try:
            latest = max(dated, key=lambda item: item["observed_at"])
        except TypeError:
            # YAML loads unquoted timestamps as date/datetime objects and
            # quoted ones as str; those (and naive vs. aware datetimes) do
            # not compare with each other. Fall back to ordering by their
            # string form instead of aborting the whole render.
            latest = max(dated, key=lambda item: str(item["observed_at"]))
    else:
        latest = evidence[-1]

    when = latest.get("observed_at") or "undated"
    summary = latest.get("summary") or latest.get("source") or "-"
    return md(f"{when}: {summary}")
|
|
|
|
|
|
def service_table(inventory: dict[str, Any]) -> str:
    """Render the per-service catalog as a Markdown table.

    One row is emitted per entry in ``inventory["services"]``, with the
    columns Service, Where, Owner, Endpoint, Health, Data, Access and
    Top Gap. Missing or null ``environments`` / ``services`` sections are
    treated as empty, so only the header rows are returned for them.
    """
    # `or []` rather than a .get() default: a YAML key with no value loads
    # as None, which a default would not replace.
    envs = {env["id"]: env for env in inventory.get("environments") or []}
    rows = [
        "| Service | Where | Owner | Endpoint | Health | Data | Access | Top Gap |",
        "|---|---|---|---|---|---|---|---|",
    ]
    for service in inventory.get("services") or []:
        gaps = service.get("gaps") or []
        # Escape only the status here: latest_evidence() already returns
        # cell-escaped text, and escaping it a second time would turn `\|`
        # into `\\|` and corrupt the table cell.
        health = md(service.get("health_status", "unknown"))
        cells = [
            md(f"{service.get('name')} ({service.get('id')})"),
            runtime_label(service, envs),
            joined(service.get("owner_repos"), limit=3),
            primary_endpoint(service),
            f"{health}<br>{latest_evidence(service)}",
            joined(service.get("backing_stores"), limit=3),
            access_label(service),
            md(gaps[0] if gaps else "-"),
        ]
        rows.append("| " + " | ".join(cells) + " |")
    return "\n".join(rows)
|
|
|
|
|
|
def summary_table(inventory: dict[str, Any]) -> str:
    """Render the headline counts as a two-column Markdown table.

    Rows cover the number of environments, hosts, clusters and services,
    followed by one ``Services: <status>`` row per distinct
    ``health_status`` (services without the key count as ``unknown``).
    Missing or null sections count as zero.
    """

    def count(section: str) -> int:
        # A YAML key with no value loads as None; treat it as empty.
        return len(inventory.get(section) or [])

    services = inventory.get("services") or []
    health = Counter(service.get("health_status", "unknown") for service in services)
    rows = [
        "| Metric | Count |",
        "|---|---:|",
        f"| Environments | {count('environments')} |",
        f"| Hosts | {count('hosts')} |",
        f"| Clusters | {count('clusters')} |",
        f"| Services | {len(services)} |",
    ]
    # Sort on the string form so a null or non-string status cannot raise
    # TypeError; for all-string statuses the order is unchanged.
    for status, total in sorted(health.items(), key=lambda pair: str(pair[0])):
        rows.append(f"| Services: {md(status)} | {total} |")
    return "\n".join(rows)
|
|
|
|
|
|
def gaps_section(inventory: dict[str, Any]) -> str:
    """Render the "Open Operating Gaps" section as Markdown.

    Each service with at least one gap gets a ``###`` sub-heading followed
    by a bullet per gap; services without gaps are skipped. A missing or
    null ``services`` section yields just the section heading. Trailing
    whitespace is stripped from the result.
    """
    lines = ["## Open Operating Gaps", ""]
    # `or []`: a YAML key with no value loads as None, not as a missing key.
    for service in inventory.get("services") or []:
        gaps = service.get("gaps") or []
        if not gaps:
            continue
        lines.append(f"### {service.get('name')} (`{service.get('id')}`)")
        lines.append("")
        lines.extend(f"- {gap}" for gap in gaps)
        lines.append("")
    return "\n".join(lines).rstrip()
|
|
|
|
|
|
def render(inventory: dict[str, Any]) -> str:
    """Assemble the full Markdown "now view" document for *inventory*.

    The document consists of a generated-file preamble, the summary table,
    the service catalog table, the open-gaps section and a fixed list of
    next evidence events. The returned text ends with a newline.
    """
    reviewed = inventory.get("last_reviewed", "unknown")
    source = "ops/service-inventory.yml"

    preamble = [
        "# Ops Hub Service Catalog Now View",
        "",
        "<!-- generated by ops/render_service_inventory.py; edit ops/service-inventory.yml instead -->",
        "",
        f"Source: `{source}`",
        f"Inventory last reviewed: `{reviewed}`",
        "",
        "This is the repo-native first view for `CUST-WP-0047`. It exists so an",
        "operator can answer what is running where before the full standalone",
        "`ops-hub` application is available.",
        "",
    ]
    generated = [
        "## Summary",
        "",
        summary_table(inventory),
        "",
        "## Service Catalog",
        "",
        service_table(inventory),
        "",
        gaps_section(inventory),
        "",
    ]
    next_events = [
        "## Next Evidence Events",
        "",
        "- `ops-service-observed` for each runtime object confirmed by a probe.",
        "- `ops-endpoint-verified` for HTTP, HTTPS, tunnel, or cluster endpoints.",
        "- `ops-access-path-checked` for non-secret access path checks.",
        "- `ops-backup-verified` where backup and restore evidence exists.",
        "- `ops-inventory-drift` when observed state differs from this inventory.",
        # The empty final element makes the joined document end with "\n".
        "",
    ]
    return "\n".join([*preamble, *generated, *next_events])
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point: render the inventory YAML to Markdown.

    Reads ``--input`` (default ``DEFAULT_INPUT``), renders it, creates the
    parent directory of ``--output`` (default ``DEFAULT_OUTPUT``) if needed
    and writes the result there as UTF-8.

    Returns:
        ``0`` on success.

    Raises:
        SystemExit: if the input file does not contain a YAML mapping at the
            top level (for example when the file is empty).
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--input", type=Path, default=DEFAULT_INPUT)
    parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT)
    args = parser.parse_args()

    inventory = yaml.safe_load(args.input.read_text(encoding="utf-8"))
    # safe_load returns None for an empty document and may return a list or
    # scalar; fail here with a clear message instead of an AttributeError
    # from deep inside render().
    if not isinstance(inventory, dict):
        raise SystemExit(
            f"{args.input}: expected a YAML mapping at the top level, "
            f"got {type(inventory).__name__}"
        )

    rendered = render(inventory)
    args.output.parent.mkdir(parents=True, exist_ok=True)
    args.output.write_text(rendered, encoding="utf-8")
    print(f"rendered {args.output} from {args.input}")
    return 0
|
|
|
|
|
|
# Run the renderer only when executed as a script; the value returned by
# main() is passed to SystemExit and so becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|