Add validation indexes and generated views

This commit is contained in:
2026-05-23 03:32:16 +02:00
parent dc44208c9f
commit c112bf5c74
37 changed files with 2007 additions and 8 deletions

View File

@@ -3,21 +3,33 @@
from .service import (
CanonServiceError,
artifact_graph,
generate_agent_briefs,
generate_indexes,
generate_tree,
inspect_canon,
list_artifacts,
list_models,
list_standards,
list_views,
profile_inspect,
read_view,
validate_canon,
write_validation_report,
)
__all__ = [
"CanonServiceError",
"artifact_graph",
"generate_agent_briefs",
"generate_indexes",
"generate_tree",
"inspect_canon",
"list_artifacts",
"list_models",
"list_standards",
"list_views",
"profile_inspect",
"read_view",
"validate_canon",
"write_validation_report",
]

View File

@@ -14,7 +14,9 @@ from .service import (
list_artifacts,
list_models,
list_standards,
list_views,
profile_inspect,
read_view,
validate_canon,
)
@@ -86,6 +88,11 @@ def _route(
if path == "/graph":
graph_format = _first(query, "format") or "json"
return HTTPStatus.OK, artifact_graph(root, output_format=graph_format)
if path == "/views":
return HTTPStatus.OK, list_views(root)
if path.startswith("/views/"):
name = path.removeprefix("/views/").strip("/")
return HTTPStatus.OK, read_view(name, root)
if path.startswith("/profiles/") and path.endswith("/inspect"):
profile = path.removeprefix("/profiles/").removesuffix("/inspect").strip("/")
return HTTPStatus.OK, profile_inspect(profile, root)

View File

@@ -11,12 +11,18 @@ from .api import serve
from .service import (
CanonServiceError,
artifact_graph,
generate_agent_briefs,
generate_indexes,
generate_tree,
inspect_canon,
list_artifacts,
list_models,
list_standards,
list_views,
profile_inspect,
read_view,
validate_canon,
write_validation_report,
)
@@ -46,8 +52,26 @@ def build_parser() -> argparse.ArgumentParser:
standards.set_defaults(handler=_standards)
validate = sub.add_parser("validate", help="Validate the canon infospace")
validate.add_argument(
"--write",
default="",
help="Write the JSON validation payload to this path.",
)
validate.set_defaults(handler=_validate)
index = sub.add_parser("index", help="Refresh generated indexes and views")
index.set_defaults(handler=_index)
tree = sub.add_parser("tree", help="Refresh the generated infospace tree")
tree.set_defaults(handler=_tree)
agent_briefs = sub.add_parser("agent-briefs", help="Refresh generated agent briefs")
agent_briefs.set_defaults(handler=_agent_briefs)
views = sub.add_parser("views", help="List or read generated views")
views.add_argument("name", nargs="?", default="")
views.set_defaults(handler=_views)
graph = sub.add_parser("graph", help="Export the canon artifact graph")
graph.add_argument("--format", choices=["json", "mermaid"], default="json")
graph.set_defaults(handler=_graph)
@@ -113,9 +137,29 @@ def _standards(args: argparse.Namespace) -> dict[str, Any]:
def _validate(args: argparse.Namespace) -> dict[str, Any]:
if args.write:
return write_validation_report(args.write, _root(args))
return validate_canon(_root(args))
def _index(args: argparse.Namespace) -> dict[str, Any]:
return generate_indexes(_root(args))
def _tree(args: argparse.Namespace) -> dict[str, Any]:
return generate_tree(_root(args))
def _agent_briefs(args: argparse.Namespace) -> dict[str, Any]:
return generate_agent_briefs(_root(args))
def _views(args: argparse.Namespace) -> dict[str, Any]:
if args.name:
return read_view(args.name, _root(args))
return list_views(_root(args))
def _graph(args: argparse.Namespace) -> dict[str, Any]:
return artifact_graph(_root(args), output_format=args.format)

View File

@@ -0,0 +1,427 @@
from __future__ import annotations
from collections import defaultdict
from pathlib import Path
from typing import Any
import yaml
GENERATED_NOTICE = "<!-- GENERATED by info_tech_canon; do not edit by hand. -->"
def generate_indexes(context: Any) -> dict[str, Any]:
assets: list[dict[str, Any]] = []
ownership = concept_ownership(context)
import_matrix = relationship_matrix(context)
assets.append(
_write_yaml(
context.infospace_root / "indexes" / "concept-ownership.yaml",
ownership,
)
)
assets.append(
_write_yaml(
context.infospace_root / "indexes" / "import-matrix.yaml",
import_matrix,
)
)
assets.append(
_write_yaml(
context.infospace_root / "indexes" / "artifact-tree.yaml",
artifact_tree(context),
)
)
assets.extend(generate_views(context, ownership, import_matrix)["files"])
return _result("index", assets)
def generate_views(
context: Any,
ownership: dict[str, Any] | None = None,
import_matrix: dict[str, Any] | None = None,
) -> dict[str, Any]:
ownership = ownership or concept_ownership(context)
import_matrix = import_matrix or relationship_matrix(context)
files = [
_write_text(
context.infospace_root / "views" / "by-standard.md",
_render_by_standard(context),
),
_write_text(
context.infospace_root / "views" / "by-concept.md",
_render_by_concept(ownership),
),
_write_text(
context.infospace_root / "views" / "by-profile.md",
_render_by_profile(context),
),
_write_text(
context.infospace_root / "views" / "by-mapping-target.md",
_render_by_mapping_target(context),
),
_write_text(
context.infospace_root / "views" / "kernel-overview.md",
_render_kernel_overview(context),
),
_write_text(
context.infospace_root / "views" / "import-matrix.md",
_render_import_matrix(import_matrix),
),
]
return _result("views", files)
def generate_tree(context: Any) -> dict[str, Any]:
tree = artifact_tree(context)
files = [
_write_yaml(context.infospace_root / "indexes" / "artifact-tree.yaml", tree),
_write_text(
context.infospace_root / "views" / "repository-tree.md",
_render_repository_tree(tree),
),
]
return _result("tree", files)
def generate_agent_briefs(context: Any) -> dict[str, Any]:
files = [
_write_text(
context.infospace_root / "agent" / "global-agent-brief.md",
_render_global_agent_brief(context),
)
]
return _result("agent-briefs", files)
def list_generated_views(context: Any) -> dict[str, Any]:
views = []
for path in sorted((context.infospace_root / "views").glob("*.md")):
views.append(
{
"name": path.name,
"path": str(path.relative_to(context.infospace_root)),
"generated": _is_generated(path),
}
)
return {"ok": True, "count": len(views), "views": views}
def read_generated_view(context: Any, name: str) -> dict[str, Any]:
if "/" in name or "\\" in name:
raise ValueError("View name must be a single file name.")
path = context.infospace_root / "views" / name
if not path.is_file():
raise FileNotFoundError(name)
return {
"ok": True,
"name": name,
"path": str(path.relative_to(context.infospace_root)),
"generated": _is_generated(path),
"content": path.read_text(encoding="utf-8"),
}
def concept_ownership(context: Any) -> dict[str, Any]:
concepts: list[dict[str, Any]] = []
for artifact in sorted(context.infospace.artifacts, key=lambda item: item.id):
concepts.append(
{
"concept": artifact.title,
"owner": artifact.id,
"path": artifact.path,
"source": "artifact_title",
}
)
frontmatter = _frontmatter(context.infospace_root / artifact.path)
owned = frontmatter.get("owned_concepts") or []
if isinstance(owned, list):
for concept in owned:
concepts.append(
{
"concept": str(concept),
"owner": artifact.id,
"path": artifact.path,
"source": "frontmatter.owned_concepts",
}
)
by_key: dict[str, list[dict[str, Any]]] = defaultdict(list)
for item in concepts:
by_key[_normalize_concept(item["concept"])].append(item)
duplicates = [
{"normalized": key, "candidates": items}
for key, items in sorted(by_key.items())
if len(items) > 1
]
conflicts = [
{
"normalized": item["normalized"],
"owners": sorted({candidate["owner"] for candidate in item["candidates"]}),
"candidates": item["candidates"],
}
for item in duplicates
if len({candidate["owner"] for candidate in item["candidates"]}) > 1
]
return {
"concept_count": len(concepts),
"concepts": concepts,
"duplicate_candidates": duplicates,
"ownership_conflicts": conflicts,
}
def relationship_matrix(context: Any) -> dict[str, Any]:
artifact_ids = sorted(artifact.id for artifact in context.infospace.artifacts)
rows: list[dict[str, Any]] = []
for artifact in sorted(context.infospace.artifacts, key=lambda item: item.id):
targets: dict[str, list[str]] = {target: [] for target in artifact_ids}
for relationship in artifact.relationships:
target = relationship.get("target")
relation_type = str(relationship.get("type") or "related")
if isinstance(target, str) and target in targets:
targets[target].append(relation_type)
rows.append(
{
"artifact": artifact.id,
"targets": {
target: sorted(types)
for target, types in targets.items()
if types
},
}
)
return {"artifacts": artifact_ids, "rows": rows}
def artifact_tree(context: Any) -> dict[str, Any]:
files: list[dict[str, Any]] = []
for path in sorted(context.infospace_root.rglob("*")):
if path.is_file():
relative = path.relative_to(context.infospace_root)
files.append(
{
"path": str(relative),
"directory": str(relative.parent),
"name": path.name,
}
)
return {"root": "infospace", "file_count": len(files), "files": files}
def _render_by_standard(context: Any) -> str:
lines = _heading("By Standard")
standards = [
artifact
for artifact in context.infospace.artifacts
if artifact.kind in {"kernel", "standard"}
]
for artifact in sorted(standards, key=lambda item: item.id):
lines.extend(
[
f"## {artifact.title}",
"",
f"- ID: `{artifact.id}`",
f"- Kind: `{artifact.kind}`",
f"- Path: `{artifact.path}`",
f"- Relationships: {len(artifact.relationships)}",
"",
]
)
return "\n".join(lines).rstrip() + "\n"
def _render_by_concept(ownership: dict[str, Any]) -> str:
lines = _heading("By Concept")
lines.extend(
[
f"Concept count: **{ownership['concept_count']}**",
"",
"| Concept | Owner | Source |",
"| --- | --- | --- |",
]
)
for concept in ownership["concepts"]:
lines.append(
f"| {concept['concept']} | `{concept['owner']}` | `{concept['source']}` |"
)
lines.extend(["", "## Duplicate Candidates", ""])
duplicates = ownership["duplicate_candidates"]
if not duplicates:
lines.append("No duplicate concept candidates detected.")
else:
for duplicate in duplicates:
lines.append(f"- `{duplicate['normalized']}`")
lines.extend(["", "## Ownership Conflicts", ""])
conflicts = ownership["ownership_conflicts"]
if not conflicts:
lines.append("No ownership conflicts detected.")
else:
for conflict in conflicts:
owners = ", ".join(f"`{owner}`" for owner in conflict["owners"])
lines.append(f"- `{conflict['normalized']}` owned by {owners}")
return "\n".join(lines).rstrip() + "\n"
def _render_by_profile(context: Any) -> str:
lines = _heading("By Profile")
profiles = sorted((context.infospace_root / "profiles").glob("*/profile.yaml"))
if not profiles:
lines.append("No profiles have been registered yet.")
for path in profiles:
lines.extend(
[
f"## {path.parent.name}",
"",
f"- Path: `{path.relative_to(context.infospace_root)}`",
"",
]
)
return "\n".join(lines).rstrip() + "\n"
def _render_by_mapping_target(context: Any) -> str:
incoming: dict[str, list[tuple[str, str]]] = defaultdict(list)
for artifact in context.infospace.artifacts:
for relationship in artifact.relationships:
target = relationship.get("target")
relation_type = str(relationship.get("type") or "related")
if isinstance(target, str):
incoming[target].append((artifact.id, relation_type))
lines = _heading("By Mapping Target")
for target in sorted(incoming):
lines.extend([f"## `{target}`", ""])
for source, relation_type in sorted(incoming[target]):
lines.append(f"- `{source}` via `{relation_type}`")
lines.append("")
return "\n".join(lines).rstrip() + "\n"
def _render_kernel_overview(context: Any) -> str:
kind_counts: dict[str, int] = defaultdict(int)
relationship_counts: dict[str, int] = defaultdict(int)
for artifact in context.infospace.artifacts:
kind_counts[artifact.kind] += 1
for relationship in artifact.relationships:
relationship_counts[str(relationship.get("type") or "related")] += 1
lines = _heading("Kernel Overview")
lines.extend(
[
f"- Infospace: `{context.infospace.config.slug}`",
f"- Artifacts: {len(context.infospace.artifacts)}",
"",
"## Artifact Kinds",
"",
]
)
for kind, count in sorted(kind_counts.items()):
lines.append(f"- `{kind}`: {count}")
lines.extend(["", "## Relationship Types", ""])
for relation_type, count in sorted(relationship_counts.items()):
lines.append(f"- `{relation_type}`: {count}")
return "\n".join(lines).rstrip() + "\n"
def _render_import_matrix(matrix: dict[str, Any]) -> str:
artifacts = matrix["artifacts"]
lines = _heading("Import Matrix")
header = "| Artifact | " + " | ".join(f"`{artifact}`" for artifact in artifacts) + " |"
divider = "| --- | " + " | ".join("---" for _ in artifacts) + " |"
lines.extend([header, divider])
for row in matrix["rows"]:
cells = []
targets = row["targets"]
for artifact in artifacts:
cells.append(", ".join(f"`{item}`" for item in targets.get(artifact, [])))
lines.append(f"| `{row['artifact']}` | " + " | ".join(cells) + " |")
return "\n".join(lines).rstrip() + "\n"
def _render_repository_tree(tree: dict[str, Any]) -> str:
lines = _heading("Repository Tree")
lines.append(f"File count: **{tree['file_count']}**")
lines.append("")
for file_info in tree["files"]:
lines.append(f"- `{file_info['path']}`")
return "\n".join(lines).rstrip() + "\n"
def _render_global_agent_brief(context: Any) -> str:
lines = _heading("Global Agent Brief")
lines.extend(
[
"This brief summarizes the current canon service surface for agents.",
"",
f"- Infospace slug: `{context.infospace.config.slug}`",
f"- Artifact count: {len(context.infospace.artifacts)}",
"- Primary confidence command: `make validate`",
"- Refresh generated indexes and views with: `make index`",
"",
"## Useful Commands",
"",
"- `PYTHONPATH=src python3 -m info_tech_canon inspect`",
"- `PYTHONPATH=src python3 -m info_tech_canon validate`",
"- `PYTHONPATH=src python3 -m info_tech_canon graph`",
"- `PYTHONPATH=src python3 -m info_tech_canon index`",
"",
"## Consumption Notes",
"",
"- Treat `seeds/` as provenance.",
"- Treat `infospace/` as the service-consumable canon root.",
"- Generated files are marked and can be refreshed deterministically.",
]
)
return "\n".join(lines).rstrip() + "\n"
def _heading(title: str) -> list[str]:
return [GENERATED_NOTICE, "", f"# {title}", ""]
def _write_text(path: Path, content: str) -> dict[str, Any]:
path.parent.mkdir(parents=True, exist_ok=True)
old = path.read_text(encoding="utf-8") if path.exists() else None
changed = old != content
if changed:
path.write_text(content, encoding="utf-8")
return {"path": str(path), "changed": changed}
def _write_yaml(path: Path, data: dict[str, Any]) -> dict[str, Any]:
path.parent.mkdir(parents=True, exist_ok=True)
content = yaml.safe_dump(data, sort_keys=False)
return _write_text(path, content)
def _result(kind: str, files: list[dict[str, Any]]) -> dict[str, Any]:
return {
"ok": True,
"kind": kind,
"count": len(files),
"changed": [item for item in files if item["changed"]],
"files": files,
}
def _frontmatter(path: Path) -> dict[str, Any]:
text = path.read_text(encoding="utf-8")
if not text.startswith("---\n"):
return {}
end = text.find("\n---\n", 4)
if end == -1:
return {}
data = yaml.safe_load(text[4:end]) or {}
return data if isinstance(data, dict) else {}
def _normalize_concept(value: str) -> str:
return "-".join(value.lower().replace("_", "-").split())
def _is_generated(path: Path) -> bool:
try:
return path.read_text(encoding="utf-8").startswith(GENERATED_NOTICE)
except FileNotFoundError:
return False

View File

@@ -3,10 +3,12 @@ from __future__ import annotations
from collections import Counter
from dataclasses import asdict, dataclass
from pathlib import Path
import json
from typing import Any
import yaml
from . import generation
from .bench import (
Infospace,
KnowledgeArtifact,
@@ -15,6 +17,7 @@ from .bench import (
relationship_summary,
run_collection_checks,
)
from .validation import structural_checks
REPO_ROOT = Path(__file__).resolve().parents[2]
@@ -121,6 +124,7 @@ def list_standards(root: Path | str | None = None) -> dict[str, Any]:
def validate_canon(root: Path | str | None = None) -> dict[str, Any]:
context = load_context(root)
errors: list[dict[str, Any]] = []
warnings: list[dict[str, Any]] = []
artifact_ids = {artifact.id for artifact in context.infospace.artifacts}
for artifact in context.infospace.artifacts:
@@ -161,15 +165,31 @@ def validate_canon(root: Path | str | None = None) -> dict[str, Any]:
context.infospace.config.viability,
)
errors.extend(threshold_errors)
structural = structural_checks(context)
errors.extend(structural["errors"])
warnings.extend(structural["warnings"])
return {
"ok": not errors,
"errors": errors,
"warnings": warnings,
"metrics": checks.metrics,
"details": checks.details,
}
def write_validation_report(
destination: Path | str,
root: Path | str | None = None,
) -> dict[str, Any]:
payload = validate_canon(root)
path = Path(destination)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
payload["report_path"] = str(path)
return payload
def artifact_graph(
root: Path | str | None = None,
*,
@@ -221,6 +241,39 @@ def profile_inspect(
return {"ok": True, "profile": data, "path": str(profile_path)}
def generate_indexes(root: Path | str | None = None) -> dict[str, Any]:
return generation.generate_indexes(load_context(root))
def generate_tree(root: Path | str | None = None) -> dict[str, Any]:
return generation.generate_tree(load_context(root))
def generate_agent_briefs(root: Path | str | None = None) -> dict[str, Any]:
return generation.generate_agent_briefs(load_context(root))
def list_views(root: Path | str | None = None) -> dict[str, Any]:
return generation.list_generated_views(load_context(root))
def read_view(name: str, root: Path | str | None = None) -> dict[str, Any]:
try:
return generation.read_generated_view(load_context(root), name)
except FileNotFoundError as exc:
raise CanonServiceError(
"missing_view",
f"View not found: {name}",
{"view": name},
) from exc
except ValueError as exc:
raise CanonServiceError(
"invalid_view_name",
str(exc),
{"view": name},
) from exc
def _artifact_to_dict(
artifact: KnowledgeArtifact,
infospace_root: Path,

View File

@@ -0,0 +1,360 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
import yaml
REQUIRED_TOP_LEVEL_FILES = (
"README.md",
"INTENT.md",
"SCOPE.md",
"canon.yaml",
"pyproject.toml",
"workplans/index.yaml",
"infospace/infospace.yaml",
"infospace/artifacts/index.yaml",
)
REQUIRED_INFOSPACE_DIRS = (
"kernel",
"models",
"standards",
"profiles",
"patterns",
"mappings",
"assimilation",
"schemas",
"views",
"agent",
"examples",
"validation",
"indexes",
)
OPTIONAL_COLLECTION_DIRS = (
"profiles",
"patterns",
"mappings",
"assimilation",
"examples",
)
REQUIRED_SCHEMAS = (
"standard.schema.yaml",
"concept.schema.yaml",
"mapping.schema.yaml",
"profile.schema.yaml",
"assimilation.schema.yaml",
"interface-card.schema.yaml",
"agent-brief.schema.yaml",
"workplan.schema.yaml",
)
def structural_checks(context: Any) -> dict[str, list[dict[str, Any]]]:
errors: list[dict[str, Any]] = []
warnings: list[dict[str, Any]] = []
_check_required_top_level_files(context.repo_root, errors)
_check_required_infospace_dirs(context.infospace_root, errors)
_check_required_schemas(context.infospace_root, errors)
_check_canon_paths(context.repo_root, context.infospace_root, errors)
_check_artifact_index(context.repo_root, context.infospace_root, errors)
_check_optional_assets(context.infospace_root, warnings)
return {"errors": errors, "warnings": warnings}
def _check_required_top_level_files(
repo_root: Path,
errors: list[dict[str, Any]],
) -> None:
for relative in REQUIRED_TOP_LEVEL_FILES:
if not (repo_root / relative).is_file():
errors.append(
{
"code": "missing_required_file",
"path": relative,
}
)
def _check_required_infospace_dirs(
infospace_root: Path,
errors: list[dict[str, Any]],
) -> None:
for relative in REQUIRED_INFOSPACE_DIRS:
if not (infospace_root / relative).is_dir():
errors.append(
{
"code": "missing_required_infospace_dir",
"path": str(Path("infospace") / relative),
}
)
def _check_required_schemas(
infospace_root: Path,
errors: list[dict[str, Any]],
) -> None:
for filename in REQUIRED_SCHEMAS:
if not (infospace_root / "schemas" / filename).is_file():
errors.append(
{
"code": "missing_schema",
"path": str(Path("infospace") / "schemas" / filename),
}
)
def _check_canon_paths(
repo_root: Path,
infospace_root: Path,
errors: list[dict[str, Any]],
) -> None:
canon_path = repo_root / "canon.yaml"
canon = _read_yaml(canon_path, errors)
if not isinstance(canon, dict):
return
indexed_paths = _artifact_paths_by_path(infospace_root, errors)
for section in ("kernel", "models", "standards"):
items = canon.get(section) or []
if not isinstance(items, list):
errors.append(
{
"code": "invalid_canon_section",
"section": section,
"message": "Expected a list.",
}
)
continue
for item in items:
if not isinstance(item, dict):
errors.append(
{
"code": "invalid_canon_entry",
"section": section,
"message": "Expected a mapping.",
}
)
continue
path = str(item.get("path") or "")
if not path:
errors.append(
{
"code": "missing_canon_path",
"section": section,
"id": item.get("id"),
}
)
continue
if not (repo_root / path).is_file():
errors.append(
{
"code": "missing_canon_path_target",
"section": section,
"id": item.get("id"),
"path": path,
}
)
relative_infospace_path = _strip_infospace_prefix(path)
if relative_infospace_path not in indexed_paths:
errors.append(
{
"code": "canon_path_not_indexed",
"section": section,
"id": item.get("id"),
"path": path,
}
)
def _check_artifact_index(
repo_root: Path,
infospace_root: Path,
errors: list[dict[str, Any]],
) -> None:
index_path = infospace_root / "artifacts" / "index.yaml"
index = _read_yaml(index_path, errors)
if not isinstance(index, dict):
return
artifacts = index.get("artifacts")
if not isinstance(artifacts, list):
errors.append(
{
"code": "invalid_artifact_index",
"path": "infospace/artifacts/index.yaml",
"message": "Expected artifacts list.",
}
)
return
ids: set[str] = set()
for artifact in artifacts:
if not isinstance(artifact, dict):
errors.append(
{
"code": "invalid_artifact_entry",
"message": "Expected artifact mapping.",
}
)
continue
artifact_id = str(artifact.get("id") or "")
if not artifact_id:
errors.append({"code": "missing_artifact_id"})
elif artifact_id in ids:
errors.append(
{
"code": "duplicate_artifact_id",
"artifact_id": artifact_id,
}
)
ids.add(artifact_id)
for field in ("path", "kind", "title"):
if not artifact.get(field):
errors.append(
{
"code": "missing_artifact_field",
"artifact_id": artifact_id,
"field": field,
}
)
relative_path = str(artifact.get("path") or "")
if relative_path and not (infospace_root / relative_path).is_file():
errors.append(
{
"code": "missing_artifact_path",
"artifact_id": artifact_id,
"path": relative_path,
}
)
provenance = artifact.get("provenance") or {}
if isinstance(provenance, dict):
source_path = provenance.get("source_path")
if isinstance(source_path, str) and source_path:
if not (repo_root / source_path).is_file():
errors.append(
{
"code": "missing_provenance_source",
"artifact_id": artifact_id,
"source_path": source_path,
}
)
for artifact in artifacts:
if not isinstance(artifact, dict):
continue
artifact_id = str(artifact.get("id") or "")
relationships = artifact.get("relationships") or []
if not isinstance(relationships, list):
errors.append(
{
"code": "invalid_relationships",
"artifact_id": artifact_id,
"message": "Expected relationship list.",
}
)
continue
for relationship in relationships:
if not isinstance(relationship, dict):
errors.append(
{
"code": "invalid_relationship",
"artifact_id": artifact_id,
}
)
continue
target = relationship.get("target")
if target not in ids:
errors.append(
{
"code": "missing_relationship_target",
"artifact_id": artifact_id,
"target": target,
}
)
def _check_optional_assets(
infospace_root: Path,
warnings: list[dict[str, Any]],
) -> None:
global_brief = infospace_root / "agent" / "global-agent-brief.md"
if not global_brief.is_file():
warnings.append(
{
"code": "missing_optional_agent_brief",
"path": "infospace/agent/global-agent-brief.md",
}
)
concepts_dir = infospace_root / "concepts"
if not concepts_dir.is_dir():
warnings.append(
{
"code": "missing_optional_concepts_dir",
"path": "infospace/concepts",
}
)
for relative in OPTIONAL_COLLECTION_DIRS:
directory = infospace_root / relative
if directory.is_dir() and not _has_substantive_files(directory):
warnings.append(
{
"code": "empty_optional_collection",
"path": str(Path("infospace") / relative),
}
)
def _artifact_paths_by_path(
infospace_root: Path,
errors: list[dict[str, Any]],
) -> set[str]:
index = _read_yaml(infospace_root / "artifacts" / "index.yaml", errors)
if not isinstance(index, dict):
return set()
artifacts = index.get("artifacts") or []
if not isinstance(artifacts, list):
return set()
return {
str(artifact.get("path"))
for artifact in artifacts
if isinstance(artifact, dict) and artifact.get("path")
}
def _read_yaml(path: Path, errors: list[dict[str, Any]]) -> Any:
try:
with path.open("r", encoding="utf-8") as handle:
return yaml.safe_load(handle) or {}
except FileNotFoundError:
errors.append({"code": "missing_yaml", "path": str(path)})
except yaml.YAMLError as exc:
errors.append(
{
"code": "invalid_yaml",
"path": str(path),
"message": str(exc),
}
)
return None
def _strip_infospace_prefix(path: str) -> str:
prefix = "infospace/"
return path[len(prefix) :] if path.startswith(prefix) else path
def _has_substantive_files(directory: Path) -> bool:
for path in directory.rglob("*"):
if path.is_file() and path.name != "README.md":
return True
return False