feat(ops): add ops-hub service inventory now view (CUST-WP-0047)
Seed a non-secret service inventory (environments, hosts, clusters, services, endpoints, access paths, evidence, gaps) with a JSON schema, a renderer, and a generated service-catalog view. Adds the `make ops-inventory-view` target, probe ActivityDefinition, and docs. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
216
ops/render_service_inventory.py
Normal file
216
ops/render_service_inventory.py
Normal file
@@ -0,0 +1,216 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Render the ops service inventory into a compact Markdown now view."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
from collections import Counter
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
try:
|
||||
import yaml
|
||||
except ImportError as exc: # pragma: no cover - environment guard
|
||||
raise SystemExit("PyYAML is required to render ops/service-inventory.yml") from exc
|
||||
|
||||
|
||||
DEFAULT_INPUT = Path("ops/service-inventory.yml")
|
||||
DEFAULT_OUTPUT = Path("docs/ops-hub-service-catalog.md")
|
||||
|
||||
|
||||
def text(value: Any, default: str = "-") -> str:
|
||||
if value is None:
|
||||
return default
|
||||
if isinstance(value, str):
|
||||
return value if value else default
|
||||
return str(value)
|
||||
|
||||
|
||||
def md(value: Any) -> str:
|
||||
return text(value).replace("|", "\\|").replace("\n", "<br>")
|
||||
|
||||
|
||||
def joined(values: list[Any] | None, limit: int | None = None) -> str:
|
||||
if not values:
|
||||
return "-"
|
||||
items = [text(v) for v in values]
|
||||
if limit is not None and len(items) > limit:
|
||||
shown = items[:limit]
|
||||
shown.append(f"+{len(items) - limit} more")
|
||||
items = shown
|
||||
return "<br>".join(md(item) for item in items)
|
||||
|
||||
|
||||
def endpoint_label(endpoint: dict[str, Any]) -> str:
|
||||
label = endpoint.get("url") or endpoint.get("id") or "-"
|
||||
checks: list[str] = []
|
||||
if endpoint.get("expected_status") is not None:
|
||||
checks.append(f"status {endpoint['expected_status']}")
|
||||
if endpoint.get("expected_signal"):
|
||||
checks.append(endpoint["expected_signal"])
|
||||
if checks:
|
||||
label = f"{label}<br>Expected: {', '.join(checks)}"
|
||||
return md(label)
|
||||
|
||||
|
||||
def primary_endpoint(service: dict[str, Any]) -> str:
|
||||
endpoints = service.get("endpoints") or []
|
||||
if not endpoints:
|
||||
return "-"
|
||||
return endpoint_label(endpoints[0])
|
||||
|
||||
|
||||
def runtime_label(service: dict[str, Any], envs: dict[str, dict[str, Any]]) -> str:
|
||||
env_id = service.get("environment")
|
||||
env = envs.get(env_id, {})
|
||||
parts = [env.get("name") or env_id or "-"]
|
||||
|
||||
runtime = service.get("runtime") or {}
|
||||
details: list[str] = []
|
||||
for key in ("type", "cluster", "namespace", "host", "public_endpoint"):
|
||||
if runtime.get(key):
|
||||
details.append(f"{key}: {runtime[key]}")
|
||||
if runtime.get("ports"):
|
||||
details.append("ports: " + ", ".join(str(p) for p in runtime["ports"]))
|
||||
if details:
|
||||
parts.append("; ".join(details))
|
||||
|
||||
return "<br>".join(md(part) for part in parts)
|
||||
|
||||
|
||||
def access_label(service: dict[str, Any]) -> str:
|
||||
paths = service.get("access_paths") or []
|
||||
if not paths:
|
||||
return "-"
|
||||
labels = []
|
||||
for path in paths[:2]:
|
||||
labels.append(
|
||||
f"{path.get('type', '-')}: {path.get('status', 'unknown')} "
|
||||
f"({path.get('target', '-')})"
|
||||
)
|
||||
if len(paths) > 2:
|
||||
labels.append(f"+{len(paths) - 2} more")
|
||||
return "<br>".join(md(label) for label in labels)
|
||||
|
||||
|
||||
def latest_evidence(service: dict[str, Any]) -> str:
|
||||
evidence = service.get("evidence") or []
|
||||
if not evidence:
|
||||
return "-"
|
||||
dated = [item for item in evidence if item.get("observed_at")]
|
||||
latest = max(dated, key=lambda item: item["observed_at"]) if dated else evidence[-1]
|
||||
when = latest.get("observed_at") or "undated"
|
||||
summary = latest.get("summary") or latest.get("source") or "-"
|
||||
return md(f"{when}: {summary}")
|
||||
|
||||
|
||||
def service_table(inventory: dict[str, Any]) -> str:
|
||||
envs = {env["id"]: env for env in inventory.get("environments", [])}
|
||||
rows = [
|
||||
"| Service | Where | Owner | Endpoint | Health | Data | Access | Top Gap |",
|
||||
"|---|---|---|---|---|---|---|---|",
|
||||
]
|
||||
for service in inventory.get("services", []):
|
||||
gaps = service.get("gaps") or []
|
||||
rows.append(
|
||||
"| "
|
||||
+ " | ".join(
|
||||
[
|
||||
md(f"{service.get('name')} ({service.get('id')})"),
|
||||
runtime_label(service, envs),
|
||||
joined(service.get("owner_repos"), limit=3),
|
||||
primary_endpoint(service),
|
||||
md(f"{service.get('health_status', 'unknown')}<br>{latest_evidence(service)}"),
|
||||
joined(service.get("backing_stores"), limit=3),
|
||||
access_label(service),
|
||||
md(gaps[0] if gaps else "-"),
|
||||
]
|
||||
)
|
||||
+ " |"
|
||||
)
|
||||
return "\n".join(rows)
|
||||
|
||||
|
||||
def summary_table(inventory: dict[str, Any]) -> str:
|
||||
services = inventory.get("services", [])
|
||||
health = Counter(service.get("health_status", "unknown") for service in services)
|
||||
rows = [
|
||||
"| Metric | Count |",
|
||||
"|---|---:|",
|
||||
f"| Environments | {len(inventory.get('environments', []))} |",
|
||||
f"| Hosts | {len(inventory.get('hosts', []))} |",
|
||||
f"| Clusters | {len(inventory.get('clusters', []))} |",
|
||||
f"| Services | {len(services)} |",
|
||||
]
|
||||
for status, count in sorted(health.items()):
|
||||
rows.append(f"| Services: {md(status)} | {count} |")
|
||||
return "\n".join(rows)
|
||||
|
||||
|
||||
def gaps_section(inventory: dict[str, Any]) -> str:
|
||||
lines = ["## Open Operating Gaps", ""]
|
||||
for service in inventory.get("services", []):
|
||||
gaps = service.get("gaps") or []
|
||||
if not gaps:
|
||||
continue
|
||||
lines.append(f"### {service.get('name')} (`{service.get('id')}`)")
|
||||
lines.append("")
|
||||
for gap in gaps:
|
||||
lines.append(f"- {gap}")
|
||||
lines.append("")
|
||||
return "\n".join(lines).rstrip()
|
||||
|
||||
|
||||
def render(inventory: dict[str, Any]) -> str:
|
||||
source = "ops/service-inventory.yml"
|
||||
reviewed = inventory.get("last_reviewed", "unknown")
|
||||
lines = [
|
||||
"# Ops Hub Service Catalog Now View",
|
||||
"",
|
||||
"<!-- generated by ops/render_service_inventory.py; edit ops/service-inventory.yml instead -->",
|
||||
"",
|
||||
f"Source: `{source}`",
|
||||
f"Inventory last reviewed: `{reviewed}`",
|
||||
"",
|
||||
"This is the repo-native first view for `CUST-WP-0047`. It exists so an",
|
||||
"operator can answer what is running where before the full standalone",
|
||||
"`ops-hub` application is available.",
|
||||
"",
|
||||
"## Summary",
|
||||
"",
|
||||
summary_table(inventory),
|
||||
"",
|
||||
"## Service Catalog",
|
||||
"",
|
||||
service_table(inventory),
|
||||
"",
|
||||
gaps_section(inventory),
|
||||
"",
|
||||
"## Next Evidence Events",
|
||||
"",
|
||||
"- `ops-service-observed` for each runtime object confirmed by a probe.",
|
||||
"- `ops-endpoint-verified` for HTTP, HTTPS, tunnel, or cluster endpoints.",
|
||||
"- `ops-access-path-checked` for non-secret access path checks.",
|
||||
"- `ops-backup-verified` where backup and restore evidence exists.",
|
||||
"- `ops-inventory-drift` when observed state differs from this inventory.",
|
||||
"",
|
||||
]
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument("--input", type=Path, default=DEFAULT_INPUT)
|
||||
parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT)
|
||||
args = parser.parse_args()
|
||||
|
||||
inventory = yaml.safe_load(args.input.read_text(encoding="utf-8"))
|
||||
rendered = render(inventory)
|
||||
args.output.parent.mkdir(parents=True, exist_ok=True)
|
||||
args.output.write_text(rendered, encoding="utf-8")
|
||||
print(f"rendered {args.output} from {args.input}")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user