state-hub scope functionality work

This commit is contained in:
2026-05-01 01:33:15 +02:00
parent 45fb6e141d
commit fc725ec65f
4 changed files with 345 additions and 11 deletions

View File

@@ -8,22 +8,42 @@ from __future__ import annotations
import asyncio
import json
import re
import socket
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Literal
from typing import Any, Literal
import yaml
CriterionStatus = Literal["pass", "fail", "warn", "skip"]
Tier = Literal["none", "core", "standard", "full"]
# Criteria that belong to each tier (in check order)
CORE_IDS = {"C1", "C2", "C3", "C4"}
STANDARD_IDS = {"C5", "C6", "C7", "C8", "C9"}
STANDARD_IDS = {"C5a", "C5b", "C5c", "C6", "C7", "C8", "C9"}
FULL_IDS = {"C10", "C11", "C12", "C13", "C14"}
STANDARD_SCOPE_SECTIONS = [
"One-liner",
"Core Idea",
"In Scope",
"Out of Scope",
"Relevant When",
"Not Relevant When",
"Current State",
"How It Fits",
"Terminology",
"Related / Overlapping",
"Provided Capabilities",
]
_CAPABILITY_BLOCK_RE = re.compile(r"```capability\s*\n(.*?)```", re.DOTALL | re.IGNORECASE)
_H2_RE = re.compile(r"^##\s+(.+?)\s*$", re.MULTILINE)
@dataclass
class CriterionResult:
@@ -45,6 +65,154 @@ class DoIReport:
checked_at: str = field(default_factory=lambda: datetime.now(tz=timezone.utc).isoformat())
def evaluate_scope_health(repo: dict) -> list[dict[str, Any]]:
"""Return machine-readable SCOPE.md health issues for C5a/C5b/C5c.
The returned records intentionally mirror DoI criterion IDs while carrying
section-level hints that downstream repo-scoping can use to refresh only
the affected parts of SCOPE.md.
"""
repo_path = _resolve_path(repo)
if not repo_path:
return [
{
"id": "C5a",
"label": "SCOPE.md present",
"status": "skip",
"detail": "Local path unavailable",
"missing_sections": [],
"invalid_capability_blocks": [],
"needs_refresh_sections": [],
},
{
"id": "C5b",
"label": "SCOPE.md standard sections",
"status": "skip",
"detail": "Local path unavailable",
"missing_sections": [],
"invalid_capability_blocks": [],
"needs_refresh_sections": [],
},
{
"id": "C5c",
"label": "SCOPE.md capability blocks",
"status": "skip",
"detail": "Local path unavailable",
"missing_sections": [],
"invalid_capability_blocks": [],
"needs_refresh_sections": [],
},
]
scope_path = Path(repo_path) / "SCOPE.md"
if not scope_path.exists():
return [
{
"id": "C5a",
"label": "SCOPE.md present",
"status": "fail",
"detail": "SCOPE.md not found at repo root",
"missing_sections": STANDARD_SCOPE_SECTIONS.copy(),
"invalid_capability_blocks": [],
"needs_refresh_sections": STANDARD_SCOPE_SECTIONS.copy(),
},
{
"id": "C5b",
"label": "SCOPE.md standard sections",
"status": "skip",
"detail": "SCOPE.md absent",
"missing_sections": STANDARD_SCOPE_SECTIONS.copy(),
"invalid_capability_blocks": [],
"needs_refresh_sections": STANDARD_SCOPE_SECTIONS.copy(),
},
{
"id": "C5c",
"label": "SCOPE.md capability blocks",
"status": "skip",
"detail": "SCOPE.md absent",
"missing_sections": [],
"invalid_capability_blocks": [],
"needs_refresh_sections": ["Provided Capabilities"],
},
]
text = scope_path.read_text()
issues: list[dict[str, Any]] = [{
"id": "C5a",
"label": "SCOPE.md present",
"status": "pass",
"detail": "",
"missing_sections": [],
"invalid_capability_blocks": [],
"needs_refresh_sections": [],
}]
headings = {h.strip() for h in _H2_RE.findall(text)}
missing_sections = [section for section in STANDARD_SCOPE_SECTIONS if section not in headings]
if missing_sections:
issues.append({
"id": "C5b",
"label": "SCOPE.md standard sections",
"status": "warn",
"detail": f"Missing H2 section(s): {', '.join(missing_sections)}",
"missing_sections": missing_sections,
"invalid_capability_blocks": [],
"needs_refresh_sections": missing_sections,
})
else:
issues.append({
"id": "C5b",
"label": "SCOPE.md standard sections",
"status": "pass",
"detail": f"All {len(STANDARD_SCOPE_SECTIONS)} standard sections present",
"missing_sections": [],
"invalid_capability_blocks": [],
"needs_refresh_sections": [],
})
capability_blocks = _CAPABILITY_BLOCK_RE.findall(text)
valid_blocks = 0
invalid_blocks: list[dict[str, Any]] = []
for index, block in enumerate(capability_blocks, start=1):
try:
parsed = yaml.safe_load(block) or {}
if isinstance(parsed, dict) and parsed.get("type") and parsed.get("title"):
valid_blocks += 1
else:
invalid_blocks.append({
"index": index,
"reason": "Capability block must be YAML with type and title",
})
except yaml.YAMLError as exc:
invalid_blocks.append({"index": index, "reason": str(exc)})
if valid_blocks > 0:
issues.append({
"id": "C5c",
"label": "SCOPE.md capability blocks",
"status": "pass",
"detail": f"{valid_blocks} valid capability block(s)",
"missing_sections": [],
"invalid_capability_blocks": invalid_blocks,
"needs_refresh_sections": [],
})
else:
detail = "No fenced capability block found"
if invalid_blocks:
detail = "No valid capability block found"
issues.append({
"id": "C5c",
"label": "SCOPE.md capability blocks",
"status": "warn",
"detail": detail,
"missing_sections": [],
"invalid_capability_blocks": invalid_blocks,
"needs_refresh_sections": ["Provided Capabilities"],
})
return issues
def compute_fingerprint(
repo: dict,
latest_tpsc_snap_at: str | None,
@@ -205,13 +373,9 @@ async def evaluate(
# ── Tier 2: Standard ─────────────────────────────────────────────────────
# C5: SCOPE.md
if not repo_path:
_r("C5", "SCOPE.md present", "standard", "skip", "Local path unavailable")
elif (Path(repo_path) / "SCOPE.md").exists():
_r("C5", "SCOPE.md present", "standard", "pass")
else:
_r("C5", "SCOPE.md present", "standard", "fail", "SCOPE.md not found at repo root")
# C5a/C5b/C5c: SCOPE.md structure and capability declarations
for issue in evaluate_scope_health(repo):
_r(issue["id"], issue["label"], "standard", issue["status"], issue["detail"])
# C6: CLAUDE.md
if not repo_path: