entity relationship model

This commit is contained in:
2026-05-14 15:06:17 +02:00
parent 5e6c5aed8b
commit 9627d03c1a
8 changed files with 599 additions and 6 deletions

View File

@@ -9,6 +9,7 @@ from .models import (
TopicConfig,
ViabilityThreshold,
)
from .semantics import EntityRecord, RelationRecord, list_entities, list_relations
__all__ = [
"DisciplineBinding",
@@ -19,10 +20,14 @@ __all__ = [
"InfospaceError",
"KnowledgeArtifact",
"MetricValue",
"EntityRecord",
"RelationRecord",
"ScoreEntry",
"TopicConfig",
"ViabilityThreshold",
"add_artifact",
"create_infospace",
"list_entities",
"list_relations",
"load_infospace",
]

View File

@@ -8,6 +8,7 @@ from pathlib import Path
from .errors import InfospaceError
from .lifecycle import add_artifact, create_infospace, load_infospace
from .markdown_adapter import validate_infospace_artifacts
from .semantics import list_entities, list_relations
def build_parser() -> argparse.ArgumentParser:
@@ -35,6 +36,12 @@ def build_parser() -> argparse.ArgumentParser:
validate = sub.add_parser("validate", help="Validate infospace artifacts")
validate.add_argument("root")
entities = sub.add_parser("entities", help="List parsed entity artifacts")
entities.add_argument("root")
relations = sub.add_parser("relations", help="List parsed relation artifacts")
relations.add_argument("root")
return parser
@@ -72,6 +79,23 @@ def main(argv: list[str] | None = None) -> int:
}
)
return 0 if valid else 1
elif args.command == "entities":
_write_json(
{
"entities": [
entity.to_dict() for entity in list_entities(Path(args.root))
]
}
)
elif args.command == "relations":
_write_json(
{
"relations": [
relation.to_dict()
for relation in list_relations(Path(args.root))
]
}
)
else:
parser.error(f"Unhandled command: {args.command}")
except InfospaceError as exc:

View File

@@ -15,13 +15,20 @@ CONFIG_FILE = "infospace.yaml"
ARTIFACT_INDEX = "artifacts/index.yaml"
LAYOUT_DIRS = (
"artifacts/sources",
"artifacts/entities",
"artifacts/relations",
"artifacts/generated",
"output/evaluations",
"output/metrics",
"reports",
"exports",
)
KIND_DIRS = {"source": "sources", "generated": "generated"}
KIND_DIRS = {
"source": "sources",
"entity": "entities",
"relation": "relations",
"generated": "generated",
}
def create_infospace(

View File

@@ -0,0 +1,281 @@
from __future__ import annotations
import re
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any
from .errors import InfospaceError
from .lifecycle import load_infospace
from .markdown_adapter import (
ParsedMarkdownArtifact,
extract_section_text,
parse_markdown_artifact,
)
MINOR_TITLE_WORDS = {
"a",
"an",
"and",
"as",
"at",
"but",
"by",
"for",
"if",
"in",
"is",
"nor",
"of",
"on",
"or",
"so",
"the",
"to",
"up",
"yet",
}
@dataclass(frozen=True)
class EntityRecord:
artifact_id: str
slug: str
title: str
h1_raw: str
definition: str = ""
source_chapter: str = ""
context: str = ""
domain: str = ""
original_wording: str = ""
modern_interpretation: str = ""
h1_is_title_case: bool = False
has_original_wording: bool = False
definition_word_count: int = 0
total_word_count: int = 0
section_slugs: list[str] = field(default_factory=list)
source_path: str = ""
def to_dict(self) -> dict[str, Any]:
return asdict(self)
@dataclass(frozen=True)
class RelationRecord:
artifact_id: str
slug: str
subject: str
subject_slug: str
subject_entity_id: str
predicate: str
object: str
object_slug: str
object_entity_id: str
relation_type: str = ""
vsm_channel: str = ""
evidence: str = ""
feedback_role: str = ""
source_path: str = ""
@property
def is_feedback_member(self) -> bool:
return bool(self.feedback_role.strip())
def to_dict(self) -> dict[str, Any]:
data = asdict(self)
data["is_feedback_member"] = self.is_feedback_member
return data
def parse_entity_artifact(artifact_id: str, path: str | Path) -> EntityRecord:
artifact_path = Path(path)
parsed = parse_markdown_artifact(artifact_path)
h1 = _first_heading(parsed, level=1)
missing_sections: list[str] = []
if h1 is None:
missing_sections.append("h1")
definition = _section_text(parsed, "Definition")
if not definition:
missing_sections.append("definition")
if missing_sections:
raise InfospaceError(
"invalid_entity_artifact",
f"Invalid entity artifact: {artifact_id}",
{
"artifact_id": artifact_id,
"path": str(artifact_path),
"missing_sections": missing_sections,
},
)
title = h1.text if h1 is not None else ""
original_wording = _section_text(
parsed,
"Original Wording",
"Smith's Original Wording",
)
return EntityRecord(
artifact_id=artifact_id,
slug=slugify(title),
title=title,
h1_raw=title,
definition=definition,
source_chapter=_section_text(parsed, "Source Chapter"),
context=_section_text(parsed, "Context"),
domain=_section_text(
parsed,
"Economic Domain",
"Supply Chain Domain",
"Knowledge Domain",
"Domain",
),
original_wording=original_wording,
modern_interpretation=_section_text(parsed, "Modern Interpretation"),
h1_is_title_case=_is_title_case(title),
has_original_wording=bool(original_wording),
definition_word_count=_word_count(definition),
total_word_count=_word_count(artifact_path.read_text(encoding="utf-8")),
section_slugs=_section_slugs(parsed),
source_path=str(artifact_path),
)
def parse_relation_artifact(
artifact_id: str,
path: str | Path,
entity_ids: dict[str, str] | None = None,
) -> RelationRecord:
artifact_path = Path(path)
parsed = parse_markdown_artifact(artifact_path)
h1 = _first_heading(parsed, level=1)
missing_sections: list[str] = []
if h1 is None:
missing_sections.append("h1")
subject = _section_text(parsed, "Subject")
predicate = _section_text(parsed, "Predicate")
obj = _section_text(parsed, "Object")
for section_slug, value in (
("subject", subject),
("predicate", predicate),
("object", obj),
):
if not value:
missing_sections.append(section_slug)
if missing_sections:
raise InfospaceError(
"invalid_relation_artifact",
f"Invalid relation artifact: {artifact_id}",
{
"artifact_id": artifact_id,
"path": str(artifact_path),
"missing_sections": missing_sections,
},
)
subject_slug = slugify(subject)
object_slug = slugify(obj)
subject_entity_id = ""
object_entity_id = ""
if entity_ids is not None:
missing_slugs = [
slug
for slug in (subject_slug, object_slug)
if slug and slug not in entity_ids
]
if missing_slugs:
raise InfospaceError(
"unresolved_relation_endpoint",
f"Relation endpoint not found for artifact: {artifact_id}",
{
"artifact_id": artifact_id,
"path": str(artifact_path),
"missing_slugs": missing_slugs,
},
)
subject_entity_id = entity_ids[subject_slug]
object_entity_id = entity_ids[object_slug]
title = h1.text if h1 is not None else artifact_path.stem
return RelationRecord(
artifact_id=artifact_id,
slug=slugify(title),
subject=subject,
subject_slug=subject_slug,
subject_entity_id=subject_entity_id,
predicate=predicate,
object=obj,
object_slug=object_slug,
object_entity_id=object_entity_id,
relation_type=_section_text(parsed, "Relation Type"),
vsm_channel=_section_text(parsed, "VSM Channel"),
evidence=_section_text(parsed, "Evidence"),
feedback_role=_section_text(parsed, "Feedback Role"),
source_path=str(artifact_path),
)
def list_entities(root: str | Path) -> list[EntityRecord]:
infospace = load_infospace(root)
return [
parse_entity_artifact(artifact.id, infospace.root / artifact.path)
for artifact in infospace.artifacts
if artifact.kind == "entity"
]
def list_relations(root: str | Path) -> list[RelationRecord]:
infospace = load_infospace(root)
entity_ids = {entity.slug: entity.artifact_id for entity in list_entities(root)}
return [
parse_relation_artifact(artifact.id, infospace.root / artifact.path, entity_ids)
for artifact in infospace.artifacts
if artifact.kind == "relation"
]
def slugify(value: str) -> str:
slug = re.sub(r"[^a-z0-9]+", "-", value.strip().lower())
return slug.strip("-")
def _first_heading(parsed: ParsedMarkdownArtifact, *, level: int) -> Any | None:
return next((heading for heading in parsed.headings if heading.level == level), None)
def _section_text(parsed: ParsedMarkdownArtifact, *headings: str) -> str:
for heading in headings:
text = extract_section_text(parsed, heading)
if text:
return text
return ""
def _section_slugs(parsed: ParsedMarkdownArtifact) -> list[str]:
return [
slugify(section.heading.text)
for section in parsed.sections
if section.heading.level == 2
]
def _is_title_case(value: str) -> bool:
words = value.split()
if not words:
return False
for index, word in enumerate(words):
clean = re.sub(r"[^\w]", "", word)
if not clean:
continue
if index > 0 and clean.lower() in MINOR_TITLE_WORDS:
continue
if not clean[0].isupper():
return False
return True
def _word_count(value: str) -> int:
return len(value.split())