Implemented Ad-Hoc Task handling

This commit is contained in:
2026-05-01 21:27:52 +02:00
parent d015dc2e8a
commit 5d50dee3f1
10 changed files with 464 additions and 57 deletions

View File

@@ -62,9 +62,22 @@ VALID_WP_STATUSES = {"active", "completed", "archived"}
VALID_TASK_STATUSES = {"todo", "in_progress", "blocked", "done", "cancelled"}
VALID_TASK_PRIORITIES = {"low", "medium", "high", "critical"}
_WP_ID_RE = re.compile(r"^[A-Z]+-WP-\d+$")
_TASK_ID_RE = re.compile(r"^[A-Z]+-WP-\d+-T\d+$")
_WP_ID_RE = re.compile(r"^(?:[A-Z]+-WP-\d+|ADHOC-\d{4}-\d{2}-\d{2})$")
_TASK_ID_RE = re.compile(r"^(?:[A-Z]+-WP-\d+|ADHOC-\d{4}-\d{2}-\d{2})-T\d+$")
_TASK_BLOCK_RE = re.compile(r"```task\s*\n(.*?)\n```", re.DOTALL)
_ARCHIVED_WP_RE = re.compile(r"^\d{6}-(.+\.md)$")
def canonical_workplan_filename(path: Path) -> str:
return _ARCHIVED_WP_RE.sub(r"\1", path.name)
def iter_workplan_files(workplans_dir: Path, include_archived: bool = True) -> list[Path]:
files = sorted(workplans_dir.glob("*.md"))
archived_dir = workplans_dir / "archived"
if include_archived and archived_dir.is_dir():
files.extend(sorted(archived_dir.glob("*.md")))
return files
# ---------------------------------------------------------------------------
@@ -148,7 +161,8 @@ def parse_task_blocks(body: str) -> list[dict]:
def _check_workplan_file(wp_file: Path, report: Report) -> dict | None:
"""Validate one workplan file. Returns parsed frontmatter on success."""
fname = wp_file.name
fname = str(wp_file.relative_to(Path(report.repo_path)))
canonical_fname = canonical_workplan_filename(wp_file)
try:
text = wp_file.read_text(encoding="utf-8")
except OSError as e:
@@ -199,7 +213,7 @@ def _check_workplan_file(wp_file: Path, report: Report) -> dict | None:
report.add(Level.PASS, "frontmatter-id-format", f"id={wp_id}", fname)
# filename prefix
if wp_id and not fname.startswith(wp_id):
if wp_id and not canonical_fname.startswith(wp_id):
report.add(Level.WARN, "filename-id-prefix",
f"Filename should start with id '{wp_id}', got {fname!r}", fname)
elif wp_id:
@@ -252,7 +266,7 @@ def _check_workplan_file(wp_file: Path, report: Report) -> dict | None:
def check_files(workplans_dir: Path, report: Report) -> list[dict]:
"""Check all workplan .md files in workplans_dir."""
md_files = sorted(workplans_dir.glob("*.md"))
md_files = iter_workplan_files(workplans_dir)
if not md_files:
report.add(Level.WARN, "workplans-not-empty",
"workplans/ directory exists but contains no .md files")
@@ -261,6 +275,7 @@ def check_files(workplans_dir: Path, report: Report) -> list[dict]:
for wp_file in md_files:
meta = _check_workplan_file(wp_file, report)
if meta:
meta["_active_file"] = wp_file.parent == workplans_dir
metas.append(meta)
return metas
@@ -295,6 +310,7 @@ def check_api(api_base: str, metas: list[dict], domain_slug: str | None,
# Verify each state_hub_workstream_id reference
file_ws_ids: set[str] = set()
active_file_ws_ids: set[str] = set()
for meta in metas:
ws_id = str(meta.get("state_hub_workstream_id", "")).strip()
if not ws_id:
@@ -304,6 +320,8 @@ def check_api(api_base: str, metas: list[dict], domain_slug: str | None,
str(meta.get("id", "")))
continue
file_ws_ids.add(ws_id)
if meta.get("_active_file", True):
active_file_ws_ids.add(ws_id)
ws = _api_get(api_base, f"/workstreams/{ws_id}")
if ws is None:
report.add(Level.FAIL, "workstream-ref-exists",
@@ -349,7 +367,7 @@ def check_api(api_base: str, metas: list[dict], domain_slug: str | None,
continue
ws_id = ws["id"]
ws_slug = ws.get("slug", "")
if ws_id not in file_ws_ids:
if ws_id not in active_file_ws_ids:
report.add(
Level.FAIL, "orphan-workstream",
f"Active workstream '{ws_slug}' (id={ws_id[:8]}…, domain={t_domain}) "