maturity scorecard generation

This commit is contained in:
2026-05-08 01:59:42 +02:00
parent b4f620533c
commit 3a94042ca3
14 changed files with 1385 additions and 95 deletions

View File

@@ -0,0 +1,247 @@
"""Normalization helpers for Apache Chemistry OpenCMIS TCK output."""
from __future__ import annotations
import re
from collections import Counter
from typing import Any
OPENCMIS_STATUSES = {
"INFO",
"SKIPPED",
"OK",
"WARNING",
"FAILURE",
"UNEXPECTED_EXCEPTION",
}
_STATUS_PATTERN = re.compile(
r"^(?P<indent>\s*)(?P<status>INFO|SKIPPED|OK|WARNING|FAILURE|UNEXPECTED_EXCEPTION):\s*(?P<message>.*)$"
)
_TEST_HEADER_PATTERN = re.compile(r"^(?P<name>.+?)\s+\((?P<duration>\d+)\s+ms\)$")
_PROGRESS_TEST_PATTERN = re.compile(
r"^\s{2}(?P<name>.+?)\s+\((?P<duration>\d+)ms\):\s+"
r"(?P<status>INFO|SKIPPED|OK|WARNING|FAILURE|UNEXPECTED_EXCEPTION)\s*$"
)
_PROGRESS_GROUP_PATTERN = re.compile(r"^(?P<name>.+?)\s+\((?P<count>\d+)\s+tests\)$")
_SOURCE_LOCATION_PATTERN = re.compile(
r"\s+\((?P<file>[A-Za-z0-9_.$-]+\.java):(?P<line>\d+)\)$"
)
def parse_text_report(
output: str,
selected_group: str | None = None,
group_classes: list[str] | None = None,
) -> list[dict[str, Any]]:
"""Parse the native OpenCMIS TextReport/ConsoleRunner text output."""
lines = output.splitlines()
cases = _parse_text_report_cases(lines, selected_group, group_classes or [])
if cases:
return cases
return _parse_progress_cases(lines, selected_group, group_classes or [])
def result_counts(cases: list[dict[str, Any]]) -> dict[str, int]:
counts = Counter(str(case.get("status", "unknown")) for case in cases)
return dict(sorted(counts.items()))
def aggregate_case_result(counts: dict[str, int], returncode: int) -> str:
if counts.get("infrastructure_error"):
return "infrastructure_error"
if counts.get("fail"):
return "fail"
if counts.get("warning"):
return "warning"
if counts.get("pass") or counts.get("info"):
return "pass"
if counts.get("expected_gap"):
return "expected_gap"
if counts.get("unsupported_by_design"):
return "unsupported_by_design"
if counts.get("skipped"):
return "skipped"
if counts.get("manual"):
return "manual"
if counts.get("not_applicable"):
return "not_applicable"
if counts.get("blocked"):
return "blocked"
return "infrastructure_error" if returncode else "unknown"
def normalize_case_status(value: str) -> str:
normalized = value.strip().lower().replace("-", "_").replace(" ", "_")
if normalized == "info":
return "info"
if normalized in {"ok", "success", "passed"}:
return "pass"
if normalized in {"failure", "failed", "error"}:
return "fail"
if normalized in {"unexpected_exception", "infra", "infrastructure_error"}:
return "infrastructure_error"
if normalized in {"skip", "skipped"}:
return "skipped"
if normalized in {"expected_skip", "expected_gap"}:
return "expected_gap"
if normalized in {"unsupported", "unsupported_by_design"}:
return "unsupported_by_design"
if normalized in {
"pass",
"fail",
"warning",
"manual",
"not_applicable",
"waiver_applied",
"blocked",
"unknown",
}:
return normalized
return "unknown"
def _parse_text_report_cases(
lines: list[str],
selected_group: str | None,
group_classes: list[str],
) -> list[dict[str, Any]]:
cases: list[dict[str, Any]] = []
current_group: str | None = None
current_test: str | None = None
current_duration_ms: int | None = None
i = 0
while i < len(lines):
line = lines[i]
stripped = line.strip()
if _is_separator(stripped, "=") and i + 2 < len(lines):
candidate = lines[i + 1].strip()
if candidate and _is_separator(lines[i + 2].strip(), "="):
current_group = candidate
current_test = None
current_duration_ms = None
i += 3
continue
if _is_separator(stripped, "-") and i + 2 < len(lines):
match = _TEST_HEADER_PATTERN.match(lines[i + 1].strip())
if match and _is_separator(lines[i + 2].strip(), "-"):
current_test = match.group("name")
current_duration_ms = int(match.group("duration"))
i += 3
continue
match = _STATUS_PATTERN.match(line)
if match:
case = _case_from_match(
match,
len(cases) + 1,
selected_group,
group_classes,
current_group,
current_test,
current_duration_ms,
)
cases.append(case)
i += 1
return cases
def _parse_progress_cases(
lines: list[str],
selected_group: str | None,
group_classes: list[str],
) -> list[dict[str, Any]]:
cases: list[dict[str, Any]] = []
current_group: str | None = None
for line in lines:
group_match = _PROGRESS_GROUP_PATTERN.match(line.strip())
if group_match:
current_group = group_match.group("name")
continue
test_match = _PROGRESS_TEST_PATTERN.match(line)
if not test_match:
continue
native_status = test_match.group("status")
test_name = test_match.group("name")
case_id = _case_id(selected_group, current_group, test_name, len(cases) + 1)
cases.append(
{
"id": case_id,
"status": normalize_case_status(native_status),
"status_native": native_status,
"message": f"{test_name} completed with {native_status}.",
"group_name": current_group,
"selected_check_group": selected_group,
"test_name": test_name,
"duration_ms": int(test_match.group("duration")),
"level": 0,
"group_classes": group_classes,
"source": "opencmis-console-progress",
}
)
return cases
def _case_from_match(
match: re.Match[str],
index: int,
selected_group: str | None,
group_classes: list[str],
current_group: str | None,
current_test: str | None,
current_duration_ms: int | None,
) -> dict[str, Any]:
native_status = match.group("status")
message = match.group("message").strip()
source_location = _source_location(message)
if source_location is not None:
message = _SOURCE_LOCATION_PATTERN.sub("", message).rstrip()
case_id = _case_id(selected_group, current_group, current_test, index)
return {
"id": case_id,
"status": normalize_case_status(native_status),
"status_native": native_status,
"message": message,
"group_name": current_group,
"selected_check_group": selected_group,
"test_name": current_test,
"duration_ms": current_duration_ms,
"level": len(match.group("indent")) // 2,
"group_classes": group_classes,
"source_location": source_location,
"source": "opencmis-text-report",
}
def _source_location(message: str) -> dict[str, Any] | None:
match = _SOURCE_LOCATION_PATTERN.search(message)
if not match:
return None
return {
"file": match.group("file"),
"line": int(match.group("line")),
}
def _case_id(
selected_group: str | None,
current_group: str | None,
current_test: str | None,
index: int,
) -> str:
group = _safe_id(selected_group or current_group or "opencmis")
test = _safe_id(current_test or "case")
return f"opencmis-tck:{group}:{test}:{index:04d}"
def _safe_id(value: str) -> str:
lowered = value.strip().lower()
safe = "".join(char if char.isalnum() else "-" for char in lowered)
safe = "-".join(part for part in safe.split("-") if part)
return safe or "unknown"
def _is_separator(value: str, char: str) -> bool:
return len(value) >= 20 and set(value) == {char}

View File

@@ -0,0 +1,333 @@
"""CMIS capability maturity scorecard generation."""
from __future__ import annotations
import json
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
CAPABILITY_GROUPS = [
{
"id": "repository-type",
"label": "Repository And Type Metadata",
"weight": 2.0,
"description": "Repository identity, repository information, and type metadata.",
},
{
"id": "object-content",
"label": "Object And Content Services",
"weight": 2.0,
"description": "Object services, properties, content streams, and lifecycle operations.",
},
{
"id": "navigation",
"label": "Navigation Services",
"weight": 1.5,
"description": "Folder tree, children, descendants, and filing behavior.",
},
{
"id": "query",
"label": "Query",
"weight": 1.0,
"description": "Query support and query-result behavior.",
},
{
"id": "relationships",
"label": "Relationships",
"weight": 0.75,
"description": "Relationship object and relationship navigation behavior.",
},
{
"id": "acl-policy",
"label": "ACL And Policy",
"weight": 1.0,
"description": "ACL and policy support where claimed by the target.",
},
{
"id": "versioning",
"label": "Versioning",
"weight": 1.0,
"description": "Checkout, checkin, version series, and version-specific behavior.",
},
{
"id": "change-log",
"label": "Change Log",
"weight": 0.75,
"description": "Change token and change event behavior.",
},
{
"id": "extension-gaps",
"label": "Extensions And Known Gaps",
"weight": 0.5,
"description": "Explicitly scoped extensions, unsupported optional services, and gaps.",
},
]
def build_scorecard(run_dir: Path) -> dict[str, Any]:
run_metadata = _load_json(run_dir / "run.json")
evidence = _load_json(run_dir / "normalized" / "evidence.json").get("evidence", [])
mappings = _load_json(run_dir / "normalized" / "mappings.json").get("mappings", [])
findings = _load_json(run_dir / "normalized" / "findings.json").get("findings", [])
assessment_package = _load_json(run_dir / "reports" / "assessment-package.json")
evidence_by_id = {item["id"]: item for item in evidence}
findings_by_check = _findings_by_check(findings)
mapping_groups = _mappings_by_group(mappings)
target_known_gap_refs = _known_gap_refs(assessment_package)
groups = [
_score_group(group, mapping_groups.get(group["id"], []), evidence_by_id, findings_by_check, target_known_gap_refs)
for group in CAPABILITY_GROUPS
]
assessed_groups = [group for group in groups if group["status"] != "not_assessed"]
max_weighted_score = sum(group["weight"] * 4 for group in groups)
weighted_score = sum(group["weighted_score"] for group in groups)
maturity_score = round((weighted_score / max_weighted_score) * 100, 2) if max_weighted_score else 0.0
return {
"id": f"cmis-maturity-scorecard:{run_metadata['id']}",
"run_id": run_metadata["id"],
"target_profile_ref": run_metadata["target_profile_ref"],
"assessment_profile_ref": run_metadata["assessment_profile_ref"],
"created_at": _now(),
"summary": {
"maturity_score": maturity_score,
"maturity_level": _overall_level(maturity_score),
"assessed_groups": len(assessed_groups),
"total_groups": len(groups),
"coverage_percent": round((len(assessed_groups) / len(groups)) * 100, 2),
"groups_with_failures": sum(1 for group in groups if group["status"] == "failing"),
"groups_blocked": sum(1 for group in groups if group["status"] == "blocked"),
"groups_with_expected_gaps": sum(1 for group in groups if group["status"] == "scoped_gap"),
},
"groups": groups,
"certification_boundary": "This scorecard interprets guide-board preparation evidence only and does not certify CMIS conformance.",
}
def write_scorecard(run_dir: Path, output_dir: Path | None = None) -> dict[str, str]:
output = output_dir or run_dir / "reports"
output.mkdir(parents=True, exist_ok=True)
scorecard = build_scorecard(run_dir)
json_path = output / "cmis-maturity-scorecard.json"
markdown_path = output / "cmis-maturity-scorecard.md"
json_path.write_text(json.dumps(scorecard, indent=2, sort_keys=True) + "\n", encoding="utf-8")
markdown_path.write_text(markdown_scorecard(scorecard), encoding="utf-8")
return {
"status": "written",
"json": str(json_path),
"markdown": str(markdown_path),
}
def markdown_scorecard(scorecard: dict[str, Any]) -> str:
summary = scorecard["summary"]
lines = [
f"# CMIS Capability Maturity Scorecard: {scorecard['run_id']}",
"",
f"Target: {scorecard['target_profile_ref']}",
f"Assessment: {scorecard['assessment_profile_ref']}",
f"Maturity score: {summary['maturity_score']} ({summary['maturity_level']})",
f"Coverage: {summary['assessed_groups']}/{summary['total_groups']} groups ({summary['coverage_percent']}%)",
"",
"## Capability Groups",
"",
]
for group in scorecard["groups"]:
lines.extend(
[
f"### {group['label']}",
"",
f"- status: {group['status']}",
f"- maturity level: {group['maturity_level']}",
f"- score: {group['score']}/4",
f"- evidence results: {_format_counts(group['evidence_results'])}",
f"- requirements: {', '.join(group['requirement_refs']) or 'none'}",
f"- interpretation: {group['interpretation']}",
"",
]
)
lines.extend(["## Boundary", "", scorecard["certification_boundary"], ""])
return "\n".join(lines)
def _score_group(
group: dict[str, Any],
mappings: list[dict[str, Any]],
evidence_by_id: dict[str, dict[str, Any]],
findings_by_check: dict[str, list[dict[str, Any]]],
target_known_gap_refs: set[str],
) -> dict[str, Any]:
results = Counter(mapping["result"] for mapping in mappings)
requirement_refs = sorted({mapping["requirement_ref"] for mapping in mappings})
evidence_refs = sorted({mapping["evidence_id"] for mapping in mappings})
check_ids = sorted({mapping["check_id"] for mapping in mappings})
findings = [
finding
for check_id in check_ids
for finding in findings_by_check.get(check_id, [])
]
score, status, level, interpretation = _interpret_group(
results,
requirement_refs,
target_known_gap_refs,
findings,
)
return {
"id": group["id"],
"label": group["label"],
"description": group["description"],
"weight": group["weight"],
"status": status,
"maturity_level": level,
"score": score,
"weighted_score": round(score * group["weight"], 3),
"evidence_results": dict(sorted(results.items())),
"requirement_refs": requirement_refs,
"evidence_refs": evidence_refs,
"check_ids": check_ids,
"finding_refs": sorted({finding["id"] for finding in findings}),
"artifact_refs": sorted(
{
artifact_ref
for evidence_ref in evidence_refs
for artifact_ref in evidence_by_id.get(evidence_ref, {}).get("artifact_refs", [])
}
),
"interpretation": interpretation,
}
def _interpret_group(
results: Counter[str],
requirement_refs: list[str],
known_gap_refs: set[str],
findings: list[dict[str, Any]],
) -> tuple[int, str, str, str]:
if not results:
return (
0,
"not_assessed",
"not_assessed",
"No mapped evidence has been produced for this capability group yet.",
)
unexpected_findings = [finding for finding in findings if not finding.get("expected")]
if results.get("infrastructure_error"):
return (
1,
"blocked",
"infrastructure_blocked",
"The capability group could not be assessed because the test infrastructure or target endpoint failed.",
)
if results.get("blocked"):
return (
1,
"blocked",
"blocked",
"The capability group is blocked by prerequisite, preflight, dependency, or invocation setup.",
)
if unexpected_findings or results.get("fail"):
return (
1,
"failing",
"fails_claimed_capability",
"One or more mapped checks failed unexpectedly for this capability group.",
)
if results.get("warning"):
return (
3,
"partial",
"partially_demonstrated",
"The capability group produced warnings and needs review before it can be treated as stable.",
)
if results.get("expected_gap") or results.get("unsupported_by_design"):
expected_refs = sorted(set(requirement_refs).intersection(known_gap_refs))
detail = (
" Known gap refs: " + ", ".join(expected_refs) + "."
if expected_refs
else ""
)
return (
2,
"scoped_gap",
"scoped_or_unsupported",
"The capability group is explicitly scoped as unsupported or partially supported." + detail,
)
if results.get("manual") or results.get("skipped") or results.get("not_applicable"):
return (
2,
"not_automated",
"evidence_incomplete",
"Evidence exists, but the capability group was not executed as an automated pass/fail check.",
)
if results.get("pass"):
return (
4,
"demonstrated",
"demonstrated",
"Mapped checks passed for this capability group.",
)
return (
0,
"unknown",
"unknown",
"Evidence exists, but its result vocabulary was not recognized by the scorecard.",
)
def _mappings_by_group(mappings: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
groups: dict[str, list[dict[str, Any]]] = {}
for mapping in mappings:
if mapping.get("target_type") != "capability_group":
continue
groups.setdefault(mapping["target_id"], []).append(mapping)
return groups
def _findings_by_check(findings: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
by_check: dict[str, list[dict[str, Any]]] = {}
for finding in findings:
by_check.setdefault(finding["check_id"], []).append(finding)
return by_check
def _known_gap_refs(assessment_package: dict[str, Any]) -> set[str]:
target = assessment_package.get("target", {})
refs = set()
for gap in target.get("known_gaps", []):
refs.update(gap.get("requirement_refs", []))
return refs
def _overall_level(score: float) -> str:
if score >= 85:
return "strong"
if score >= 65:
return "developing"
if score >= 35:
return "limited"
if score > 0:
return "initial"
return "not_assessed"
def _format_counts(counts: dict[str, int]) -> str:
if not counts:
return "none"
return ", ".join(f"{key}: {value}" for key, value in sorted(counts.items()))
def _load_json(path: Path) -> dict[str, Any]:
with path.open("r", encoding="utf-8") as handle:
value = json.load(handle)
if not isinstance(value, dict):
raise ValueError(f"{path} must contain a JSON object")
return value
def _now() -> str:
return datetime.now(timezone.utc).isoformat()