Architecture core for Knowledge Assets

This commit is contained in:
2026-05-05 21:30:39 +02:00
parent f6f3116ae7
commit d7e38606d2
14 changed files with 1113 additions and 0 deletions

View File

@@ -0,0 +1,51 @@
"""Domain core primitives for the knowledge operations architecture."""
from .actors import Actor, ActorType, OperationContext
from .assets import AssetRepresentation, KnowledgeAsset, RepresentationKind
from .audit import AuditEvent, AuditOutcome
from .metadata import Classification, LifecycleState, MetadataRecord, Sensitivity
from .policy import PolicyDecision, PolicyEffect
from .primitives import content_digest, mapping_digest, new_id, stable_json_dumps, utc_now
from .provenance import (
AssetVersion,
DerivedArtifactLineage,
SourceReference,
VersionChangeType,
)
from .relationships import (
ContextEntity,
ContextEntityType,
CoreRelationship,
RelationshipTargetKind,
)
# Public API of the package: every name re-exported by the imports above,
# listed alphabetically with the classes first and the helper functions last.
__all__ = [
"Actor",
"ActorType",
"AssetRepresentation",
"AssetVersion",
"AuditEvent",
"AuditOutcome",
"Classification",
"ContextEntity",
"ContextEntityType",
"CoreRelationship",
"DerivedArtifactLineage",
"KnowledgeAsset",
"LifecycleState",
"MetadataRecord",
"OperationContext",
"PolicyDecision",
"PolicyEffect",
"RelationshipTargetKind",
"RepresentationKind",
"Sensitivity",
"SourceReference",
"VersionChangeType",
"content_digest",
"mapping_digest",
"new_id",
"stable_json_dumps",
"utc_now",
]

View File

@@ -0,0 +1,113 @@
"""Actor and operation-context primitives."""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
from .primitives import compact_dict, new_id
class ActorType(str, Enum):
    """Closed set of actor kinds.

    Members also subclass ``str``, so each one compares equal to its raw
    string value and can be looked up from it with ``ActorType(value)``.
    """

    HUMAN = "human"
    APPLICATION = "application"
    AUTOMATION = "automation"
    SERVICE_ACCOUNT = "service_account"
    AI_AGENT = "ai_agent"
@dataclass(frozen=True)
class Actor:
    """Immutable record identifying the party that performs an operation.

    ``frozen=True`` blocks attribute reassignment; the ``metadata`` dict
    itself is still an ordinary mutable dict.
    """

    id: str
    actor_type: ActorType
    display_name: str | None = None
    external_ref: str | None = None
    groups: tuple[str, ...] = ()
    metadata: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def create(
        cls,
        actor_type: ActorType | str,
        *,
        actor_id: str | None = None,
        display_name: str | None = None,
        external_ref: str | None = None,
        groups: list[str] | tuple[str, ...] | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> "Actor":
        """Build an actor, generating an ``actor_``-prefixed id when none is given.

        ``actor_type`` may be an :class:`ActorType` member or its string
        value; an unknown value raises ``ValueError``. ``groups`` and
        ``metadata`` are copied, so the caller's containers are not shared
        with the new instance.
        """
        return cls(
            id=actor_id or new_id("actor"),
            actor_type=ActorType(actor_type),
            display_name=display_name,
            external_ref=external_ref,
            groups=tuple(groups or ()),
            metadata=dict(metadata or {}),
        )

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict form of the actor.

        The result is passed through ``compact_dict``, so keys whose value is
        ``None``, ``{}`` or ``[]`` are left out.
        """
        return compact_dict(
            {
                "id": self.id,
                "actor_type": self.actor_type.value,
                "display_name": self.display_name,
                "external_ref": self.external_ref,
                "groups": list(self.groups),
                "metadata": dict(self.metadata),
            }
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Actor":
        """Rebuild an actor from the mapping produced by :meth:`to_dict`.

        Raises ``KeyError`` when ``id`` or ``actor_type`` is missing and
        ``ValueError`` when ``actor_type`` is not a known value.
        """
        return cls(
            id=data["id"],
            actor_type=ActorType(data["actor_type"]),
            display_name=data.get("display_name"),
            external_ref=data.get("external_ref"),
            # ``or`` instead of a ``.get()`` default: the default only covers a
            # missing key, so an explicit null (e.g. from JSON written by
            # another producer) would otherwise reach tuple()/dict() as None
            # and raise TypeError. This mirrors the tolerance of create().
            groups=tuple(data.get("groups") or ()),
            metadata=dict(data.get("metadata") or {}),
        )
@dataclass(frozen=True)
class OperationContext:
    """Bundle describing one operation.

    Holds the acting actor, an optional delegated actor, a correlation id,
    and free-form request-scope, policy-scope and metadata dicts.
    """

    actor: Actor
    correlation_id: str
    delegated_actor: Actor | None = None
    request_scope: dict[str, Any] = field(default_factory=dict)
    policy_scope: dict[str, Any] = field(default_factory=dict)
    metadata: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def create(
        cls,
        actor: Actor,
        *,
        correlation_id: str | None = None,
        delegated_actor: Actor | None = None,
        request_scope: dict[str, Any] | None = None,
        policy_scope: dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> "OperationContext":
        """Build a context, generating a ``corr_``-prefixed id when none is given.

        The three dict arguments are copied; ``None`` or an empty dict yields
        a fresh empty dict.
        """
        if not correlation_id:
            correlation_id = new_id("corr")
        request_copy = dict(request_scope) if request_scope else {}
        policy_copy = dict(policy_scope) if policy_scope else {}
        metadata_copy = dict(metadata) if metadata else {}
        return cls(
            actor=actor,
            correlation_id=correlation_id,
            delegated_actor=delegated_actor,
            request_scope=request_copy,
            policy_scope=policy_copy,
            metadata=metadata_copy,
        )

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict form with nested actors serialized too.

        Keys whose value is ``None``, ``{}`` or ``[]`` are dropped by
        ``compact_dict``.
        """
        delegate = self.delegated_actor
        payload = {
            "actor": self.actor.to_dict(),
            "delegated_actor": delegate.to_dict() if delegate else None,
            "correlation_id": self.correlation_id,
            "request_scope": dict(self.request_scope),
            "policy_scope": dict(self.policy_scope),
            "metadata": dict(self.metadata),
        }
        return compact_dict(payload)

View File

@@ -0,0 +1,147 @@
"""Canonical knowledge asset and representation models."""
from __future__ import annotations
from dataclasses import dataclass, field, replace
from enum import Enum
from typing import Any
from .metadata import Classification, LifecycleState
from .primitives import compact_dict, content_digest, new_id, utc_now
from .provenance import SourceReference
class RepresentationKind(str, Enum):
    """Kinds of asset representation; members double as their string values."""

    SOURCE = "source"
    NORMALIZED = "normalized"
    DERIVED = "derived"
@dataclass(frozen=True)
class AssetRepresentation:
    """Immutable description of one stored form of an asset's content.

    Records the content digest and byte size rather than the content itself.
    ``representation_id`` and ``created_at`` are generated when not supplied.
    """

    asset_id: str
    kind: RepresentationKind
    media_type: str
    digest: str
    size_bytes: int
    storage_ref: str | None = None
    producer: str | None = None
    source_ref_id: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
    representation_id: str = field(default_factory=lambda: new_id("repr"))
    created_at: str = field(default_factory=lambda: utc_now().isoformat())

    @classmethod
    def from_content(
        cls,
        asset_id: str,
        kind: RepresentationKind | str,
        media_type: str,
        content: str | bytes,
        *,
        storage_ref: str | None = None,
        producer: str | None = None,
        source_ref_id: str | None = None,
        metadata: dict[str, Any] | None = None,
        representation_id: str | None = None,
    ) -> "AssetRepresentation":
        """Build a representation whose digest and size come from ``content``.

        Text is encoded as UTF-8 first, so ``size_bytes`` counts encoded
        bytes. ``kind`` may be a member or its string value; an unknown value
        raises ``ValueError``. ``metadata`` is copied.
        """
        if isinstance(content, str):
            payload = content.encode("utf-8")
        else:
            payload = content
        return cls(
            asset_id=asset_id,
            kind=RepresentationKind(kind),
            media_type=media_type,
            digest=content_digest(payload),
            size_bytes=len(payload),
            storage_ref=storage_ref,
            producer=producer,
            source_ref_id=source_ref_id,
            metadata=dict(metadata) if metadata else {},
            representation_id=representation_id or new_id("repr"),
        )

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict form; ``None``/``{}``/``[]`` values are dropped."""
        payload = {
            "representation_id": self.representation_id,
            "asset_id": self.asset_id,
            "kind": self.kind.value,
            "media_type": self.media_type,
            "digest": self.digest,
            "size_bytes": self.size_bytes,
            "storage_ref": self.storage_ref,
            "producer": self.producer,
            "source_ref_id": self.source_ref_id,
            "metadata": dict(self.metadata),
            "created_at": self.created_at,
        }
        return compact_dict(payload)
@dataclass(frozen=True)
class KnowledgeAsset:
    """Canonical asset record: title, classification, sources, aliases, lifecycle.

    Instances are frozen. ``with_source_reference``, ``with_alias`` and
    ``transition_lifecycle`` return modified copies and refresh
    ``updated_at`` on them.
    """

    id: str
    title: str
    classification: Classification
    source_refs: tuple[SourceReference, ...] = ()
    aliases: tuple[str, ...] = ()
    current_version_id: str | None = None
    lifecycle: LifecycleState = LifecycleState.ACTIVE
    metadata: dict[str, Any] = field(default_factory=dict)
    created_at: str = field(default_factory=lambda: utc_now().isoformat())
    updated_at: str = field(default_factory=lambda: utc_now().isoformat())

    @classmethod
    def create(
        cls,
        title: str,
        classification: Classification,
        *,
        asset_id: str | None = None,
        source_refs: list[SourceReference] | tuple[SourceReference, ...] | None = None,
        aliases: list[str] | tuple[str, ...] | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> "KnowledgeAsset":
        """Build an asset, generating an ``asset_``-prefixed id when none is given.

        The asset's ``lifecycle`` is taken from ``classification.lifecycle``.
        Sequence arguments become tuples and ``metadata`` is copied.
        """
        refs = tuple(source_refs) if source_refs else ()
        names = tuple(aliases) if aliases else ()
        return cls(
            id=asset_id or new_id("asset"),
            title=title,
            classification=classification,
            source_refs=refs,
            aliases=names,
            lifecycle=classification.lifecycle,
            metadata=dict(metadata) if metadata else {},
        )

    def with_source_reference(self, source_ref: SourceReference) -> "KnowledgeAsset":
        """Return a copy with ``source_ref`` appended (no de-duplication)."""
        stamp = utc_now().isoformat()
        return replace(self, updated_at=stamp, source_refs=self.source_refs + (source_ref,))

    def with_alias(self, alias: str) -> "KnowledgeAsset":
        """Return a copy with ``alias`` appended, or ``self`` if already present."""
        if alias not in self.aliases:
            stamp = utc_now().isoformat()
            return replace(self, updated_at=stamp, aliases=self.aliases + (alias,))
        return self

    def transition_lifecycle(self, lifecycle: LifecycleState | str) -> "KnowledgeAsset":
        """Return a copy moved to ``lifecycle``.

        Both the asset's own ``lifecycle`` and its classification's
        ``lifecycle`` are updated. An unknown value raises ``ValueError``.
        """
        target = LifecycleState(lifecycle)
        return replace(
            self,
            classification=replace(self.classification, lifecycle=target),
            lifecycle=target,
            updated_at=utc_now().isoformat(),
        )

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict form; ``None``/``{}``/``[]`` values are dropped."""
        payload = {
            "id": self.id,
            "title": self.title,
            "classification": self.classification.to_dict(),
            "source_refs": [ref.to_dict() for ref in self.source_refs],
            "aliases": list(self.aliases),
            "current_version_id": self.current_version_id,
            "lifecycle": self.lifecycle.value,
            "metadata": dict(self.metadata),
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }
        return compact_dict(payload)

View File

@@ -0,0 +1,72 @@
"""Audit records for material engine operations."""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
from .actors import OperationContext
from .policy import PolicyDecision
from .primitives import compact_dict, new_id, utc_now
class AuditOutcome(str, Enum):
    """Possible outcomes recorded on an audit event; members are also strings."""

    SUCCESS = "success"
    DENIED = "denied"
    FAILED = "failed"
    PARTIAL = "partial"
    REVIEW_REQUIRED = "review_required"
    DRY_RUN = "dry_run"
@dataclass(frozen=True)
class AuditEvent:
    """Immutable record of one operation, its target, and how it ended.

    ``event_id`` and ``occurred_at`` are generated when not supplied.
    """

    operation: str
    target: str
    outcome: AuditOutcome
    actor_id: str
    correlation_id: str
    event_id: str = field(default_factory=lambda: new_id("audit"))
    policy_decision: PolicyDecision | None = None
    details: dict[str, Any] = field(default_factory=dict)
    occurred_at: str = field(default_factory=lambda: utc_now().isoformat())

    @classmethod
    def from_context(
        cls,
        operation: str,
        target: str,
        outcome: AuditOutcome | str,
        context: OperationContext,
        *,
        policy_decision: PolicyDecision | None = None,
        details: dict[str, Any] | None = None,
        event_id: str | None = None,
    ) -> "AuditEvent":
        """Build an event whose actor and correlation ids come from ``context``.

        ``actor_id`` is ``context.actor.id``. ``outcome`` may be a member or
        its string value; an unknown value raises ``ValueError``. ``details``
        is copied.
        """
        if not event_id:
            event_id = new_id("audit")
        return cls(
            operation=operation,
            target=target,
            outcome=AuditOutcome(outcome),
            actor_id=context.actor.id,
            correlation_id=context.correlation_id,
            event_id=event_id,
            policy_decision=policy_decision,
            details=dict(details) if details else {},
        )

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict form; ``None``/``{}``/``[]`` values are dropped."""
        decision = self.policy_decision
        payload = {
            "event_id": self.event_id,
            "operation": self.operation,
            "target": self.target,
            "outcome": self.outcome.value,
            "actor_id": self.actor_id,
            "correlation_id": self.correlation_id,
            "policy_decision": decision.to_dict() if decision else None,
            "details": dict(self.details),
            "occurred_at": self.occurred_at,
        }
        return compact_dict(payload)

View File

@@ -0,0 +1,74 @@
"""Metadata, classification, and lifecycle primitives."""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
from .primitives import compact_dict, new_id, utc_now
class LifecycleState(str, Enum):
    """Lifecycle states an asset can be in; members double as string values."""

    DRAFT = "draft"
    ACTIVE = "active"
    QUARANTINED = "quarantined"
    RETIRED = "retired"
    DELETE_REQUESTED = "delete_requested"
    DELETED = "deleted"
class Sensitivity(str, Enum):
    """Sensitivity labels for classification; members double as string values."""

    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"
@dataclass(frozen=True)
class Classification:
    """Immutable classification of an asset.

    Defaults to ``Sensitivity.INTERNAL`` and ``LifecycleState.ACTIVE`` when
    those fields are not supplied.
    """

    asset_type: str
    sensitivity: Sensitivity = Sensitivity.INTERNAL
    lifecycle: LifecycleState = LifecycleState.ACTIVE
    topics: tuple[str, ...] = ()
    owner: str | None = None
    review_state: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict form with enum fields reduced to their values.

        Keys whose value is ``None``, ``{}`` or ``[]`` are dropped by
        ``compact_dict``.
        """
        payload = {
            "asset_type": self.asset_type,
            "sensitivity": self.sensitivity.value,
            "lifecycle": self.lifecycle.value,
            "topics": list(self.topics),
            "owner": self.owner,
            "review_state": self.review_state,
            "metadata": dict(self.metadata),
        }
        return compact_dict(payload)
@dataclass(frozen=True)
class MetadataRecord:
    """Immutable key/value metadata entry with provenance and confidence.

    ``record_id`` and ``created_at`` are generated when not supplied.
    """

    key: str
    value: Any
    provenance: dict[str, Any] = field(default_factory=dict)
    confidence: float | None = None
    confirmed: bool = False
    record_id: str = field(default_factory=lambda: new_id("meta"))
    created_at: str = field(default_factory=lambda: utc_now().isoformat())

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict form.

        ``compact_dict`` drops keys whose value is ``None``, ``{}`` or ``[]``;
        that includes ``value`` itself when it is one of those.
        """
        payload = {
            "record_id": self.record_id,
            "key": self.key,
            "value": self.value,
            "provenance": dict(self.provenance),
            "confidence": self.confidence,
            "confirmed": self.confirmed,
            "created_at": self.created_at,
        }
        return compact_dict(payload)

View File

@@ -0,0 +1,79 @@
"""Policy decision primitives for permission-aware operations."""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
from .primitives import compact_dict, new_id, utc_now
class PolicyEffect(str, Enum):
    """Effects a policy decision can carry; members double as string values."""

    ALLOW = "allow"
    DENY = "deny"
    REDACT = "redact"
    REQUIRE_REVIEW = "require_review"
    DRY_RUN_ONLY = "dry_run_only"
    FAIL_CLOSED = "fail_closed"
@dataclass(frozen=True)
class PolicyDecision:
    """Immutable outcome of evaluating a subject/action/resource triple.

    ``decision_id`` and ``decided_at`` are generated when not supplied.
    """

    effect: PolicyEffect
    subject_id: str
    action: str
    resource: str
    reason: str = ""
    decision_id: str = field(default_factory=lambda: new_id("policy"))
    obligations: dict[str, Any] = field(default_factory=dict)
    context: dict[str, Any] = field(default_factory=dict)
    decided_at: str = field(default_factory=lambda: utc_now().isoformat())

    @classmethod
    def allow(cls, subject_id: str, action: str, resource: str, **kwargs: Any) -> "PolicyDecision":
        """Build an ``ALLOW`` decision; extra keywords go to the constructor."""
        return cls(
            effect=PolicyEffect.ALLOW,
            subject_id=subject_id,
            action=action,
            resource=resource,
            **kwargs,
        )

    @classmethod
    def deny(
        cls,
        subject_id: str,
        action: str,
        resource: str,
        *,
        reason: str,
        **kwargs: Any,
    ) -> "PolicyDecision":
        """Build a ``DENY`` decision; ``reason`` is a required keyword."""
        return cls(
            effect=PolicyEffect.DENY,
            subject_id=subject_id,
            action=action,
            resource=resource,
            reason=reason,
            **kwargs,
        )

    @classmethod
    def fail_closed(
        cls,
        subject_id: str,
        action: str,
        resource: str,
        *,
        reason: str = "Permission context is missing or ambiguous",
        **kwargs: Any,
    ) -> "PolicyDecision":
        """Build a ``FAIL_CLOSED`` decision with a default explanatory reason."""
        return cls(
            effect=PolicyEffect.FAIL_CLOSED,
            subject_id=subject_id,
            action=action,
            resource=resource,
            reason=reason,
            **kwargs,
        )

    @property
    def allowed(self) -> bool:
        """``True`` only when the effect equals ``PolicyEffect.ALLOW``."""
        is_allow = self.effect == PolicyEffect.ALLOW
        return is_allow

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict form; ``None``/``{}``/``[]`` values are dropped."""
        payload = {
            "decision_id": self.decision_id,
            "effect": self.effect.value,
            "subject_id": self.subject_id,
            "action": self.action,
            "resource": self.resource,
            "reason": self.reason,
            "obligations": dict(self.obligations),
            "context": dict(self.context),
            "decided_at": self.decided_at,
        }
        return compact_dict(payload)

View File

@@ -0,0 +1,43 @@
"""Small deterministic primitives used by the domain core."""
from __future__ import annotations
import hashlib
import json
import uuid
from datetime import datetime, timezone
from typing import Any
def utc_now() -> datetime:
    """Return the current time as a timezone-aware datetime in UTC."""
    return datetime.now(tz=timezone.utc)
def new_id(prefix: str) -> str:
    """Return ``prefix``, an underscore, and the 32 hex digits of a random UUID4."""
    suffix = uuid.uuid4().hex
    return "{}_{}".format(prefix, suffix)
def stable_json_dumps(value: Any) -> str:
    """Serialize ``value`` to compact JSON with sorted keys.

    Values that ``json`` cannot encode natively are handled as follows:

    * ``set`` / ``frozenset`` become a JSON array whose elements are ordered
      by their own stable serialization. Stringifying a set instead would
      follow its iteration order, which for string members varies between
      interpreter runs and would make digests built on this output unstable.
    * anything else falls back to ``str(obj)``.

    Raises ``TypeError`` if a mapping has keys that cannot be sorted together.
    """

    def _fallback(obj: Any) -> Any:
        if isinstance(obj, (set, frozenset)):
            # Order by serialized form so mixed-type members still sort.
            return sorted(obj, key=stable_json_dumps)
        return str(obj)

    return json.dumps(value, sort_keys=True, separators=(",", ":"), default=_fallback)
def content_digest(content: str | bytes) -> str:
    """Return ``"sha256:"`` followed by the hex SHA-256 of ``content``.

    Text is encoded as UTF-8 before hashing; bytes are hashed as given.
    """
    if isinstance(content, str):
        raw = content.encode("utf-8")
    else:
        raw = content
    hex_digest = hashlib.sha256(raw).hexdigest()
    return f"sha256:{hex_digest}"
def mapping_digest(value: Any) -> str:
    """Return the content digest of the stable JSON serialization of ``value``."""
    canonical = stable_json_dumps(value)
    return content_digest(canonical)
def compact_dict(data: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of ``data`` without entries equal to ``None``, ``{}`` or ``[]``.

    Other falsy values such as ``0``, ``False``, ``""`` and ``()`` are kept,
    and the original key order is preserved.
    """
    dropped = (None, {}, [])
    kept: dict[str, Any] = {}
    for key, value in data.items():
        if value in dropped:
            continue
        kept[key] = value
    return kept
def datetime_to_str(value: datetime | None) -> str | None:
    """Return ``value.isoformat()``, or ``None`` when ``value`` is falsy."""
    if not value:
        return None
    return value.isoformat()
def datetime_from_str(value: str | None) -> datetime | None:
    """Parse an ISO 8601 string into a datetime.

    Returns ``None`` for ``None`` or an empty string. A trailing ``Z`` (the
    UTC designator) is accepted on every Python version: before 3.11
    ``datetime.fromisoformat`` rejects it, so it is rewritten to ``+00:00``
    first. Raises ``ValueError`` for text that is not a valid ISO 8601
    datetime.
    """
    if not value:
        return None
    if isinstance(value, str) and value.endswith("Z"):
        # Equivalent offset spelling that all supported interpreters parse.
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)

View File

@@ -0,0 +1,132 @@
"""Source, version, and lineage primitives."""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
from .primitives import compact_dict, mapping_digest, new_id, utc_now
@dataclass(frozen=True)
class SourceReference:
    """Immutable pointer to where an asset came from.

    ``id`` is generated with a ``src_`` prefix when not supplied.
    """

    source_system: str
    path: str | None = None
    uri: str | None = None
    external_id: str | None = None
    checksum: str | None = None
    connector_ref: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
    id: str = field(default_factory=lambda: new_id("src"))

    @property
    def identity_key(self) -> str:
        """Digest of the locating fields.

        Covers ``source_system``, ``path``, ``uri``, ``external_id``,
        ``checksum`` and ``connector_ref``; ``id`` and ``metadata`` are not
        part of it.
        """
        locator = {
            "source_system": self.source_system,
            "path": self.path,
            "uri": self.uri,
            "external_id": self.external_id,
            "checksum": self.checksum,
            "connector_ref": self.connector_ref,
        }
        return mapping_digest(locator)

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict form that also includes ``identity_key``.

        Keys whose value is ``None``, ``{}`` or ``[]`` are dropped by
        ``compact_dict``.
        """
        payload = {
            "id": self.id,
            "source_system": self.source_system,
            "path": self.path,
            "uri": self.uri,
            "external_id": self.external_id,
            "checksum": self.checksum,
            "connector_ref": self.connector_ref,
            "identity_key": self.identity_key,
            "metadata": dict(self.metadata),
        }
        return compact_dict(payload)
class VersionChangeType(str, Enum):
    """Kinds of change an asset version can record; members are also strings."""

    CREATED = "created"
    CONTENT_CHANGED = "content_changed"
    METADATA_CHANGED = "metadata_changed"
    RELATIONSHIP_CHANGED = "relationship_changed"
    LIFECYCLE_CHANGED = "lifecycle_changed"
    DERIVED_OUTPUT = "derived_output"
    RESTORED = "restored"
    SUPERSEDED = "superseded"
@dataclass(frozen=True)
class AssetVersion:
    """Immutable record of one numbered version of an asset.

    ``version_id`` and ``created_at`` are generated when not supplied.
    """

    asset_id: str
    sequence: int
    change_type: VersionChangeType
    representation_ids: tuple[str, ...] = ()
    actor_id: str | None = None
    operation_id: str | None = None
    parent_version_id: str | None = None
    metadata_delta: dict[str, Any] = field(default_factory=dict)
    relationship_delta: dict[str, Any] = field(default_factory=dict)
    lifecycle: str | None = None
    version_id: str = field(default_factory=lambda: new_id("ver"))
    created_at: str = field(default_factory=lambda: utc_now().isoformat())

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict form; ``None``/``{}``/``[]`` values are dropped."""
        payload = {
            "version_id": self.version_id,
            "asset_id": self.asset_id,
            "sequence": self.sequence,
            "change_type": self.change_type.value,
            "representation_ids": list(self.representation_ids),
            "actor_id": self.actor_id,
            "operation_id": self.operation_id,
            "parent_version_id": self.parent_version_id,
            "metadata_delta": dict(self.metadata_delta),
            "relationship_delta": dict(self.relationship_delta),
            "lifecycle": self.lifecycle,
            "created_at": self.created_at,
        }
        return compact_dict(payload)
@dataclass(frozen=True)
class DerivedArtifactLineage:
    """Immutable record linking a derived output to its source assets/versions.

    ``lineage_id`` is generated with a ``lineage_`` prefix when not supplied.
    """

    source_asset_ids: tuple[str, ...]
    source_version_ids: tuple[str, ...]
    transformation_run_id: str
    output_asset_id: str
    output_representation_id: str
    actor_id: str
    parameters: dict[str, Any] = field(default_factory=dict)
    policy_context: dict[str, Any] = field(default_factory=dict)
    adapter_provenance: dict[str, Any] = field(default_factory=dict)
    lineage_id: str = field(default_factory=lambda: new_id("lineage"))

    @property
    def lineage_hash(self) -> str:
        """Digest of the dict form without the hash itself (``lineage_id`` included)."""
        unhashed = self.to_dict(include_hash=False)
        return mapping_digest(unhashed)

    def to_dict(self, *, include_hash: bool = True) -> dict[str, Any]:
        """Return a plain-dict form, optionally with ``lineage_hash`` appended.

        Keys whose value is ``None``, ``{}`` or ``[]`` are dropped by
        ``compact_dict`` before the hash is added.
        """
        payload = compact_dict(
            {
                "lineage_id": self.lineage_id,
                "source_asset_ids": list(self.source_asset_ids),
                "source_version_ids": list(self.source_version_ids),
                "transformation_run_id": self.transformation_run_id,
                "output_asset_id": self.output_asset_id,
                "output_representation_id": self.output_representation_id,
                "actor_id": self.actor_id,
                "parameters": dict(self.parameters),
                "policy_context": dict(self.policy_context),
                "adapter_provenance": dict(self.adapter_provenance),
            }
        )
        if not include_hash:
            return payload
        payload["lineage_hash"] = self.lineage_hash
        return payload

View File

@@ -0,0 +1,82 @@
"""Context entity and typed relationship primitives."""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
from .primitives import compact_dict, new_id, utc_now
class ContextEntityType(str, Enum):
    """Kinds of context entity; members double as their string values."""

    PERSON = "person"
    TEAM = "team"
    PROJECT = "project"
    CASE = "case"
    CUSTOMER = "customer"
    PRODUCT = "product"
    PROCESS = "process"
    SOURCE_SYSTEM = "source_system"
    TOPIC = "topic"
    BUSINESS_OBJECT = "business_object"
@dataclass(frozen=True)
class ContextEntity:
    """Immutable named entity of a given type.

    ``entity_id`` is generated with an ``entity_`` prefix when not supplied.
    """

    entity_type: ContextEntityType
    name: str
    external_ref: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
    entity_id: str = field(default_factory=lambda: new_id("entity"))

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict form; ``None``/``{}``/``[]`` values are dropped."""
        payload = {
            "entity_id": self.entity_id,
            "entity_type": self.entity_type.value,
            "name": self.name,
            "external_ref": self.external_ref,
            "metadata": dict(self.metadata),
        }
        return compact_dict(payload)
class RelationshipTargetKind(str, Enum):
    """What a relationship's target id refers to; members are also strings."""

    ASSET = "asset"
    CONTEXT_ENTITY = "context_entity"
@dataclass(frozen=True)
class CoreRelationship:
    """Immutable typed link from a source id to a target id.

    ``target_kind`` defaults to ``RelationshipTargetKind.ASSET`` and
    ``direction`` to ``"outbound"``. ``relationship_id`` and ``created_at``
    are generated when not supplied.
    """

    source_id: str
    target_id: str
    predicate: str
    target_kind: RelationshipTargetKind = RelationshipTargetKind.ASSET
    direction: str = "outbound"
    confidence: float | None = None
    valid_from: str | None = None
    valid_to: str | None = None
    actor_id: str | None = None
    provenance: dict[str, Any] = field(default_factory=dict)
    relationship_id: str = field(default_factory=lambda: new_id("rel"))
    created_at: str = field(default_factory=lambda: utc_now().isoformat())

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict form; ``None``/``{}``/``[]`` values are dropped."""
        payload = {
            "relationship_id": self.relationship_id,
            "source_id": self.source_id,
            "target_id": self.target_id,
            "predicate": self.predicate,
            "target_kind": self.target_kind.value,
            "direction": self.direction,
            "confidence": self.confidence,
            "valid_from": self.valid_from,
            "valid_to": self.valid_to,
            "actor_id": self.actor_id,
            "provenance": dict(self.provenance),
            "created_at": self.created_at,
        }
        return compact_dict(payload)