#!/usr/bin/env python3 """repo-scoping fact ingestion connector (ATLAS-WP-0003-T02). Consume repo-scoping observed facts/evidence as connector input and emit candidate configuration surfaces, adding only config-kind/layer classification on top (ecosystem-boundaries ยง2.4 option a). Read-only: zero writes to repo-scoping or the scanned repo. Source resolution (first available): 1. --facts : a repo-scoping facts export (list of fact objects) 2. REPO_SCOPING_URL env : GET {url}/repos/{slug}/facts Degrades gracefully (emits nothing) when no source is available. Usage: python3 tools/connector_reposcoping.py [--facts facts.json] make connect-reposcoping REPO=state-hub """ from __future__ import annotations import json import os import sys from pathlib import Path from connector_base import run_connector CONFIG_HINTS = ("config", "env", "settings", "values", ".yaml", ".yml", ".toml", ".ini") def _load_facts(slug: str, facts_file: str | None) -> list[dict]: if facts_file: p = Path(facts_file) if p.exists(): data = json.loads(p.read_text()) return data if isinstance(data, list) else data.get("facts", []) print(f"repo-scoping: facts file not found: {facts_file}", file=sys.stderr) return [] url = os.environ.get("REPO_SCOPING_URL") if url: try: import urllib.request with urllib.request.urlopen(f"{url}/repos/{slug}/facts", timeout=5) as r: data = json.loads(r.read()) return data if isinstance(data, list) else data.get("facts", []) except Exception as exc: # noqa: BLE001 print(f"repo-scoping: API unavailable ({exc})", file=sys.stderr) return [] print("repo-scoping: no --facts file and REPO_SCOPING_URL unset; nothing to ingest") return [] def _is_config_fact(fact: dict) -> bool: blob = (str(fact.get("path", "")) + " " + str(fact.get("kind", "")) + " " + str(fact.get("summary", ""))).lower() return any(h in blob for h in CONFIG_HINTS) def facts_to_candidates(slug: str, facts: list[dict]) -> list[tuple[dict, str]]: out: list[tuple[dict, str]] = [] for fact in facts: if not _is_config_fact(fact): continue rel = str(fact.get("path", "")).strip("/") if not rel: continue stem = rel.replace("/", "-").replace(".", "-").replace("_", "-").lower() sid = f"surface.infotech.{slug}.{stem}" entry = { "id": sid, "name": f"{slug}: {rel}", "kind": "app-config", "summary": fact.get("summary") or f"Config surface from repo-scoping fact at {rel}.", "owner": slug, "scope": {"allowed_layers": ["company", "environment", "installation"], "default_layer": "company"}, "mutability": "deploy-time", "security_class": "operational", "sources": [{"repo": slug, "path": rel, "role": "company-baseline"}], "evidence": {"discovery_method": "connector:repo-scoping", "change_log_ref": str(fact.get("id", ""))}, } out.append((entry, f"Ingested from repo-scoping fact `{fact.get('id','?')}` " f"({rel}). Classify kind/scope and promote or reject.\n")) return out def main(argv: list[str]) -> int: if not argv: print(__doc__) return 2 slug = argv[0] facts_file = None if "--facts" in argv: i = argv.index("--facts") facts_file = argv[i + 1] if i + 1 < len(argv) else None return run_connector("repo-scoping", facts_to_candidates(slug, _load_facts(slug, facts_file))) if __name__ == "__main__": sys.path.insert(0, str(Path(__file__).resolve().parent)) raise SystemExit(main(sys.argv[1:]))