feat(local-identity): implement Stage 1 — core file store (NK-WP-0002-T01)

Deliverables:
- src/local_identity/gecos.py: /etc/passwd GECOS parsing, current_username()
- src/local_identity/user.py: UserRecord dataclass, ProductionIdentity, make_test_user()
  - Pure test-user derivation: <user>N / +testN email alias / source_user tracking
- src/local_identity/store.py: file store CRUD backed by LOCAL_IDENTITY_HOME
  - ~/.local-identity/ mode 700, user files mode 600
  - All path lookups dynamic (env-var override enables clean test isolation)
- src/local_identity/cli.py: init/list/show commands; email from flag > config > prompt
- pyproject.toml + uv.lock: pyyaml dep, local-identity script entry point

Tests (41 passing):
- test_gecos.py: 9 tests — simple/comma/empty/non-ASCII/whitespace GECOS, fallback
- test_user.py: 14 tests — test-user derivation, YAML roundtrip, non-ASCII, idempotency
- test_store.py: 18 tests — dir creation, permissions (700/600), CRUD, list, config,
  idempotency (reinit with --force produces identical users)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-02 00:01:54 +01:00
parent 6ed0061962
commit 4491beaffe
11 changed files with 860 additions and 0 deletions

View File

@@ -0,0 +1,2 @@
# local-identity: zero-dependency bootstrap user store for net-kingdom environments.
# See docs/LocalIdentity.md for design, scope, and security model.

View File

@@ -0,0 +1,123 @@
"""
local-identity CLI — entry point.
Commands:
init [--force] [--email EMAIL] Derive primary user from Linux identity,
generate test users, write store.
list List all users in the store.
show <username> Display a user's YAML record.
Environment:
LOCAL_IDENTITY_HOME Override the store directory (default: ~/.local-identity).
"""
import argparse
import sys
from .gecos import current_username, get_gecos_fullname
from .user import UserRecord, make_test_user
from . import store
def cmd_init(args: argparse.Namespace) -> None:
if store.store_exists() and not args.force:
print(
f"Store already exists at {store._store_dir()}. "
"Use --force to reinitialise.",
file=sys.stderr,
)
sys.exit(1)
username = current_username()
fullname = get_gecos_fullname(username)
# Email resolution: flag > existing config > interactive prompt
config = store.read_config() if store.store_exists() else {}
email: str = args.email or config.get("email", "")
if not email:
try:
email = input(f"Email address for {username}: ").strip()
except EOFError:
email = ""
if not email:
print("Error: email address is required.", file=sys.stderr)
sys.exit(1)
store.init_dirs()
config["email"] = email
store.write_config(config)
primary = UserRecord(username=username, fullname=fullname, email=email)
store.write_user(primary)
test1 = make_test_user(primary, 1)
test2 = make_test_user(primary, 2)
store.write_user(test1)
store.write_user(test2)
print(f"Initialised local-identity store at {store._store_dir()}")
print(f" Primary : {primary.username} ({primary.fullname}) <{primary.email}>")
print(f" Test 1 : {test1.username} <{test1.email}>")
print(f" Test 2 : {test2.username} <{test2.email}>")
def cmd_list(args: argparse.Namespace) -> None:
users = store.list_users()
if not users:
print("No users found. Run 'local-identity init' first.")
return
header = f"{'USERNAME':<20} {'FULLNAME':<30} {'EMAIL':<40} TYPE"
print(header)
print("-" * len(header))
for u in users:
utype = "test " if u.generated else "primary"
print(f"{u.username:<20} {u.fullname:<30} {u.email:<40} {utype}")
def cmd_show(args: argparse.Namespace) -> None:
try:
user = store.read_user(args.username)
except FileNotFoundError as exc:
print(str(exc), file=sys.stderr)
sys.exit(1)
print(user.to_yaml(), end="")
def main() -> None:
parser = argparse.ArgumentParser(
prog="local-identity",
description="Zero-dependency bootstrap user store for net-kingdom environments.",
epilog=(
"Store location: ~/.local-identity "
"(override with LOCAL_IDENTITY_HOME)"
),
)
sub = parser.add_subparsers(dest="command", required=True)
p_init = sub.add_parser(
"init",
help="Initialise store from Linux identity",
)
p_init.add_argument(
"--force", action="store_true",
help="Reinitialise even if the store already exists",
)
p_init.add_argument(
"--email",
help="Email address (skips interactive prompt)",
)
p_init.set_defaults(func=cmd_init)
p_list = sub.add_parser("list", help="List all users in the store")
p_list.set_defaults(func=cmd_list)
p_show = sub.add_parser("show", help="Display a user record")
p_show.add_argument("username", help="Username to display")
p_show.set_defaults(func=cmd_show)
args = parser.parse_args()
args.func(args)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,34 @@
"""
Parse the operator's Linux identity from /etc/passwd and the environment.
GECOS format: "Full Name,Room Number,Work Phone,Home Phone,Other"
We only need the first field (full name).
"""
import os
import pwd
def current_username() -> str:
"""Return the current Linux username."""
return (
os.environ.get("USER")
or os.environ.get("LOGNAME")
or pwd.getpwuid(os.getuid()).pw_name
)
def get_gecos_fullname(username: str) -> str:
"""
Extract the full name from /etc/passwd GECOS for the given username.
Falls back to the username itself if GECOS is absent or unparseable.
"""
try:
entry = pwd.getpwnam(username)
# GECOS may be "Full Name,room,work,home,other" — take the first field
fullname = entry.pw_gecos.split(",")[0].strip()
if fullname:
return fullname
except KeyError:
pass
return username

View File

@@ -0,0 +1,101 @@
"""
File-store operations for local-identity.
The store lives at ~/.local-identity/ by default. Set LOCAL_IDENTITY_HOME
to override (useful for tests and for running multiple independent stores).
Directory layout:
$LOCAL_IDENTITY_HOME/
├── config.yaml operator email and optional overrides (mode 600)
└── users/
├── <user>.yaml primary user (mode 600)
├── <user>1.yaml test user 1 (mode 600)
└── <user>2.yaml test user 2 (mode 600)
"""
import os
from pathlib import Path
from typing import List
import yaml
from .user import UserRecord
def _store_dir() -> Path:
"""Return the store root directory. Overridable via LOCAL_IDENTITY_HOME."""
return Path(os.environ.get("LOCAL_IDENTITY_HOME", str(Path.home() / ".local-identity")))
def _users_dir() -> Path:
return _store_dir() / "users"
def _config_file() -> Path:
return _store_dir() / "config.yaml"
# ------------------------------------------------------------------ #
# Directory management #
# ------------------------------------------------------------------ #
def store_exists() -> bool:
return _store_dir().exists()
def init_dirs() -> None:
"""Create the store directory tree with secure permissions."""
store = _store_dir()
users = _users_dir()
store.mkdir(mode=0o700, exist_ok=True)
users.mkdir(mode=0o700, exist_ok=True)
# Enforce permissions even when dirs already existed
os.chmod(store, 0o700)
os.chmod(users, 0o700)
# ------------------------------------------------------------------ #
# User CRUD #
# ------------------------------------------------------------------ #
def write_user(user: UserRecord) -> None:
path = _users_dir() / f"{user.username}.yaml"
path.write_text(user.to_yaml(), encoding="utf-8")
os.chmod(path, 0o600)
def read_user(username: str) -> UserRecord:
path = _users_dir() / f"{username}.yaml"
if not path.exists():
raise FileNotFoundError(f"User '{username}' not found in store")
return UserRecord.from_yaml(path.read_text(encoding="utf-8"))
def list_users() -> List[UserRecord]:
users_dir = _users_dir()
if not users_dir.exists():
return []
return [
UserRecord.from_yaml(p.read_text(encoding="utf-8"))
for p in sorted(users_dir.glob("*.yaml"))
]
# ------------------------------------------------------------------ #
# Config #
# ------------------------------------------------------------------ #
def read_config() -> dict:
config_file = _config_file()
if not config_file.exists():
return {}
return yaml.safe_load(config_file.read_text(encoding="utf-8")) or {}
def write_config(config: dict) -> None:
config_file = _config_file()
config_file.write_text(
yaml.dump(config, default_flow_style=False, allow_unicode=True),
encoding="utf-8",
)
os.chmod(config_file, 0o600)

View File

@@ -0,0 +1,127 @@
"""
UserRecord dataclass — in-memory representation of a local-identity user.
Schema fields (all stored in YAML):
schema_version str — format version, currently "1"
username str — Linux username (primary) or derived username (test)
fullname str — display name
email str — contact email
environment str — always "local"; production connectors reject this value
generated bool — True for auto-generated test users
source_user str? — for generated users: the source username
production_identity — optional mapping to a production account
.username str
.realm str
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Optional
import yaml
@dataclass
class ProductionIdentity:
username: str
realm: str
@dataclass
class UserRecord:
username: str
fullname: str
email: str
schema_version: str = "1"
environment: str = "local"
generated: bool = False
source_user: Optional[str] = None
production_identity: Optional[ProductionIdentity] = None
# ------------------------------------------------------------------ #
# Serialisation #
# ------------------------------------------------------------------ #
def to_dict(self) -> dict:
d: dict = {
"schema_version": self.schema_version,
"username": self.username,
"fullname": self.fullname,
"email": self.email,
"environment": self.environment,
"generated": self.generated,
}
if self.source_user is not None:
d["source_user"] = self.source_user
if self.production_identity is not None:
d["production_identity"] = {
"username": self.production_identity.username,
"realm": self.production_identity.realm,
}
return d
def to_yaml(self) -> str:
return yaml.dump(
self.to_dict(),
default_flow_style=False,
allow_unicode=True,
sort_keys=False,
)
@classmethod
def from_dict(cls, data: dict) -> UserRecord:
pi: Optional[ProductionIdentity] = None
if data.get("production_identity"):
pi = ProductionIdentity(**data["production_identity"])
return cls(
schema_version=data.get("schema_version", "1"),
username=data["username"],
fullname=data["fullname"],
email=data["email"],
environment=data.get("environment", "local"),
generated=data.get("generated", False),
source_user=data.get("source_user"),
production_identity=pi,
)
@classmethod
def from_yaml(cls, text: str) -> UserRecord:
return cls.from_dict(yaml.safe_load(text))
# ------------------------------------------------------------------ #
# Test-user derivation (pure function) #
# ------------------------------------------------------------------ #
def make_test_user(primary: UserRecord, n: int) -> UserRecord:
"""
Derive test user N from the primary user.
Derivation rules:
username → <username><n>
fullname → <fullname>+test<n>
email → <local>+test<n>@<domain> (or <email>+test<n> if no @)
"""
if n < 1:
raise ValueError(f"n must be >= 1, got {n}")
suffix = str(n)
test_username = primary.username + suffix
test_fullname = f"{primary.fullname}+test{suffix}"
if "@" in primary.email:
local, domain = primary.email.rsplit("@", 1)
test_email = f"{local}+test{suffix}@{domain}"
else:
test_email = f"{primary.email}+test{suffix}"
return UserRecord(
username=test_username,
fullname=test_fullname,
email=test_email,
environment="local",
generated=True,
source_user=primary.username,
)