generated from coulomb/repo-seed
Renames the package, distribution, CLI alias, Makefile targets, and working directory from issue-facade to issue-core, signalling its role as the authoritative task lifecycle manager for the Coulomb org (peer to activity-core, rules-core, project-core). Adds POST /issues/ ingestion endpoint for activity-core's IssueSink, under a new optional [api] extra. The endpoint is served by `issue serve`, authenticates via the ISSUE_CORE_API_KEY env var (Bearer or X-API-Key header), and routes the TaskSpec payload to the configured default backend with full traceability metadata embedded in sync_metadata. - T01: Python package issue_tracker -> issue_core, dir rename - T02: registered in state hub under custodian domain - T03: INTENT.md (what it is, what it isn't, how it fits) - T04: SCOPE.md (in/out-of-scope, integration boundaries) - T05: POST /issues/ via FastAPI + Uvicorn, 9 unit tests - T06: docs/nats-task-ingestion.md design stub Closes ISSC-WP-0001. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
359 lines
11 KiB
Python
Executable File
359 lines
11 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Multi-Agent Pipeline
|
|
|
|
Demonstrates a CI/CD-like pipeline with multiple specialized agents:
|
|
- Coder Agent: Implements features
|
|
- Reviewer Agent: Reviews code
|
|
- Tester Agent: Runs tests
|
|
- Deployer Agent: Deploys to staging
|
|
|
|
Each agent monitors for issues in their stage and advances them through the pipeline.
|
|
|
|
Usage:
|
|
export GITEA_URL=https://gitea.example.com
|
|
export GITEA_TOKEN=your-token
|
|
export GITEA_OWNER=your-org
|
|
export GITEA_REPO=your-repo
|
|
|
|
# Run all agents in parallel (in separate terminals)
|
|
python multi_agent_pipeline.py --agent=coder
|
|
python multi_agent_pipeline.py --agent=reviewer
|
|
python multi_agent_pipeline.py --agent=tester
|
|
python multi_agent_pipeline.py --agent=deployer
|
|
|
|
# Or run in round-robin mode (all agents in one process)
|
|
python multi_agent_pipeline.py --mode=roundrobin
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import argparse
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import List
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
|
|
|
from issue_core.backends.gitea import GiteaBackend
|
|
from issue_core.core.models import Issue, Label, User, Comment, IssueState
|
|
from issue_core.core.interfaces import IssueFilter
|
|
|
|
|
|
class BaseAgent:
    """Base class for pipeline agents.

    Stores the agent identity (``agent_id``), a ``role`` string used as the
    prefix for log lines and issue comments, and the backend created by
    ``connect()``. Subclasses implement ``process_one()``.
    """

    # (environment variable, backend config key) pairs consumed by connect(),
    # in the order the config dict is built.
    _ENV_CONFIG = (
        ('GITEA_URL', 'base_url'),
        ('GITEA_TOKEN', 'token'),
        ('GITEA_OWNER', 'owner'),
        ('GITEA_REPO', 'repo'),
    )

    def __init__(self, agent_id: str, role: str):
        self.agent_id = agent_id
        self.role = role
        self.backend = None  # populated by connect()

    def connect(self):
        """Connect to the Gitea backend using settings from the environment.

        Raises:
            KeyError: if any of GITEA_URL, GITEA_TOKEN, GITEA_OWNER or
                GITEA_REPO is not set. Every missing name is listed in the
                message so the user can fix them in one go.
        """
        missing = [var for var, _ in self._ENV_CONFIG if var not in os.environ]
        if missing:
            raise KeyError(
                "Missing required environment variable(s): " + ", ".join(missing)
            )

        config = {key: os.environ[var] for var, key in self._ENV_CONFIG}

        backend = GiteaBackend()
        backend.connect(config)
        # Only expose the backend once the connection succeeded, so a failed
        # connect() never leaves a half-initialised backend on the agent.
        self.backend = backend
        print(f"✓ {self.role} connected")

    def log(self, message: str):
        """Print *message* prefixed with the agent role in square brackets."""
        print(f"[{self.role}] {message}")

    def add_comment(self, issue: "Issue", message: str):
        """Post *message* on *issue* as a comment authored by this agent.

        The body is prefixed with the role in bold and timestamped with the
        current UTC time.
        """
        comment = Comment(
            id=None,
            body=f"**{self.role}**: {message}",
            author=User(id=self.agent_id, username=self.agent_id),
            created_at=datetime.now(timezone.utc)
        )
        self.backend.add_comment(issue.id, comment)

    def process_one(self) -> bool:
        """Process one issue. Return True if work was done.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError
|
|
|
|
|
|
class CoderAgent(BaseAgent):
    """Agent that implements features.

    Picks up open issues labelled ``needs-implementation`` and hands them to
    the review stage by replacing that label with ``needs-review``.
    """

    def __init__(self):
        super().__init__("agent-coder", "Coder")

    def process_one(self) -> bool:
        """Implement the first issue waiting for implementation.

        Returns:
            True if an issue was processed, False if none was available.
        """
        # Find issues needing implementation
        issues = self.backend.list_issues(IssueFilter(
            state='open',
            labels=['needs-implementation']
        ))

        if not issues:
            return False

        issue = issues[0]
        self.log(f"Implementing #{issue.number}: {issue.title}")

        # Simulate work BEFORE handing off. Publishing the 'needs-review'
        # label first would let a reviewer running in another process pick
        # the issue up while the implementation is still in progress; the
        # other stages also do their work before updating the issue.
        time.sleep(2)

        # Update state and take ownership if nobody is assigned yet
        issue.state = IssueState.IN_PROGRESS
        if not issue.assignees:
            issue.assignees = [User(id=self.agent_id, username=self.agent_id)]

        # Remove old label, add new label
        issue.labels = [
            label for label in issue.labels
            if label.name != 'needs-implementation'
        ]
        issue.labels.append(Label(name='needs-review'))

        self.backend.update_issue(issue)

        # Add comment
        self.add_comment(
            issue,
            "Implementation complete.\n\n"
            "**Files changed:**\n"
            "- src/feature.py\n"
            "- tests/test_feature.py\n\n"
            "Ready for code review."
        )

        self.log(f"✓ Completed #{issue.number}")
        return True
|
|
|
|
|
|
class ReviewerAgent(BaseAgent):
    """Pipeline stage that performs the (simulated) code review.

    Consumes ``in_progress`` issues labelled ``needs-review`` and relabels
    them ``needs-testing``.
    """

    def __init__(self):
        super().__init__("agent-reviewer", "Reviewer")

    def process_one(self) -> bool:
        """Review the first queued issue; return False when the queue is empty."""
        query = IssueFilter(
            state='in_progress',
            labels=['needs-review']
        )
        pending = self.backend.list_issues(query)
        if not pending:
            return False

        target = pending[0]
        self.log(f"Reviewing #{target.number}: {target.title}")

        # Stand-in for the actual review.
        time.sleep(2)

        # Move the issue on to the testing stage.
        target.labels = [
            label for label in target.labels
            if label.name != 'needs-review'
        ]
        target.labels.append(Label(name='needs-testing'))
        self.backend.update_issue(target)

        # Record the outcome on the issue itself.
        review_report = (
            "Code review complete. ✅\n\n"
            "**Review notes:**\n"
            "- Code quality: Good\n"
            "- Test coverage: 95%\n"
            "- Documentation: Complete\n\n"
            "Approved for testing."
        )
        self.add_comment(target, review_report)

        self.log(f"✓ Approved #{target.number}")
        return True
|
|
|
|
|
|
class TesterAgent(BaseAgent):
    """Pipeline stage that runs the (simulated) test suite.

    Consumes ``in_progress`` issues labelled ``needs-testing`` and relabels
    them ``needs-deployment``.
    """

    def __init__(self):
        super().__init__("agent-tester", "Tester")

    def process_one(self) -> bool:
        """Test the first queued issue; return False when the queue is empty."""
        query = IssueFilter(
            state='in_progress',
            labels=['needs-testing']
        )
        pending = self.backend.list_issues(query)
        if not pending:
            return False

        target = pending[0]
        self.log(f"Testing #{target.number}: {target.title}")

        # Stand-in for the actual test run.
        time.sleep(2)

        # Move the issue on to the deployment stage.
        target.labels = [
            label for label in target.labels
            if label.name != 'needs-testing'
        ]
        target.labels.append(Label(name='needs-deployment'))
        self.backend.update_issue(target)

        # Record the results on the issue itself.
        test_report = (
            "All tests passing. ✅\n\n"
            "**Test results:**\n"
            "- Unit tests: 50/50 passed\n"
            "- Integration tests: 12/12 passed\n"
            "- Coverage: 96.5%\n\n"
            "Ready for deployment."
        )
        self.add_comment(target, test_report)

        self.log(f"✓ Tests passed #{target.number}")
        return True
|
|
|
|
|
|
class DeployerAgent(BaseAgent):
    """Final pipeline stage: (simulated) deployment to staging.

    Consumes ``in_progress`` issues labelled ``needs-deployment``, closes
    them and relabels them ``deployed``.
    """

    def __init__(self):
        super().__init__("agent-deployer", "Deployer")

    def process_one(self) -> bool:
        """Deploy the first queued issue; return False when the queue is empty."""
        query = IssueFilter(
            state='in_progress',
            labels=['needs-deployment']
        )
        pending = self.backend.list_issues(query)
        if not pending:
            return False

        target = pending[0]
        self.log(f"Deploying #{target.number}: {target.title}")

        # Stand-in for the actual deployment.
        time.sleep(2)

        # Deployment is the last stage, so the issue is closed here.
        target.state = IssueState.CLOSED
        target.closed_at = datetime.now(timezone.utc)
        target.labels = [
            label for label in target.labels
            if label.name != 'needs-deployment'
        ]
        target.labels.append(Label(name='deployed'))

        self.backend.update_issue(target)

        # Record the deployment details on the issue itself.
        deploy_report = (
            "Deployed to staging. 🚀\n\n"
            "**Deployment info:**\n"
            "- Environment: staging\n"
            "- Version: v1.2.3\n"
            "- Status: Healthy\n\n"
            "Feature is live and ready for user testing."
        )
        self.add_comment(target, deploy_report)

        self.log(f"✓ Deployed #{target.number}")
        return True
|
|
|
|
|
|
def run_single_agent(agent: BaseAgent, poll_interval: int = 5):
    """Connect *agent* and drive it in an endless polling loop.

    The agent is asked for work repeatedly; when it reports none, the loop
    waits *poll_interval* seconds before asking again. Ctrl+C ends the loop;
    any other exception is logged and followed by the same wait.
    """
    agent.connect()
    for banner in ("Starting pipeline agent...", "Press Ctrl+C to stop\n"):
        agent.log(banner)

    while True:
        try:
            did_work = agent.process_one()
            if not did_work:
                agent.log(f"No work available, waiting {poll_interval}s...")
                time.sleep(poll_interval)
                continue
            # Work was done: poll again immediately.
            agent.log("Task completed, checking for more work...")
        except KeyboardInterrupt:
            agent.log("Shutting down...")
            return
        except Exception as e:
            # Keep the agent alive on failures; back off before retrying.
            agent.log(f"Error: {e}")
            time.sleep(poll_interval)
|
|
|
|
|
|
def run_roundrobin(agents: List[BaseAgent], poll_interval: int = 5):
    """Drive every agent in *agents* from a single process, one after another.

    All agents are connected first. Each cycle gives every agent one chance
    to process an issue; if none of them did any work, the loop waits
    *poll_interval* seconds. Ctrl+C ends the loop; any other exception is
    printed and followed by the same wait.
    """
    print("🤖 Starting multi-agent pipeline (round-robin mode)")
    role_list = ", ".join(member.role for member in agents)
    print(" Agents:", role_list)
    print(" Press Ctrl+C to stop\n")

    # Every agent opens its own backend connection up front.
    for member in agents:
        member.connect()

    while True:
        try:
            progressed = False
            for member in agents:
                if not member.process_one():
                    continue
                progressed = True
                # NOTE(review): the brief pause between agents is taken only
                # after an agent actually did work - confirm this nesting
                # matches the intended behaviour.
                time.sleep(1)

            if progressed:
                continue

            print(f"⏸️ No work available for any agent, waiting {poll_interval}s...")
            time.sleep(poll_interval)
        except KeyboardInterrupt:
            print("\n\n⚠️ Shutting down all agents...")
            return
        except Exception as e:
            # Keep the pipeline alive on failures; back off before retrying.
            print(f"❌ Error: {e}")
            time.sleep(poll_interval)
|
|
|
|
|
|
def main():
    """Parse the command line and start the requested agent(s).

    ``--mode=roundrobin`` runs all four agents in this process; otherwise
    ``--agent`` selects the single agent to run. Exits with status 1 when
    ``--agent`` is missing in single mode or on an unexpected error, and
    with status 0 when interrupted by the user.
    """
    parser = argparse.ArgumentParser(description='Multi-agent pipeline')
    parser.add_argument(
        '--agent',
        choices=['coder', 'reviewer', 'tester', 'deployer'],
        help='Run specific agent',
    )
    parser.add_argument(
        '--mode',
        choices=['single', 'roundrobin'],
        default='single',
        help='Execution mode',
    )
    parser.add_argument(
        '--poll-interval',
        type=int,
        default=5,
        help='Polling interval in seconds',
    )
    options = parser.parse_args()

    try:
        if options.mode == 'roundrobin':
            # Pipeline order: implement -> review -> test -> deploy.
            pipeline = [
                CoderAgent(),
                ReviewerAgent(),
                TesterAgent(),
                DeployerAgent(),
            ]
            run_roundrobin(pipeline, options.poll_interval)
            return

        if not options.agent:
            print("Error: --agent required in single mode")
            print("Use --mode=roundrobin to run all agents")
            sys.exit(1)

        # Dispatch table from the --agent choice to its agent class.
        agent_classes = {
            'coder': CoderAgent,
            'reviewer': ReviewerAgent,
            'tester': TesterAgent,
            'deployer': DeployerAgent,
        }
        selected = agent_classes[options.agent]()
        run_single_agent(selected, options.poll_interval)
    except KeyboardInterrupt:
        print("\n\n⚠️ Interrupted by user")
        sys.exit(0)
    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
|
|
|
|
|
|
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()
|