generic source-to-infospace generator

This commit is contained in:
2026-05-14 19:33:22 +02:00
parent 065e17f42e
commit 46aad3cce8
20 changed files with 1629 additions and 8 deletions

View File

@@ -0,0 +1,273 @@
from __future__ import annotations
import hashlib
import html
import re
import zipfile
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable
from .errors import InfospaceError
from .semantics import slugify
# Version tag used as the default ``extractor_version`` of every SourceChunk.
EXTRACTOR_VERSION = "generic-source-intake-v1"

# File suffixes (lower-case, dot included) accepted when scanning a directory.
SUPPORTED_EXTENSIONS = {
    ".epub",
    ".htm",
    ".html",
    ".markdown",
    ".md",
    ".txt",
}

# Contents of the first <title> element; case-insensitive, may span lines.
HTML_TITLE_RE = re.compile(
    r"<title[^>]*>(?P<title>.*?)</title>",
    re.IGNORECASE | re.DOTALL,
)
# Contents of the first <h1> element; used when no <title> is present.
HTML_H1_RE = re.compile(
    r"<h1[^>]*>(?P<title>.*?)</h1>",
    re.IGNORECASE | re.DOTALL,
)
# A whole <script> or <style> element, including everything inside it.
SCRIPT_STYLE_RE = re.compile(
    r"<(script|style)[^>]*>.*?</\1>",
    re.IGNORECASE | re.DOTALL,
)
# Any single markup tag.
TAG_RE = re.compile(r"<[^>]+>")
@dataclass(frozen=True)
class SourceChunk:
    """One immutable, markdown-formatted piece of an imported source document."""

    # Identifier for this chunk.
    chunk_id: str
    # Human-readable title of the chunk.
    title: str
    # Markdown text of the chunk.
    markdown: str
    # Kind of source the chunk was extracted from.
    source_type: str
    # Location of the document the chunk was extracted from.
    original_path: str
    # Digest of the chunk text.
    digest: str
    # Position of this chunk within its document.
    chunk_index: int
    # Total number of chunks produced from the same document.
    chunk_count: int
    # Timestamp recorded when the chunk was imported.
    imported_at: str
    # Version tag of the extractor; defaults to the module constant.
    extractor_version: str = EXTRACTOR_VERSION

    def to_dict(self) -> dict:
        """Return the chunk's fields as a plain dictionary."""
        as_mapping = asdict(self)
        return as_mapping
@dataclass(frozen=True)
class _SourceDocument:
    """Immutable intermediate record for one source document before chunking."""

    # Title of the document.
    title: str
    # Full document text as markdown.
    markdown: str
    # Kind of source the document came from.
    source_type: str
    # Location the document was read from.
    original_path: str
    # Slug that chunk identifiers for this document are built from.
    base_slug: str
def normalize_source(
    source: str | Path,
    *,
    max_words: int = 800,
    max_chunks: int | None = None,
) -> list[SourceChunk]:
    """Read a source file or directory and return it as markdown chunks.

    Args:
        source: Path to a single source file or to a directory that is
            scanned recursively for supported files.
        max_words: Word budget handed to the chunker for each document.
        max_chunks: Stop after this many chunks have been produced. ``None``
            or a non-positive value means no limit.

    Returns:
        The chunks in document order. Every chunk of one call carries the same
        ``imported_at`` timestamp, and chunk ids are unique within the result.

    Raises:
        InfospaceError: ``missing_source`` when the path does not exist, or
            ``unsupported_source`` when no supported document was found.
    """
    source_path = Path(source)
    if not source_path.exists():
        raise InfospaceError(
            "missing_source",
            f"Source path does not exist: {source_path}",
            {"source": str(source_path)},
        )

    documents = [*_iter_documents(source_path)]
    if len(documents) == 0:
        raise InfospaceError(
            "unsupported_source",
            f"No supported source documents found: {source_path}",
            {
                "source": str(source_path),
                "supported_extensions": sorted(SUPPORTED_EXTENSIONS),
            },
        )

    # One timestamp for the whole import so all chunks agree on it.
    stamp = datetime.now(timezone.utc).isoformat()
    # Only a strictly positive max_chunks acts as a cap.
    limit = max_chunks if max_chunks is not None and max_chunks > 0 else None
    seen_ids: set[str] = set()
    result: list[SourceChunk] = []

    for doc in documents:
        parts = _chunk_markdown(doc.markdown, max_words=max_words)
        total = len(parts)
        for position, part in enumerate(parts):
            if total == 1:
                # A document that fits in one chunk keeps its plain title/slug.
                chunk_title = doc.title
                wanted_id = doc.base_slug
            else:
                number = position + 1
                chunk_title = f"{doc.title} Part {number}"
                wanted_id = f"{doc.base_slug}-part-{number:03d}"
            unique_id = _dedupe_chunk_id(wanted_id, seen_ids)
            result.append(
                SourceChunk(
                    chunk_id=unique_id,
                    title=chunk_title,
                    markdown=part,
                    source_type=doc.source_type,
                    original_path=doc.original_path,
                    digest=_digest_text(part),
                    chunk_index=position,
                    chunk_count=total,
                    imported_at=stamp,
                )
            )
            if limit is not None and len(result) >= limit:
                return result
    return result
def _iter_documents(source_path: Path) -> Iterable[_SourceDocument]:
    """Yield the documents found at ``source_path``.

    A directory is walked recursively in sorted path order and every file with
    a supported suffix is expanded in turn. A single file is dispatched on its
    lower-cased suffix; a file with any other suffix yields nothing.
    """
    if not source_path.is_dir():
        extension = source_path.suffix.lower()
        if extension == ".epub":
            # An EPUB archive may contain several documents.
            yield from _epub_documents(source_path)
        elif extension in (".html", ".htm"):
            yield _html_document(source_path, source_type="html")
        elif extension == ".txt":
            yield _text_document(source_path)
        elif extension in (".md", ".markdown"):
            yield _markdown_document(source_path)
        return

    for candidate in sorted(source_path.rglob("*")):
        if not candidate.is_file():
            continue
        if candidate.suffix.lower() not in SUPPORTED_EXTENSIONS:
            continue
        yield from _iter_documents(candidate)
def _markdown_document(path: Path) -> _SourceDocument:
    """Load a UTF-8 markdown file as a source document.

    The title is the file's first level-one heading, or a title derived from
    the file name when there is none; in the latter case a heading is added.
    """
    raw = path.read_text(encoding="utf-8")
    content = _normalize_newlines(raw).strip() + "\n"

    heading = _markdown_title(content)
    if not heading:
        heading = _title_from_path(path)

    with_heading = _ensure_h1(content, heading)
    slug = slugify(heading) or slugify(path.stem) or "source"
    return _SourceDocument(
        title=heading,
        markdown=with_heading,
        source_type="markdown",
        original_path=str(path),
        base_slug=slug,
    )
def _text_document(path: Path) -> _SourceDocument:
    """Load a UTF-8 plain-text file as a source document.

    The title always comes from the file name and is written as a level-one
    heading above the file's text.
    """
    heading = _title_from_path(path)
    content = path.read_text(encoding="utf-8")
    content = _normalize_newlines(content).strip()
    slug = slugify(heading) or "source"
    return _SourceDocument(
        title=heading,
        markdown=f"# {heading}\n\n{content}\n",
        source_type="text",
        original_path=str(path),
        base_slug=slug,
    )
def _html_document(
    path: Path,
    *,
    source_type: str,
    original_path: str | None = None,
    text: str | None = None,
) -> _SourceDocument:
    """Build a source document from HTML markup.

    Args:
        path: Location of the HTML. It is read as UTF-8 only when ``text`` is
            not given; it is also used for the fallback title and slug.
        source_type: Value stored as the document's ``source_type``.
        original_path: Value stored as ``original_path``; defaults to
            ``str(path)``.
        text: Markup to use instead of reading ``path`` (for example an entry
            already read from an archive).

    Returns:
        A document whose markdown is a level-one heading with the title,
        followed by the plain text extracted from the markup.
    """
    raw = text if text is not None else path.read_text(encoding="utf-8")
    title = _html_title(raw) or _title_from_path(path)
    body = _html_to_text(raw)

    # Tag stripping keeps the title text, so the body may begin with a copy of
    # it. Drop that copy only when it is a complete leading token: without the
    # boundary check the title "Intro" would cut "Introduction ..." down to
    # "duction ...". Comparing a same-length slice (instead of lowercasing the
    # whole body) keeps the cut position aligned even when lowercasing changes
    # a string's length.
    head, rest = body[: len(title)], body[len(title) :]
    if head.lower() == title.lower() and (not rest or rest[0].isspace()):
        body = rest.strip()

    markdown = f"# {title}\n\n{body}\n"
    return _SourceDocument(
        title=title,
        markdown=markdown,
        source_type=source_type,
        original_path=original_path or str(path),
        base_slug=slugify(title) or slugify(path.stem) or "source",
    )
def _epub_documents(path: Path) -> Iterable[_SourceDocument]:
    """Yield one source document per readable text entry of an EPUB archive.

    Entries are visited in sorted name order. ``.html``/``.htm``/``.xhtml``
    entries go through the HTML extractor; ``.txt``/``.md`` entries are treated
    as markdown. Entry bytes are decoded as UTF-8 with undecodable bytes
    replaced. Each document's ``original_path`` is ``"<archive>!<entry>"``.

    Raises:
        InfospaceError: ``invalid_epub_source`` when the file cannot be read
            as a zip archive.
    """
    markup_suffixes = {".html", ".htm", ".xhtml"}
    plain_suffixes = {".txt", ".md"}
    wanted_suffixes = markup_suffixes | plain_suffixes
    try:
        with zipfile.ZipFile(path) as archive:
            names = [
                name
                for name in sorted(archive.namelist())
                # Directory entries end in "/" but can still carry a suffix.
                if not name.endswith("/")
                and Path(name).suffix.lower() in wanted_suffixes
            ]
            for name in names:
                raw = archive.read(name).decode("utf-8", errors="replace")
                pseudo_path = Path(name)
                entry_origin = f"{path}!{name}"
                if pseudo_path.suffix.lower() in plain_suffixes:
                    # Normalise line endings before looking for the title, as
                    # _markdown_document does. On CR-only text the title
                    # pattern would otherwise run past the heading line.
                    normalized = _normalize_newlines(raw).strip() + "\n"
                    title = _markdown_title(normalized) or _title_from_path(pseudo_path)
                    yield _SourceDocument(
                        title=title,
                        markdown=_ensure_h1(normalized, title),
                        source_type="epub",
                        original_path=entry_origin,
                        base_slug=slugify(title) or slugify(pseudo_path.stem) or "source",
                    )
                else:
                    yield _html_document(
                        pseudo_path,
                        source_type="epub",
                        original_path=entry_origin,
                        text=raw,
                    )
    except zipfile.BadZipFile as exc:
        raise InfospaceError(
            "invalid_epub_source",
            f"EPUB source is not a readable zip archive: {path}",
            {"source": str(path)},
        ) from exc
def _chunk_markdown(markdown: str, *, max_words: int) -> list[str]:
    """Split markdown into chunks of at most ``max_words`` body words.

    Args:
        markdown: Document text, normally starting with a level-one heading.
        max_words: Maximum number of body words per chunk. A non-positive
            value disables splitting.

    Returns:
        A single-element list with the stripped text plus a trailing newline
        when the text has at most ``max_words`` words (heading included), when
        splitting is disabled, or when there is no body text besides the
        heading. Otherwise one chunk per ``max_words`` body words, each shaped
        as ``"# <heading> Part <n>\\n\\n<words joined by spaces>\\n"``; the
        heading is ``"Source"`` when the text has no level-one heading.
    """
    text = markdown.strip()
    if max_words <= 0 or len(text.split()) <= max_words:
        return [text + "\n"]

    # One match (same pattern as _markdown_title) supplies both the heading
    # text and the span to cut out of the body. That keeps the two in step for
    # headings written with a tab or several spaces after "#", and for a
    # heading line that has no trailing newline.
    heading_match = re.search(r"(?m)^#\s+(?P<title>.+?)\s*$", text)
    if heading_match is None:
        heading = "Source"
        body = text
    else:
        heading = heading_match.group("title").strip()
        body = text[: heading_match.start()] + text[heading_match.end() :]

    body_words = body.split()
    if not body_words:
        # Only a (long) heading: there is nothing to distribute over parts.
        return [text + "\n"]

    chunks: list[str] = []
    for start in range(0, len(body_words), max_words):
        part = " ".join(body_words[start : start + max_words])
        chunks.append(f"# {heading} Part {len(chunks) + 1}\n\n{part}\n")
    return chunks
def _html_title(raw: str) -> str:
    """Return the document title found in HTML markup, or ``""`` if none.

    The first ``<title>`` element wins; the first ``<h1>`` is used only when
    there is no ``<title>``. Markup inside the title is stripped and its
    whitespace collapsed.
    """
    for pattern in (HTML_TITLE_RE, HTML_H1_RE):
        found = pattern.search(raw)
        if found is not None:
            inner_text = _html_to_text(found.group("title"))
            return _collapse_ws(inner_text)
    return ""
def _html_to_text(raw: str) -> str:
    """Convert HTML markup to plain text with blank lines between blocks.

    Script and style elements are dropped, closing tags of common block
    elements become line breaks, all remaining tags are removed, entities are
    unescaped, and every non-empty line is whitespace-collapsed.
    """
    without_code = SCRIPT_STYLE_RE.sub(" ", raw)
    with_breaks = re.sub(
        r"</(p|div|section|article|h[1-6]|li)>",
        "\n",
        without_code,
        flags=re.I,
    )
    plain = html.unescape(TAG_RE.sub(" ", with_breaks))

    paragraphs = []
    for raw_line in plain.splitlines():
        line = _collapse_ws(raw_line)
        if line:
            paragraphs.append(line)
    return "\n\n".join(paragraphs).strip()
def _ensure_h1(markdown: str, title: str) -> str:
    """Return ``markdown`` with a level-one heading guaranteed to be present.

    Text that already contains a level-one heading is returned unchanged;
    otherwise ``# <title>`` is placed above the stripped text.
    """
    has_h1 = re.search(r"(?m)^#\s+\S", markdown) is not None
    if not has_h1:
        return f"# {title}\n\n{markdown.strip()}\n"
    return markdown
def _markdown_title(markdown: str) -> str:
    """Return the text of the first level-one heading, or ``""`` if none."""
    found = re.search(r"(?m)^#\s+(?P<title>.+?)\s*$", markdown)
    if found is None:
        return ""
    return found["title"].strip()
def _title_from_path(path: Path) -> str:
    """Derive a title-cased title from a file name, or ``"Source"`` if empty.

    Only the ASCII letters and digits of the file stem are kept; every run of
    other characters acts as a word separator.
    """
    tokens = re.findall(r"[A-Za-z0-9]+", path.stem)
    if not tokens:
        return "Source"
    return " ".join(tokens).title()
def _dedupe_chunk_id(base_id: str, used_ids: set[str]) -> str:
    """Return an id not yet in ``used_ids`` and record it there.

    An empty ``base_id`` is treated as ``"source"``. When the id is already
    taken, the suffixes ``-2``, ``-3``, ... are tried in order until a free
    one is found. ``used_ids`` is mutated in place.
    """
    stem = base_id or "source"
    chosen = stem
    next_suffix = 2
    while chosen in used_ids:
        chosen = f"{stem}-{next_suffix}"
        next_suffix += 1
    used_ids.add(chosen)
    return chosen
def _digest_text(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``text`` encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(text.encode("utf-8"))
    return hasher.hexdigest()
def _collapse_ws(value: str) -> str:
    """Squeeze each whitespace run to one space and trim both ends."""
    single_spaced = re.sub(r"\s+", " ", value)
    return single_spaced.strip()
def _normalize_newlines(value: str) -> str:
    """Convert CRLF and lone CR line endings to LF."""
    # A carriage return, together with a directly following line feed if there
    # is one, becomes a single line feed.
    return re.sub(r"\r\n?", "\n", value)