Files
issue-core/examples/agents/multi_agent_pipeline.py
tegwick b605d970e3 feat: rename to issue-core and add task ingestion endpoint
Renames the package, distribution, CLI alias, Makefile targets, and
working directory from issue-facade to issue-core, signalling its
role as the authoritative task lifecycle manager for the Coulomb org
(peer to activity-core, rules-core, project-core).

Adds POST /issues/ ingestion endpoint for activity-core's IssueSink,
under a new optional [api] extra. The endpoint is served by `issue
serve`, authenticates via the ISSUE_CORE_API_KEY env var (Bearer or
X-API-Key header), and routes the TaskSpec payload to the configured
default backend with full traceability metadata embedded in
sync_metadata.

- T01: Python package issue_tracker -> issue_core, dir rename
- T02: registered in state hub under custodian domain
- T03: INTENT.md (what it is, what it isn't, how it fits)
- T04: SCOPE.md (in/out-of-scope, integration boundaries)
- T05: POST /issues/ via FastAPI + Uvicorn, 9 unit tests
- T06: docs/nats-task-ingestion.md design stub

Closes ISSC-WP-0001.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-17 05:16:27 +02:00

359 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Multi-Agent Pipeline
Demonstrates a CI/CD-like pipeline with multiple specialized agents:
- Coder Agent: Implements features
- Reviewer Agent: Reviews code
- Tester Agent: Runs tests
- Deployer Agent: Deploys to staging
Each agent monitors for issues in their stage and advances them through the pipeline.
Usage:
export GITEA_URL=https://gitea.example.com
export GITEA_TOKEN=your-token
export GITEA_OWNER=your-org
export GITEA_REPO=your-repo
# Run all agents in parallel (in separate terminals)
python multi_agent_pipeline.py --agent=coder
python multi_agent_pipeline.py --agent=reviewer
python multi_agent_pipeline.py --agent=tester
python multi_agent_pipeline.py --agent=deployer
# Or run in round-robin mode (all agents in one process)
python multi_agent_pipeline.py --mode=roundrobin
"""
import os
import sys
import time
import argparse
from datetime import datetime, timezone
from pathlib import Path
from typing import List
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from issue_core.backends.gitea import GiteaBackend
from issue_core.core.models import Issue, Label, User, Comment, IssueState
from issue_core.core.interfaces import IssueFilter
class BaseAgent:
"""Base class for pipeline agents."""
def __init__(self, agent_id: str, role: str):
self.agent_id = agent_id
self.role = role
self.backend = None
def connect(self):
"""Connect to backend from environment."""
base_url = os.environ['GITEA_URL']
token = os.environ['GITEA_TOKEN']
owner = os.environ['GITEA_OWNER']
repo = os.environ['GITEA_REPO']
self.backend = GiteaBackend()
self.backend.connect({
'base_url': base_url,
'token': token,
'owner': owner,
'repo': repo
})
print(f"{self.role} connected")
def log(self, message: str):
"""Log a message with agent role prefix."""
print(f"[{self.role}] {message}")
def add_comment(self, issue: Issue, message: str):
"""Add a comment to an issue."""
comment = Comment(
id=None,
body=f"**{self.role}**: {message}",
author=User(id=self.agent_id, username=self.agent_id),
created_at=datetime.now(timezone.utc)
)
self.backend.add_comment(issue.id, comment)
def process_one(self) -> bool:
"""Process one issue. Return True if work was done."""
raise NotImplementedError
class CoderAgent(BaseAgent):
"""Agent that implements features."""
def __init__(self):
super().__init__("agent-coder", "Coder")
def process_one(self) -> bool:
# Find issues needing implementation
issues = self.backend.list_issues(IssueFilter(
state='open',
labels=['needs-implementation']
))
if not issues:
return False
issue = issues[0]
self.log(f"Implementing #{issue.number}: {issue.title}")
# Update state
issue.state = IssueState.IN_PROGRESS
if not issue.assignees:
issue.assignees = [User(id=self.agent_id, username=self.agent_id)]
# Remove old label, add new label
issue.labels = [l for l in issue.labels if l.name != 'needs-implementation']
issue.labels.append(Label(name='needs-review'))
self.backend.update_issue(issue)
# Simulate work
time.sleep(2)
# Add comment
self.add_comment(issue,
"Implementation complete.\n\n"
"**Files changed:**\n"
"- src/feature.py\n"
"- tests/test_feature.py\n\n"
"Ready for code review."
)
self.log(f"✓ Completed #{issue.number}")
return True
class ReviewerAgent(BaseAgent):
"""Agent that reviews code."""
def __init__(self):
super().__init__("agent-reviewer", "Reviewer")
def process_one(self) -> bool:
# Find issues needing review
issues = self.backend.list_issues(IssueFilter(
state='in_progress',
labels=['needs-review']
))
if not issues:
return False
issue = issues[0]
self.log(f"Reviewing #{issue.number}: {issue.title}")
# Simulate review
time.sleep(2)
# Update labels
issue.labels = [l for l in issue.labels if l.name != 'needs-review']
issue.labels.append(Label(name='needs-testing'))
self.backend.update_issue(issue)
# Add review comment
self.add_comment(issue,
"Code review complete. ✅\n\n"
"**Review notes:**\n"
"- Code quality: Good\n"
"- Test coverage: 95%\n"
"- Documentation: Complete\n\n"
"Approved for testing."
)
self.log(f"✓ Approved #{issue.number}")
return True
class TesterAgent(BaseAgent):
"""Agent that runs tests."""
def __init__(self):
super().__init__("agent-tester", "Tester")
def process_one(self) -> bool:
# Find issues needing testing
issues = self.backend.list_issues(IssueFilter(
state='in_progress',
labels=['needs-testing']
))
if not issues:
return False
issue = issues[0]
self.log(f"Testing #{issue.number}: {issue.title}")
# Simulate tests
time.sleep(2)
# Update labels
issue.labels = [l for l in issue.labels if l.name != 'needs-testing']
issue.labels.append(Label(name='needs-deployment'))
self.backend.update_issue(issue)
# Add test results
self.add_comment(issue,
"All tests passing. ✅\n\n"
"**Test results:**\n"
"- Unit tests: 50/50 passed\n"
"- Integration tests: 12/12 passed\n"
"- Coverage: 96.5%\n\n"
"Ready for deployment."
)
self.log(f"✓ Tests passed #{issue.number}")
return True
class DeployerAgent(BaseAgent):
"""Agent that deploys to staging."""
def __init__(self):
super().__init__("agent-deployer", "Deployer")
def process_one(self) -> bool:
# Find issues needing deployment
issues = self.backend.list_issues(IssueFilter(
state='in_progress',
labels=['needs-deployment']
))
if not issues:
return False
issue = issues[0]
self.log(f"Deploying #{issue.number}: {issue.title}")
# Simulate deployment
time.sleep(2)
# Close issue
issue.state = IssueState.CLOSED
issue.closed_at = datetime.now(timezone.utc)
issue.labels = [l for l in issue.labels if l.name != 'needs-deployment']
issue.labels.append(Label(name='deployed'))
self.backend.update_issue(issue)
# Add deployment comment
self.add_comment(issue,
"Deployed to staging. 🚀\n\n"
"**Deployment info:**\n"
"- Environment: staging\n"
"- Version: v1.2.3\n"
"- Status: Healthy\n\n"
"Feature is live and ready for user testing."
)
self.log(f"✓ Deployed #{issue.number}")
return True
def run_single_agent(agent: BaseAgent, poll_interval: int = 5):
"""Run a single agent in a loop."""
agent.connect()
agent.log("Starting pipeline agent...")
agent.log("Press Ctrl+C to stop\n")
while True:
try:
if agent.process_one():
agent.log("Task completed, checking for more work...")
else:
agent.log(f"No work available, waiting {poll_interval}s...")
time.sleep(poll_interval)
except KeyboardInterrupt:
agent.log("Shutting down...")
break
except Exception as e:
agent.log(f"Error: {e}")
time.sleep(poll_interval)
def run_roundrobin(agents: List[BaseAgent], poll_interval: int = 5):
"""Run all agents in round-robin fashion."""
print("🤖 Starting multi-agent pipeline (round-robin mode)")
print(" Agents:", ", ".join([a.role for a in agents]))
print(" Press Ctrl+C to stop\n")
# Connect all agents
for agent in agents:
agent.connect()
while True:
try:
work_done = False
for agent in agents:
if agent.process_one():
work_done = True
time.sleep(1) # Brief pause between agents
if not work_done:
print(f"⏸️ No work available for any agent, waiting {poll_interval}s...")
time.sleep(poll_interval)
except KeyboardInterrupt:
print("\n\n⚠️ Shutting down all agents...")
break
except Exception as e:
print(f"❌ Error: {e}")
time.sleep(poll_interval)
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(description='Multi-agent pipeline')
parser.add_argument('--agent', choices=['coder', 'reviewer', 'tester', 'deployer'],
help='Run specific agent')
parser.add_argument('--mode', choices=['single', 'roundrobin'], default='single',
help='Execution mode')
parser.add_argument('--poll-interval', type=int, default=5,
help='Polling interval in seconds')
args = parser.parse_args()
try:
if args.mode == 'roundrobin':
agents = [
CoderAgent(),
ReviewerAgent(),
TesterAgent(),
DeployerAgent()
]
run_roundrobin(agents, args.poll_interval)
else:
if not args.agent:
print("Error: --agent required in single mode")
print("Use --mode=roundrobin to run all agents")
sys.exit(1)
agent_map = {
'coder': CoderAgent(),
'reviewer': ReviewerAgent(),
'tester': TesterAgent(),
'deployer': DeployerAgent()
}
agent = agent_map[args.agent]
run_single_agent(agent, args.poll_interval)
except KeyboardInterrupt:
print("\n\n⚠️ Interrupted by user")
sys.exit(0)
except Exception as e:
print(f"\n❌ Error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == '__main__':
main()