generated from coulomb/repo-seed
structured logging around key workflows and docs for operational readiness
This commit is contained in:
@@ -1,5 +1,8 @@
|
||||
import json
|
||||
import logging
|
||||
import subprocess
|
||||
|
||||
from repo_registry.core.logging import LOGGER_NAME
|
||||
from repo_registry.core.service import RegistryService
|
||||
from repo_registry.llm_extraction import ExtractedAbility, ExtractedCapability
|
||||
from repo_registry.repo_ingestion.git import GitIngestionService
|
||||
@@ -459,6 +462,32 @@ def test_register_repository_imports_metadata_when_name_is_omitted(tmp_path):
|
||||
assert repository.description == "Imported description."
|
||||
|
||||
|
||||
def test_operational_logging_records_analysis_and_review_events(tmp_path, caplog):
|
||||
source = tmp_path / "repo"
|
||||
source.mkdir()
|
||||
(source / "README.md").write_text("# Logged\n", encoding="utf-8")
|
||||
(source / "app.py").write_text(
|
||||
"from fastapi import FastAPI\n"
|
||||
"app = FastAPI()\n"
|
||||
'@app.get("/health")\n'
|
||||
"def health():\n"
|
||||
" return {}\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
service = make_service(tmp_path)
|
||||
|
||||
with caplog.at_level(logging.INFO, logger=LOGGER_NAME):
|
||||
repository = service.register_repository(name="Logged", url=str(source))
|
||||
summary = service.analyze_repository(repository.id)
|
||||
service.approve_candidate_graph(repository.id, summary.analysis_run.id)
|
||||
|
||||
events = [json.loads(record.message)["event"] for record in caplog.records]
|
||||
assert "repository_registered" in events
|
||||
assert "analysis_started" in events
|
||||
assert "analysis_completed" in events
|
||||
assert "review_decision_recorded" in events
|
||||
|
||||
|
||||
def test_capability_must_belong_to_repository(tmp_path):
|
||||
service = make_service(tmp_path)
|
||||
first = service.register_repository(
|
||||
@@ -1193,3 +1222,42 @@ def test_analyze_repository_clones_git_url_before_scanning(tmp_path):
|
||||
fact_names = {(fact.kind, fact.name, fact.path) for fact in summary.facts}
|
||||
assert ("documentation", "README", "README.md") in fact_names
|
||||
assert ("framework", "pytest", "requirements.txt") in fact_names
|
||||
|
||||
|
||||
def test_operational_logging_records_analysis_and_review_events(caplog, tmp_path):
|
||||
source = tmp_path / "repo"
|
||||
source.mkdir()
|
||||
(source / "README.md").write_text("# Logged Service\n", encoding="utf-8")
|
||||
(source / "requirements.txt").write_text("fastapi\n", encoding="utf-8")
|
||||
(source / "app.py").write_text(
|
||||
"from fastapi import FastAPI\n"
|
||||
"app = FastAPI()\n"
|
||||
'@app.get("/health")\n'
|
||||
"def health():\n"
|
||||
" return {}\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
service = make_service(tmp_path)
|
||||
caplog.set_level(logging.INFO, logger=LOGGER_NAME)
|
||||
|
||||
repository = service.register_repository(name="Logged", url=str(source))
|
||||
summary = service.analyze_repository(repository.id)
|
||||
service.approve_candidate_graph(
|
||||
repository.id,
|
||||
summary.analysis_run.id,
|
||||
notes="Logged approval.",
|
||||
)
|
||||
|
||||
payloads = [
|
||||
json.loads(record.message)
|
||||
for record in caplog.records
|
||||
if record.name == LOGGER_NAME
|
||||
]
|
||||
events = {payload["event"] for payload in payloads}
|
||||
|
||||
assert "repository_registered" in events
|
||||
assert "analysis_started" in events
|
||||
assert "analysis_completed" in events
|
||||
assert "review_decision_recorded" in events
|
||||
assert all(payload["repository_id"] == repository.id for payload in payloads)
|
||||
|
||||
Reference in New Issue
Block a user