Add llm-assisted discovery extraction

This commit is contained in:
2026-05-19 04:35:35 +02:00
parent bc25eb6871
commit a76c6a4aea
7 changed files with 981 additions and 4 deletions

View File

@@ -55,6 +55,42 @@ The deterministic extractor framework currently covers:
Each extractor emits candidates through the same accumulator so stable-key
duplicates merge inside a scan before the snapshot is returned.
## LLM-Assisted Extraction
LLM extraction is optional and explicit:
```bash
railiance-fabric scan . \
--repo-slug railiance-fabric \
--llm \
--llm-provider openai \
--llm-model gpt-4.1-mini \
--dry-run \
--output discovery-with-llm.json
```
The implementation integrates through `llm-connect` with `create_adapter` and
`RunConfig`. Tests use a `MockLLMAdapter`-compatible boundary so CI stays
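offline. If `llm-connect` is unavailable or the provider call fails, the scanner
records an `llm_execution_error` entry under `review_artifacts`; if the model
returns malformed JSON, it records an `llm_output_invalid` entry. In both cases
no LLM candidates are merged and the discovery snapshot stays schema-valid.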
The LLM never receives the whole repository. The scanner first builds a compact
evidence bundle from deterministic candidates, prioritizing repo-owned Fabric
declarations, services, capabilities, interfaces, libraries, deployments, and
small README/INTENT/SCOPE signals. The prompt asks for strict JSON:
```json
{"nodes": [], "edges": [], "attributes": []}
```
Projected LLM candidates are always `origin: llm` and
`review_state: needs_review`. Candidates below the configured confidence
threshold (`--llm-min-confidence`, default `0.6`) become `llm_low_confidence`
review artifacts instead of graph
candidates. Unresolved edge endpoints or attribute targets also become review
artifacts. Accepted graph data still requires deterministic evidence,
repo-owned declarations, or a later human review/acceptance path.
## Identity
Identity is the main safety boundary. The scanner must not append guesses on

View File

@@ -13,6 +13,7 @@ from pathlib import Path
from .loader import declaration_files, load_yaml
from .graph import FabricGraph, build_graph
from .graph_explorer import fabric_graph_explorer_payload
from .llm_extraction import LLMExtractionConfig
from .scanner import ScanOptions, scan_repo
from .validation import validate_roots
@@ -73,6 +74,12 @@ def build_parser() -> argparse.ArgumentParser:
scan.add_argument("--dry-run", action="store_true", help="Do not write anywhere except an explicit --output file.")
scan.add_argument("--output", type=Path, default=None, help="Write the discovery snapshot JSON to a file.")
scan.add_argument("--json", action="store_true", help="Print the discovery snapshot JSON to stdout.")
scan.add_argument("--llm", action="store_true", help="Enable llm-connect assisted extraction.")
scan.add_argument("--llm-provider", default="mock", help="llm-connect provider name.")
scan.add_argument("--llm-model", default="mock", help="Model name passed to llm-connect.")
scan.add_argument("--llm-temperature", type=float, default=0.0)
scan.add_argument("--llm-max-tokens", type=int, default=1500)
scan.add_argument("--llm-min-confidence", type=float, default=0.6)
registry = sub.add_parser("registry", help="Feed a running Railiance Fabric registry service.")
registry_sub = registry.add_subparsers(dest="registry_command", required=True)
@@ -392,8 +399,15 @@ def _scan_repo(args: argparse.Namespace) -> int:
domain=args.domain,
commit=args.commit,
profile=args.profile,
deterministic_only=True,
llm_enabled=False,
deterministic_only=not args.llm,
llm_enabled=args.llm,
llm_config=LLMExtractionConfig(
provider=args.llm_provider,
model=args.llm_model,
temperature=args.llm_temperature,
max_tokens=args.llm_max_tokens,
min_confidence=args.llm_min_confidence,
),
)
)
payload = json.dumps(snapshot, indent=2, sort_keys=True)
@@ -405,6 +419,8 @@ def _scan_repo(args: argparse.Namespace) -> int:
return 0
candidates = snapshot["candidates"]
review_count = len(snapshot.get("review_artifacts", []))
review_summary = f", {review_count} review artifact(s)" if review_count else ""
mode = "dry-run " if args.dry_run else ""
print(
f"{mode}scan {snapshot['source']['repo_slug']} "
@@ -413,6 +429,7 @@ def _scan_repo(args: argparse.Namespace) -> int:
f"{len(candidates['edges'])} edge(s), "
f"{len(candidates['attributes'])} attribute(s), "
f"{len(snapshot['replacement_scopes'])} replacement scope(s)"
f"{review_summary}"
)
if args.output:
print(f"wrote {args.output}")

View File

@@ -0,0 +1,681 @@
from __future__ import annotations
import json
import re
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable
from jsonschema import ValidationError
from .discovery import (
attribute_stable_key,
discovery_stable_key,
relationship_stable_key,
replacement_scope_id,
short_fingerprint,
source_fingerprint,
)
from .schema_validation import draft202012_validator
# Version tag of the prompt layout; recorded in candidate provenance, in the
# scan's ``llm_budget`` and in the synthetic LLM source-anchor reference.
PROMPT_VERSION = "repo-evidence-v1"
# Extractor identity stamped on LLM provenance and the LLM replacement scope.
EXTRACTOR_ID = "llm-connect-repo-evidence"
# Extractor version stamped on LLM provenance entries.
EXTRACTOR_VERSION = "0.1.0"
@dataclass(frozen=True)
class LLMExtractionConfig:
provider: str = "mock"
model: str = "mock"
temperature: float = 0.0
max_tokens: int = 1500
min_confidence: float = 0.6
max_evidence_items: int = 14
api_key: str | None = None
@dataclass(frozen=True)
class LocalRunConfig:
    """Stand-in run configuration used when ``llm_connect`` cannot be imported.

    Carries the same four fields that ``create_run_config`` passes to
    llm-connect's ``RunConfig``.
    """

    # Model name for the request.
    model_name: str
    # Sampling temperature for the request.
    temperature: float
    # Completion token limit for the request.
    max_tokens: int
    # Extra provider parameters (response format, prompt version).
    model_params: dict[str, object]
class LLMExtractionError(RuntimeError):
    """Raised when the LLM adapter, response, or output cannot be used."""
def augment_snapshot_with_llm(
    snapshot: dict[str, Any],
    *,
    config: LLMExtractionConfig | None = None,
    adapter: object | None = None,
) -> dict[str, Any]:
    """Return a copy of ``snapshot`` enriched with schema-gated LLM candidates.

    The input snapshot is never mutated. Any failure (adapter creation, the
    provider call, unparsable output, or a merged snapshot that fails schema
    validation) returns the copied snapshot without LLM candidates, plus a
    review artifact describing the failure.

    Args:
        snapshot: Deterministic discovery snapshot to enrich.
        config: Extraction settings; defaults to ``LLMExtractionConfig()``.
        adapter: Object exposing ``execute_prompt(prompt, run_config)``. When
            falsy, an adapter is created from ``config``.

    Returns:
        A new snapshot dict with LLM scan metadata set.
    """
    settings = config if config is not None else LLMExtractionConfig()
    augmented = _copy_json(snapshot)
    artifacts: list[dict[str, object]] = list(augmented.get("review_artifacts", []))
    bundle = build_evidence_bundle(snapshot, max_items=settings.max_evidence_items)
    bundle_hash = short_fingerprint(bundle, length=16)
    prompt = build_llm_prompt(bundle)

    def fail_closed(artifact_type: str, message: str, payload: dict[str, object]) -> dict[str, Any]:
        # Shared failure exit: keep the deterministic copy, append one artifact.
        augmented["review_artifacts"] = [
            *artifacts,
            review_artifact(artifact_type=artifact_type, message=message, payload=payload),
        ]
        _mark_llm_scan_metadata(augmented, settings)
        return augmented

    # Stage 1: obtain raw text from the provider.
    try:
        llm_adapter = adapter or create_llm_adapter(settings)
        run_config = create_run_config(settings)
        response = llm_adapter.execute_prompt(prompt, run_config)
        raw_output = _response_content(response)
    except Exception as exc:
        return fail_closed(
            "llm_execution_error",
            f"LLM extraction failed: {exc}",
            {"provider": settings.provider, "model": settings.model},
        )

    # Stage 2: the text must decode to a JSON object.
    try:
        parsed = parse_llm_json(raw_output)
    except Exception as exc:
        return fail_closed(
            "llm_output_invalid",
            f"LLM output is not a valid structured extraction: {exc}",
            {"raw_output": raw_output},
        )

    # Stage 3: project into candidates; non-dict usage/metadata are ignored.
    response_model = str(getattr(response, "model", settings.model) or settings.model)
    usage = getattr(response, "usage", {})
    if not isinstance(usage, dict):
        usage = {}
    metadata = getattr(response, "metadata", {})
    if not isinstance(metadata, dict):
        metadata = {}
    candidates, rejected = project_llm_output(
        parsed,
        snapshot,
        bundle,
        config=settings,
        model=response_model,
        usage=usage,
        metadata=metadata,
        bundle_hash=bundle_hash,
    )
    artifacts.extend(rejected)

    # Stage 4: merge into a separate copy so a schema failure can be discarded.
    candidate_snapshot = _copy_json(augmented)
    _merge_candidates(candidate_snapshot, candidates)
    if artifacts:
        candidate_snapshot["review_artifacts"] = artifacts
    _mark_llm_scan_metadata(candidate_snapshot, settings)
    schema_path = Path("schemas") / "discovery-snapshot.schema.yaml"
    try:
        draft202012_validator(schema_path).validate(candidate_snapshot)
    except ValidationError as exc:
        return fail_closed(
            "llm_output_invalid",
            f"LLM candidates did not validate against discovery schema: {exc.message}",
            {"parsed_output": parsed},
        )
    return candidate_snapshot
def create_llm_adapter(config: LLMExtractionConfig) -> object:
    """Build an llm-connect adapter for ``config``.

    Raises:
        LLMExtractionError: If the ``llm_connect`` module cannot be found.
    """
    try:
        from llm_connect import create_adapter
    except ModuleNotFoundError as exc:
        raise LLMExtractionError("llm-connect is not importable") from exc
    else:
        adapter = create_adapter(config.provider, model=config.model, api_key=config.api_key)
    return adapter
def create_run_config(config: LLMExtractionConfig) -> object:
    """Build the run configuration passed to ``execute_prompt``.

    Uses llm-connect's ``RunConfig`` when the module is importable and falls
    back to ``LocalRunConfig`` with identical field values otherwise.
    """
    settings = {
        "model_name": config.model,
        "temperature": config.temperature,
        "max_tokens": config.max_tokens,
        "model_params": {
            "response_format": "json_object",
            "prompt_version": PROMPT_VERSION,
        },
    }
    try:
        from llm_connect import RunConfig as run_config_type
    except ModuleNotFoundError:
        run_config_type = LocalRunConfig
    return run_config_type(**settings)
def build_evidence_bundle(snapshot: dict[str, Any], *, max_items: int = 14) -> dict[str, object]:
    """Condense a snapshot into the compact evidence bundle shown to the LLM.

    Nodes are ranked by ``_node_evidence_score`` (highest first, ties broken by
    id) and truncated to ``max_items``. Only attributes whose name ends in
    ``_title`` or ``_present`` are included, also truncated to ``max_items``.
    Non-dict entries and non-list collections are ignored.
    """
    raw_candidates = snapshot.get("candidates")
    candidates = raw_candidates if isinstance(raw_candidates, dict) else {}
    raw_nodes = candidates.get("nodes")
    nodes = raw_nodes if isinstance(raw_nodes, list) else []
    raw_attributes = candidates.get("attributes")
    attributes = raw_attributes if isinstance(raw_attributes, list) else []

    ranked = sorted(
        ((_node_evidence_score(node), _bundle_node(node)) for node in nodes if isinstance(node, dict)),
        key=lambda pair: (-pair[0], str(pair[1].get("id", ""))),
    )

    text_attributes = []
    for attribute in attributes:
        if not isinstance(attribute, dict):
            continue
        if str(attribute.get("name", "")).endswith(("_title", "_present")):
            text_attributes.append(_bundle_attribute(attribute))

    scan = snapshot.get("scan") or {}
    return {
        "repo": snapshot.get("source", {}),
        "scan": {
            "run_id": scan.get("run_id", ""),
            "profile": scan.get("profile", ""),
        },
        "evidence": [entry for _, entry in ranked[:max_items]],
        "attributes": text_attributes[:max_items],
    }
def build_llm_prompt(bundle: dict[str, object]) -> str:
    """Render the instruction text followed by the JSON-encoded evidence bundle."""
    instructions = (
        "You are enriching a Railiance Fabric discovery snapshot.",
        "Use only the JSON evidence bundle below. Do not invent facts.",
        "Return strict JSON with this shape:",
        '{"nodes":[],"edges":[],"attributes":[]}',
        "Node fields: kind, label, confidence, evidence_refs, rationale, aliases, attributes.",
        "Edge fields: edge_type, source_label or source_key, target_label or target_key, confidence, evidence_refs, rationale.",
        "Attribute fields: entity_label or entity_key, name, value, confidence, evidence_refs, rationale.",
        "Use confidence from 0 to 1. Low confidence or uncertainty is acceptable; it will be reviewed.",
        "Evidence bundle:",
    )
    # Sorted keys keep the rendered bundle stable for identical input.
    rendered_bundle = json.dumps(bundle, indent=2, sort_keys=True)
    return "\n".join([*instructions, rendered_bundle])
def parse_llm_json(content: str) -> dict[str, object]:
    """Decode the model's reply into a JSON object.

    A single surrounding Markdown code fence is stripped first. The non-standard
    constants ``NaN``, ``Infinity`` and ``-Infinity`` are rejected: the prompt
    asks for strict JSON, and such floats would otherwise flow into candidate
    confidences and attribute values.

    Args:
        content: Raw text returned by the LLM.

    Returns:
        The decoded top-level JSON object.

    Raises:
        LLMExtractionError: If the text is not valid strict JSON or its top
            level is not an object.
    """

    def _reject_constant(name: str) -> object:
        # json.loads calls this hook only for NaN / Infinity / -Infinity.
        raise ValueError(f"non-standard JSON constant {name!r}")

    text = _strip_code_fence(content.strip())
    try:
        parsed = json.loads(text, parse_constant=_reject_constant)
    except ValueError as exc:
        # json.JSONDecodeError subclasses ValueError, so both failures land here.
        raise LLMExtractionError(f"LLM output is not valid JSON: {exc}") from exc
    if not isinstance(parsed, dict):
        raise LLMExtractionError("LLM output must be a JSON object")
    return parsed
def project_llm_output(
    output: dict[str, object],
    snapshot: dict[str, Any],
    bundle: dict[str, object],
    *,
    config: LLMExtractionConfig,
    model: str,
    usage: dict[str, object],
    metadata: dict[str, object],
    bundle_hash: str,
) -> tuple[dict[str, list[dict[str, object]]], list[dict[str, object]]]:
    """Project parsed LLM output into review-gated discovery candidates.

    Every accepted node, edge and attribute is emitted with ``origin: llm`` and
    ``review_state: needs_review``. Items below ``config.min_confidence``,
    nodes without a label, and edges/attributes whose endpoints cannot be
    resolved are returned as review artifacts instead.

    Args:
        output: Decoded LLM JSON with optional ``nodes``/``edges``/``attributes``.
        snapshot: Deterministic snapshot used for repo slug, run id and entity
            resolution.
        bundle: Evidence bundle sent to the model; its ids resolve
            ``evidence_refs`` to source anchors.
        config: Extraction settings (provider, confidence threshold).
        model: Model name recorded in provenance.
        usage: Token usage recorded in provenance.
        metadata: Response metadata; when non-empty it seeds the provenance
            rationale, which a candidate's own rationale replaces.
        bundle_hash: Fingerprint of ``bundle`` used for the scope and anchor.

    Returns:
        ``(candidates, artifacts)``. ``candidates`` holds ``nodes``, ``edges``,
        ``attributes`` and a one-element ``replacement_scopes`` list.
    """
    repo_slug = str((snapshot.get("source") or {}).get("repo_slug") or "repo")
    run_id = str((snapshot.get("scan") or {}).get("run_id") or "")
    scope = _llm_scope(repo_slug, bundle_hash)
    scope_id = scope["id"]
    llm_anchor = _llm_anchor(run_id, bundle_hash)
    provenance_base = {
        "extractor_id": EXTRACTOR_ID,
        "extractor_version": EXTRACTOR_VERSION,
        "method": "llm",
        "origin": "llm",
        "prompt_version": PROMPT_VERSION,
        "provider": config.provider,
        "model": model,
        "usage": usage,
    }
    if metadata:
        provenance_base["rationale"] = f"metadata={json.dumps(metadata, sort_keys=True, default=str)}"
    evidence_index = _evidence_index(bundle)
    entity_index = _entity_index(snapshot)

    def _provenance(raw: dict[str, object]) -> list[dict[str, object]]:
        # One provenance entry per candidate; its own rationale wins if present.
        entry = {**provenance_base}
        rationale = str(raw.get("rationale") or "").strip()
        if rationale:
            entry["rationale"] = rationale
        return [entry]

    def _anchors(raw: dict[str, object]) -> list[dict[str, object]]:
        # Anchors of the cited evidence plus the synthetic LLM anchor.
        return _anchors_for_refs(raw.get("evidence_refs"), evidence_index, llm_anchor)

    candidates: dict[str, list[dict[str, object]]] = {"nodes": [], "edges": [], "attributes": []}
    artifacts: list[dict[str, object]] = []

    for raw_node in _object_list(output.get("nodes")):
        confidence = _confidence(raw_node.get("confidence"))
        if confidence < config.min_confidence:
            artifacts.append(_low_confidence_artifact(raw_node, confidence))
            continue
        label = str(raw_node.get("label") or "").strip()
        # Fall back after stripping so a whitespace-only kind is not kept as "".
        kind = str(raw_node.get("kind") or "").strip() or "DiscoveredEntity"
        if not label:
            artifacts.append(_invalid_candidate_artifact("LLM node is missing label", raw_node))
            continue
        stable_key = discovery_stable_key(repo_slug, kind, label)
        # Register the node so later edges/attributes can refer to it by label.
        entity_index[_entity_lookup_key(label, kind)] = stable_key
        entity_index[_entity_lookup_key(label, "")] = stable_key
        candidates["nodes"].append(
            {
                "stable_key": stable_key,
                "kind": kind,
                "label": label,
                "repo": repo_slug,
                # De-duplicate: the model may already list the label as an alias.
                "aliases": _unique_strings([*_strings(raw_node.get("aliases")), label]),
                "attributes": _json_object(raw_node.get("attributes")),
                "origin": "llm",
                "review_state": "needs_review",
                "status": "active",
                "confidence": confidence,
                "replacement_scope": scope_id,
                "provenance": _provenance(raw_node),
                "source_anchors": _anchors(raw_node),
            }
        )

    for raw_edge in _object_list(output.get("edges")):
        confidence = _confidence(raw_edge.get("confidence"))
        if confidence < config.min_confidence:
            artifacts.append(_low_confidence_artifact(raw_edge, confidence))
            continue
        edge_type = str(raw_edge.get("edge_type") or "").strip()
        source_key = _resolve_entity_key(raw_edge, "source", entity_index)
        target_key = _resolve_entity_key(raw_edge, "target", entity_index)
        if not edge_type or not source_key or not target_key:
            artifacts.append(_unresolved_candidate_artifact("LLM edge endpoint could not be resolved", raw_edge))
            continue
        candidates["edges"].append(
            {
                "stable_key": relationship_stable_key(source_key, edge_type, target_key, evidence_scope=scope_id),
                "edge_type": edge_type,
                "source_key": source_key,
                "target_key": target_key,
                "attributes": _json_object(raw_edge.get("attributes")),
                "origin": "llm",
                "review_state": "needs_review",
                "status": "active",
                "confidence": confidence,
                "replacement_scope": scope_id,
                "provenance": _provenance(raw_edge),
                "source_anchors": _anchors(raw_edge),
            }
        )

    for raw_attribute in _object_list(output.get("attributes")):
        confidence = _confidence(raw_attribute.get("confidence"))
        if confidence < config.min_confidence:
            artifacts.append(_low_confidence_artifact(raw_attribute, confidence))
            continue
        entity_key = _resolve_entity_key(raw_attribute, "entity", entity_index)
        name = str(raw_attribute.get("name") or "").strip()
        if not entity_key or not name:
            artifacts.append(_unresolved_candidate_artifact("LLM attribute target could not be resolved", raw_attribute))
            continue
        candidates["attributes"].append(
            {
                "stable_key": attribute_stable_key(entity_key, name),
                "entity_key": entity_key,
                "name": name,
                "value": _json_value(raw_attribute.get("value")),
                "origin": "llm",
                "review_state": "needs_review",
                "confidence": confidence,
                "replacement_scope": scope_id,
                "provenance": _provenance(raw_attribute),
                "source_anchors": _anchors(raw_attribute),
            }
        )

    candidates["replacement_scopes"] = [scope]
    return candidates, artifacts
def review_artifact(
    *,
    artifact_type: str,
    message: str,
    payload: dict[str, object] | None = None,
    evidence_refs: Iterable[str] = (),
) -> dict[str, object]:
    """Build a review-artifact record with ``origin: llm``.

    The ``id`` is a fingerprint of the artifact body, which includes the
    creation timestamp.
    """
    body = {
        "artifact_type": artifact_type,
        "message": message,
        "payload": payload or {},
        "evidence_refs": list(evidence_refs),
        "created_at": _utc_now(),
    }
    artifact: dict[str, object] = {
        "id": f"review:{short_fingerprint(body, length=20)}",
        "origin": "llm",
    }
    artifact.update(body)
    return artifact
def _mark_llm_scan_metadata(snapshot: dict[str, Any], config: LLMExtractionConfig) -> None:
    """Record in ``snapshot['scan']`` that an LLM pass ran and under which budget."""
    budget = {
        "provider": config.provider,
        "model": config.model,
        "max_tokens": config.max_tokens,
        "min_confidence": config.min_confidence,
        "prompt_version": PROMPT_VERSION,
    }
    snapshot.setdefault("scan", {}).update(
        {
            "llm_enabled": True,
            "deterministic_only": False,
            "llm_budget": budget,
        }
    )
def _merge_candidates(snapshot: dict[str, Any], candidates: dict[str, list[dict[str, object]]]) -> None:
existing_scopes = {
str(scope.get("id")): scope
for scope in snapshot.setdefault("replacement_scopes", [])
if isinstance(scope, dict)
}
for scope in candidates.get("replacement_scopes", []):
existing_scopes[str(scope["id"])] = scope
snapshot["replacement_scopes"] = [existing_scopes[key] for key in sorted(existing_scopes)]
snapshot_candidates = snapshot.setdefault("candidates", {"nodes": [], "edges": [], "attributes": []})
for collection in ("nodes", "edges", "attributes"):
existing = {
str(item.get("stable_key")): item
for item in snapshot_candidates.setdefault(collection, [])
if isinstance(item, dict)
}
for incoming in candidates.get(collection, []):
key = str(incoming.get("stable_key"))
existing[key] = _merge_candidate(existing.get(key), incoming)
snapshot_candidates[collection] = [existing[key] for key in sorted(existing)]
def _merge_candidate(existing: dict[str, object] | None, incoming: dict[str, object]) -> dict[str, object]:
if existing is None:
return incoming
merged = {**existing}
for field in ("aliases", "provenance", "source_anchors"):
values = [*list(existing.get(field, [])), *list(incoming.get(field, []))]
if values:
merged[field] = _unique_json(values) if field != "aliases" else _unique_strings(values)
if isinstance(existing.get("attributes"), dict) or isinstance(incoming.get("attributes"), dict):
merged["attributes"] = {
**(existing.get("attributes") if isinstance(existing.get("attributes"), dict) else {}),
**(incoming.get("attributes") if isinstance(incoming.get("attributes"), dict) else {}),
}
if isinstance(existing.get("confidence"), (int, float)) and isinstance(incoming.get("confidence"), (int, float)):
merged["confidence"] = max(float(existing["confidence"]), float(incoming["confidence"]))
return merged
def _llm_scope(repo_slug: str, bundle_hash: str) -> dict[str, object]:
    """Describe the additive replacement scope that owns LLM candidates for one bundle."""
    scope_id = replacement_scope_id(repo_slug, EXTRACTOR_ID, "llm", source_path=bundle_hash)
    return {
        "id": scope_id,
        "extractor_id": EXTRACTOR_ID,
        "source_kind": "llm",
        "source_path": bundle_hash,
        "mode": "additive",
        "description": "LLM-assisted extraction over deterministic evidence bundle.",
    }
def _llm_anchor(run_id: str, bundle_hash: str) -> dict[str, object]:
    """Build the synthetic source anchor attached to every LLM candidate.

    The fingerprint is computed over the anchor before the ``fingerprint``
    field itself is added.
    """
    unsigned = {
        "source_kind": "llm",
        "ref": f"{PROMPT_VERSION}:{run_id}:{bundle_hash}",
    }
    return {**unsigned, "fingerprint": source_fingerprint(unsigned)}
def _evidence_index(bundle: dict[str, object]) -> dict[str, list[dict[str, object]]]:
index: dict[str, list[dict[str, object]]] = {}
for item in list(bundle.get("evidence", [])) + list(bundle.get("attributes", [])):
if not isinstance(item, dict):
continue
item_id = str(item.get("id") or "")
anchors = item.get("source_anchors")
if item_id and isinstance(anchors, list):
index[item_id] = [anchor for anchor in anchors if isinstance(anchor, dict)]
return index
def _entity_index(snapshot: dict[str, Any]) -> dict[str, str]:
    """Build the lookup table that resolves LLM references to stable keys.

    Each node with a stable key is reachable by the key itself and, both with
    and without its kind, by label, by every alias, and by graph id. Later
    nodes overwrite earlier ones on a lookup-key collision.

    Nodes without a stable key are skipped entirely so no lookup maps to an
    empty key, and a non-list ``nodes`` collection is treated as empty, as
    ``build_evidence_bundle`` does.
    """
    index: dict[str, str] = {}
    candidates = snapshot.get("candidates") if isinstance(snapshot.get("candidates"), dict) else {}
    raw_nodes = candidates.get("nodes")
    nodes = raw_nodes if isinstance(raw_nodes, list) else []
    for node in nodes:
        if not isinstance(node, dict):
            continue
        stable_key = str(node.get("stable_key") or "")
        if not stable_key:
            continue
        index[stable_key] = stable_key
        kind = str(node.get("kind") or "")
        # Same registration order as before: label, aliases, then graph id.
        names = [
            str(node.get("label") or ""),
            *_strings(node.get("aliases")),
            str(node.get("graph_id") or ""),
        ]
        for name in names:
            if not name:
                continue
            index[_entity_lookup_key(name, kind)] = stable_key
            index[_entity_lookup_key(name, "")] = stable_key
    return index
def _entity_lookup_key(label: str, kind: str) -> str:
return f"{kind.strip().lower()}::{label.strip().lower()}"
def _resolve_entity_key(raw: dict[str, object], role: str, entity_index: dict[str, str]) -> str:
explicit = str(raw.get(f"{role}_key") or "").strip()
if explicit:
return entity_index.get(explicit, explicit if explicit.startswith("discovery:") else "")
label = str(raw.get(f"{role}_label") or "").strip()
kind = str(raw.get(f"{role}_kind") or "").strip()
if not label:
return ""
return entity_index.get(_entity_lookup_key(label, kind), entity_index.get(_entity_lookup_key(label, ""), ""))
def _anchors_for_refs(
    refs: object,
    evidence_index: dict[str, list[dict[str, object]]],
    fallback: dict[str, object],
) -> list[dict[str, object]]:
    """Collect source anchors for the cited evidence ids, then the fallback.

    Unknown ids contribute nothing; ``fallback`` is always appended, and the
    result is de-duplicated keeping first occurrences.
    """
    cited = [
        anchor
        for ref in _strings(refs)
        for anchor in evidence_index.get(ref, [])
    ]
    return _unique_json([*cited, fallback])
def _node_evidence_score(node: dict[str, object]) -> int:
kind = str(node.get("kind") or "")
score = 1
if node.get("origin") == "repo_declaration":
score += 10
if kind in {"ServiceDeclaration", "CapabilityDeclaration", "InterfaceDeclaration", "Library"}:
score += 8
if kind in {"DeploymentService", "ContainerBuild", "ScoreWorkload"} or kind.startswith("Kubernetes"):
score += 5
if kind in {"Repository", "ExternalLibrary", "Lockfile", "ServiceConfig"}:
score += 2
return score
def _bundle_node(node: dict[str, object]) -> dict[str, object]:
    """Reduce a candidate node to the compact form placed in the evidence bundle."""
    anchors = node.get("source_anchors")
    compact: dict[str, object] = {"id": str(node.get("stable_key") or "")}
    # Missing or falsy descriptive fields are normalised to empty strings.
    for field in ("kind", "label", "graph_id", "origin", "review_state"):
        compact[field] = node.get(field) or ""
    compact["attributes"] = _compact_attributes(node.get("attributes"))
    compact["source_anchors"] = anchors if isinstance(anchors, list) else []
    return compact
def _bundle_attribute(attribute: dict[str, object]) -> dict[str, object]:
return {
"id": str(attribute.get("stable_key") or ""),
"entity_key": attribute.get("entity_key") or "",
"name": attribute.get("name") or "",
"value": attribute.get("value"),
"source_anchors": attribute.get("source_anchors") if isinstance(attribute.get("source_anchors"), list) else [],
}
def _compact_attributes(value: object) -> dict[str, object]:
if not isinstance(value, dict):
return {}
compact: dict[str, object] = {}
for key, item in value.items():
if key in {"metadata", "spec"}:
continue
compact[str(key)] = _json_value(item)
return compact
def _low_confidence_artifact(raw: dict[str, object], confidence: float) -> dict[str, object]:
    """Wrap a candidate that scored below the threshold as a review artifact."""
    cited_refs = _strings(raw.get("evidence_refs"))
    details = {"candidate": raw, "confidence": confidence}
    return review_artifact(
        artifact_type="llm_low_confidence",
        message=f"LLM candidate below confidence threshold: {confidence:.2f}",
        payload=details,
        evidence_refs=cited_refs,
    )
def _invalid_candidate_artifact(message: str, raw: dict[str, object]) -> dict[str, object]:
    """Wrap a structurally invalid candidate as an ``llm_output_invalid`` artifact."""
    cited_refs = _strings(raw.get("evidence_refs"))
    return review_artifact(
        artifact_type="llm_output_invalid",
        message=message,
        payload={"candidate": raw},
        evidence_refs=cited_refs,
    )
def _unresolved_candidate_artifact(message: str, raw: dict[str, object]) -> dict[str, object]:
    """Wrap a candidate with unresolved references as an ``llm_candidate_unresolved`` artifact."""
    cited_refs = _strings(raw.get("evidence_refs"))
    return review_artifact(
        artifact_type="llm_candidate_unresolved",
        message=message,
        payload={"candidate": raw},
        evidence_refs=cited_refs,
    )
def _object_list(value: object) -> list[dict[str, object]]:
if not isinstance(value, list):
return []
return [item for item in value if isinstance(item, dict)]
def _confidence(value: object) -> float:
if isinstance(value, (int, float)):
return max(0.0, min(1.0, float(value)))
return 0.0
def _strings(value: object) -> list[str]:
if isinstance(value, str):
values = [value]
elif isinstance(value, list):
values = value
else:
values = []
result: list[str] = []
seen: set[str] = set()
for item in values:
text = str(item or "").strip()
if not text or text in seen:
continue
seen.add(text)
result.append(text)
return result
def _json_object(value: object) -> dict[str, object]:
if not isinstance(value, dict):
return {}
return {str(key): _json_value(item) for key, item in value.items()}
def _json_value(value: object) -> object:
if value is None or isinstance(value, (str, int, float, bool)):
return value
if isinstance(value, list):
return [_json_value(item) for item in value]
if isinstance(value, tuple):
return [_json_value(item) for item in value]
if isinstance(value, dict):
return {str(key): _json_value(item) for key, item in value.items()}
return str(value)
def _unique_strings(values: Iterable[object]) -> list[str]:
seen: set[str] = set()
result: list[str] = []
for value in values:
text = str(value or "").strip()
if not text or text in seen:
continue
seen.add(text)
result.append(text)
return result
def _unique_json(values: Iterable[object]) -> list[object]:
seen: set[str] = set()
result: list[object] = []
for value in values:
key = json.dumps(value, sort_keys=True, default=str)
if key in seen:
continue
seen.add(key)
result.append(value)
return result
def _response_content(response: object) -> str:
content = getattr(response, "content", "")
if not isinstance(content, str):
raise LLMExtractionError("LLM response content must be text")
return content
def _strip_code_fence(text: str) -> str:
match = re.fullmatch(r"```(?:json)?\s*(.*?)\s*```", text, flags=re.DOTALL)
return match.group(1) if match else text
def _copy_json(value: dict[str, Any]) -> dict[str, Any]:
return json.loads(json.dumps(value, default=str))
def _utc_now() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")

View File

@@ -20,6 +20,7 @@ from .discovery import (
short_fingerprint,
source_fingerprint,
)
from .llm_extraction import LLMExtractionConfig, augment_snapshot_with_llm
from .loader import declaration_files, load_yaml
@@ -86,6 +87,8 @@ class ScanOptions:
profile: str = "deterministic"
deterministic_only: bool = True
llm_enabled: bool = False
llm_config: LLMExtractionConfig | None = None
llm_adapter: object | None = None
class CandidateAccumulator:
@@ -287,7 +290,7 @@ def scan_repo(options: ScanOptions | Path, **overrides: object) -> dict[str, obj
profile=normalize_identity_part(options.profile),
fingerprint=short_fingerprint({"commit": commit, "path": str(repo_path)}),
)
return {
snapshot = {
"apiVersion": "railiance.fabric/v1alpha1",
"kind": "FabricDiscoverySnapshot",
"generated_at": now,
@@ -316,6 +319,13 @@ def scan_repo(options: ScanOptions | Path, **overrides: object) -> dict[str, obj
"retirement_policy": "missing candidates retire only inside their replacement scope",
},
}
if options.llm_enabled:
return augment_snapshot_with_llm(
snapshot,
config=options.llm_config,
adapter=options.llm_adapter,
)
return snapshot
@dataclass

View File

@@ -97,6 +97,10 @@ properties:
type: array
items:
$ref: "#/$defs/tombstone"
review_artifacts:
type: array
items:
$ref: "#/$defs/reviewArtifact"
reconciliation:
type: object
additionalProperties: false
@@ -485,3 +489,39 @@ $defs:
previous_candidate:
type: object
additionalProperties: true
reviewArtifact:
type: object
additionalProperties: false
required:
- id
- artifact_type
- origin
- message
- created_at
properties:
id:
$ref: "#/$defs/stableKey"
artifact_type:
type: string
enum:
- llm_output_invalid
- llm_low_confidence
- llm_candidate_unresolved
- llm_execution_error
origin:
$ref: "#/$defs/origin"
message:
type: string
minLength: 1
evidence_refs:
type: array
items:
type: string
minLength: 1
payload:
type: object
additionalProperties: true
created_at:
type: string
format: date-time

View File

@@ -0,0 +1,193 @@
from __future__ import annotations
import json
import sys
import types
from pathlib import Path
from types import SimpleNamespace
from railiance_fabric.llm_extraction import LLMExtractionConfig, PROMPT_VERSION
from railiance_fabric.scanner import ScanOptions, scan_repo
from railiance_fabric.schema_validation import draft202012_validator
def test_llm_extraction_uses_llm_connect_boundary_with_mock_adapter(tmp_path: Path, monkeypatch) -> None:
    """A scan with LLM enabled goes through ``llm_connect`` and merges review-gated candidates.

    A fake ``llm_connect`` module is installed in ``sys.modules`` so no real
    provider is contacted; the test then checks the adapter/run-config calls,
    the scan metadata, and the projected node, edge, and attribute.
    """
    repo = _minimal_repo(tmp_path)
    # Canned model reply: one node, one edge and one attribute, all above the
    # 0.6 confidence threshold configured below.
    response = json.dumps(
        {
            "nodes": [
                {
                    "kind": "CapabilityDeclaration",
                    "label": "Fixture Operations",
                    "confidence": 0.82,
                    "evidence_refs": [],
                    "aliases": ["fixture-ops"],
                    "attributes": {"capability_type": "operations"},
                    "rationale": "README describes operational responsibility.",
                }
            ],
            "edges": [
                {
                    "edge_type": "suggests_capability",
                    "source_label": "Fixture Repo",
                    "target_label": "Fixture Operations",
                    "confidence": 0.78,
                    "evidence_refs": [],
                    "rationale": "The repository appears to own this capability.",
                }
            ],
            "attributes": [
                {
                    "entity_label": "Fixture Operations",
                    "name": "uncertainty",
                    "value": "needs human review",
                    "confidence": 0.75,
                    "evidence_refs": [],
                    "rationale": "LLM-only extraction should remain review-gated.",
                }
            ],
        }
    )
    # Records what the production code passes across the llm_connect boundary.
    calls: dict[str, object] = {}
    fake_module = types.ModuleType("llm_connect")

    class RunConfig:
        def __init__(self, **kwargs: object) -> None:
            self.kwargs = kwargs
            self.model_name = str(kwargs["model_name"])

    class MockLLMAdapter:
        def __init__(self, mock_response: str = response) -> None:
            self.mock_response = mock_response

        def execute_prompt(self, prompt: str, config: RunConfig) -> SimpleNamespace:
            calls["prompt"] = prompt
            calls["config"] = config
            return SimpleNamespace(
                content=self.mock_response,
                model=config.model_name,
                usage={"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15},
                metadata={"mock": True},
            )

    def create_adapter(provider: str, model: str | None = None, api_key: str | None = None) -> MockLLMAdapter:
        calls["provider"] = provider
        calls["model"] = model
        calls["api_key"] = api_key
        return MockLLMAdapter()

    fake_module.RunConfig = RunConfig
    fake_module.MockLLMAdapter = MockLLMAdapter
    fake_module.create_adapter = create_adapter
    # Must be installed before scan_repo runs: the production code imports
    # llm_connect lazily inside its factory functions.
    monkeypatch.setitem(sys.modules, "llm_connect", fake_module)
    snapshot = scan_repo(
        ScanOptions(
            repo_path=repo,
            repo_slug="fixture-repo",
            repo_name="Fixture Repo",
            commit="abc123",
            llm_enabled=True,
            deterministic_only=False,
            llm_config=LLMExtractionConfig(provider="mock", model="mock-model", min_confidence=0.6),
        )
    )
    _validate_schema("discovery-snapshot.schema.yaml", snapshot)
    # Boundary: adapter and run config were built through the fake module.
    assert calls["provider"] == "mock"
    assert calls["model"] == "mock-model"
    assert isinstance(calls["config"], RunConfig)
    assert "Evidence bundle:" in str(calls["prompt"])
    assert "Use only the JSON evidence bundle below" in str(calls["prompt"])
    # Scan metadata reflects the LLM pass.
    assert snapshot["scan"]["llm_enabled"] is True
    assert snapshot["scan"]["deterministic_only"] is False
    assert snapshot["scan"]["llm_budget"]["prompt_version"] == PROMPT_VERSION
    # The projected node is review-gated and carries LLM provenance.
    llm_node = next(node for node in snapshot["candidates"]["nodes"] if node["label"] == "Fixture Operations")
    assert llm_node["origin"] == "llm"
    assert llm_node["review_state"] == "needs_review"
    assert llm_node["confidence"] == 0.82
    assert llm_node["provenance"][0]["provider"] == "mock"
    assert llm_node["provenance"][0]["model"] == "mock-model"
    assert llm_node["provenance"][0]["usage"]["total_tokens"] == 15
    assert any(scope["source_kind"] == "llm" and scope["mode"] == "additive" for scope in snapshot["replacement_scopes"])
    assert any(edge["edge_type"] == "suggests_capability" for edge in snapshot["candidates"]["edges"])
    assert any(attribute["name"] == "uncertainty" for attribute in snapshot["candidates"]["attributes"])
def test_llm_extraction_fails_closed_for_bad_or_low_confidence_output(tmp_path: Path) -> None:
    """Unparsable or low-confidence LLM output yields review artifacts, not candidates.

    Both scans inject an adapter directly via ``llm_adapter``, and both
    resulting snapshots must still validate against the discovery schema.
    """
    repo = _minimal_repo(tmp_path)
    # Case 1: the model reply is not JSON at all.
    bad_snapshot = scan_repo(
        ScanOptions(
            repo_path=repo,
            repo_slug="fixture-repo",
            repo_name="Fixture Repo",
            commit="abc123",
            llm_enabled=True,
            deterministic_only=False,
            llm_config=LLMExtractionConfig(provider="mock", model="mock-model"),
            llm_adapter=_Adapter("not json"),
        )
    )
    _validate_schema("discovery-snapshot.schema.yaml", bad_snapshot)
    assert {artifact["artifact_type"] for artifact in bad_snapshot["review_artifacts"]} == {"llm_output_invalid"}
    # Case 2: well-formed JSON whose only node scores below min_confidence.
    low_confidence_snapshot = scan_repo(
        ScanOptions(
            repo_path=repo,
            repo_slug="fixture-repo",
            repo_name="Fixture Repo",
            commit="abc123",
            llm_enabled=True,
            deterministic_only=False,
            llm_config=LLMExtractionConfig(provider="mock", model="mock-model", min_confidence=0.6),
            llm_adapter=_Adapter(
                json.dumps(
                    {
                        "nodes": [
                            {
                                "kind": "CapabilityDeclaration",
                                "label": "Too Uncertain",
                                "confidence": 0.2,
                                "evidence_refs": [],
                                "rationale": "Weak signal.",
                            }
                        ],
                        "edges": [],
                        "attributes": [],
                    }
                )
            ),
        )
    )
    _validate_schema("discovery-snapshot.schema.yaml", low_confidence_snapshot)
    # The weak node must not appear among the graph candidates.
    labels = {node["label"] for node in low_confidence_snapshot["candidates"]["nodes"]}
    assert "Too Uncertain" not in labels
    assert {artifact["artifact_type"] for artifact in low_confidence_snapshot["review_artifacts"]} == {"llm_low_confidence"}
class _Adapter:
def __init__(self, response: str) -> None:
self.response = response
def execute_prompt(self, prompt: str, config: object) -> SimpleNamespace:
return SimpleNamespace(
content=self.response,
model=getattr(config, "model_name", "mock-model"),
usage={"total_tokens": 1},
metadata={"mock": True},
)
def _minimal_repo(tmp_path: Path) -> Path:
repo = tmp_path / "fixture-repo"
repo.mkdir()
(repo / "README.md").write_text("# Fixture Repo\n\nOwns operational repo signals.\n", encoding="utf-8")
return repo
def _validate_schema(schema_name: str, payload: dict[str, object]) -> None:
    """Validate ``payload`` against ``schemas/<schema_name>``; validation errors propagate."""
    schema_path = Path("schemas") / schema_name
    draft202012_validator(schema_path).validate(payload)

View File

@@ -172,7 +172,7 @@ Acceptance notes:
```task
id: RAIL-FAB-WP-0010-T03
status: todo
status: done
priority: high
state_hub_task_id: "59c206a3-94b9-4f47-9c4f-75f87aa8f505"
```