generated from coulomb/repo-seed
Renames the package, distribution, CLI alias, Makefile targets, and working directory from issue-facade to issue-core, signalling its role as the authoritative task lifecycle manager for the Coulomb org (peer to activity-core, rules-core, project-core). Adds POST /issues/ ingestion endpoint for activity-core's IssueSink, under a new optional [api] extra. The endpoint is served by `issue serve`, authenticates via the ISSUE_CORE_API_KEY env var (Bearer or X-API-Key header), and routes the TaskSpec payload to the configured default backend with full traceability metadata embedded in sync_metadata. - T01: Python package issue_tracker -> issue_core, dir rename - T02: registered in state hub under custodian domain - T03: INTENT.md (what it is, what it isn't, how it fits) - T04: SCOPE.md (in/out-of-scope, integration boundaries) - T05: POST /issues/ via FastAPI + Uvicorn, 9 unit tests - T06: docs/nats-task-ingestion.md design stub Closes ISSC-WP-0001. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
214 lines
6.9 KiB
Python
Executable File
214 lines
6.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Simple Task Executor Agent
|
|
|
|
This agent demonstrates a basic workflow:
|
|
1. Query for available tasks (open issues with specific labels)
|
|
2. Claim a task by assigning it to itself
|
|
3. Execute the task (simulated)
|
|
4. Report completion and close the issue
|
|
|
|
Usage:
|
|
export GITEA_URL=https://gitea.example.com
|
|
export GITEA_TOKEN=your-token
|
|
export GITEA_OWNER=your-org
|
|
export GITEA_REPO=your-repo
|
|
|
|
python simple_task_executor.py [--once | --max-iterations N]
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
# Add parent directory to path for imports
|
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
|
|
|
from issue_core.backends.gitea import GiteaBackend
|
|
from issue_core.core.models import Issue, Label, User, Comment, IssueState
|
|
from issue_core.core.interfaces import IssueFilter
|
|
|
|
|
|
class SimpleTaskExecutor:
    """A simple agent that claims and executes tasks from issue tracker.

    Typical use: create the agent, call ``connect()`` once, then call
    ``run_once()`` for a single task or ``run_loop()`` to keep polling.
    Progress is reported on stdout with ``print``.
    """

    def __init__(self, agent_id: str = "agent-executor"):
        """Create an unconnected agent.

        Args:
            agent_id: Name used as both the id and username of the user the
                agent assigns issues to and authors comments as.
        """
        self.agent_id = agent_id
        # Populated by connect(); stays None until a connection succeeds.
        self.backend = None

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _require_backend(self):
        """Return the connected backend, or raise if connect() was not called."""
        if self.backend is None:
            raise RuntimeError(
                "Not connected: call connect() before using the agent"
            )
        return self.backend

    def _agent_user(self) -> "User":
        """Build the User object that represents this agent."""
        return User(id=self.agent_id, username=self.agent_id)

    def _post_comment(self, issue: "Issue", body: str) -> None:
        """Add a comment authored by this agent, timestamped now (UTC)."""
        comment = Comment(
            id=None,
            body=body,
            author=self._agent_user(),
            created_at=datetime.now(timezone.utc),
        )
        self._require_backend().add_comment(issue.id, comment)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def connect(self):
        """Connect to Gitea backend from environment variables.

        Reads GITEA_URL, GITEA_TOKEN, GITEA_OWNER and GITEA_REPO.

        Raises:
            ValueError: If any of the four variables is unset or empty.
        """
        base_url = os.environ.get('GITEA_URL')
        token = os.environ.get('GITEA_TOKEN')
        owner = os.environ.get('GITEA_OWNER')
        repo = os.environ.get('GITEA_REPO')

        if not all([base_url, token, owner, repo]):
            raise ValueError(
                "Missing required environment variables: "
                "GITEA_URL, GITEA_TOKEN, GITEA_OWNER, GITEA_REPO"
            )

        backend = GiteaBackend()
        backend.connect({
            'base_url': base_url,
            'token': token,
            'owner': owner,
            'repo': repo,
        })
        # Publish the backend only after connect() succeeded, so a failed
        # attempt does not leave a half-initialised backend on the agent.
        self.backend = backend
        print(f"✓ Connected to {base_url}/{owner}/{repo}")

    def find_available_task(self):
        """Find an open task that's ready to be worked on.

        Returns:
            The first open, unassigned issue labelled ``ready``, or ``None``
            when there is none among the issues returned by the backend.

        Raises:
            RuntimeError: If called before connect().
        """
        backend = self._require_backend()
        print("\n🔍 Searching for available tasks...")

        # Only the first 10 matching issues are requested; unassigned issues
        # beyond that window are not seen in this pass.
        issues = backend.list_issues(IssueFilter(
            state='open',
            labels=['ready'],
            limit=10,
        ))

        # Keep only issues nobody has claimed yet.
        available = [issue for issue in issues if not issue.assignees]

        if not available:
            print(" No available tasks found")
            return None

        print(f" Found {len(available)} available task(s)")
        return available[0]

    def claim_task(self, issue: "Issue"):
        """Claim a task by assigning it to this agent.

        Marks the issue IN_PROGRESS, sets this agent as the sole assignee,
        pushes the update to the backend, and then posts a claim comment.

        Raises:
            RuntimeError: If called before connect().
        """
        # Fail before mutating the issue if there is no backend to push to.
        backend = self._require_backend()
        print(f"\n📌 Claiming issue #{issue.number}: {issue.title}")

        issue.state = IssueState.IN_PROGRESS
        issue.assignees = [self._agent_user()]
        backend.update_issue(issue)

        self._post_comment(
            issue,
            f"🤖 Task claimed by {self.agent_id}\n\nStarting work...",
        )
        print(f" ✓ Issue #{issue.number} claimed")

    def execute_task(self, issue: "Issue"):
        """Execute the task (simulated with sleep).

        Sleeps for two one-second steps, then posts a progress comment.

        Raises:
            RuntimeError: If called before connect().
        """
        # Check up front so we do not "work" and then fail to report.
        self._require_backend()
        print(f"\n⚙️ Executing task #{issue.number}...")

        # Simulated work: one-second ticks with a progress line each.
        work_time = 2
        for elapsed in range(1, work_time + 1):
            time.sleep(1)
            print(f" Working... ({elapsed}/{work_time}s)")

        self._post_comment(
            issue,
            "Implementation complete.\n\n"
            "**Changes made:**\n"
            "- Analyzed requirements\n"
            "- Implemented solution\n"
            "- Verified functionality\n\n"
            "Ready for review.",
        )
        print(f" ✓ Task #{issue.number} completed")

    def complete_task(self, issue: "Issue"):
        """Mark task as complete by closing the issue.

        Sets the state to CLOSED with a UTC ``closed_at`` timestamp, pushes
        the update, and then posts a completion comment.

        Raises:
            RuntimeError: If called before connect().
        """
        backend = self._require_backend()
        print(f"\n✅ Completing issue #{issue.number}...")

        issue.state = IssueState.CLOSED
        issue.closed_at = datetime.now(timezone.utc)
        backend.update_issue(issue)

        self._post_comment(
            issue,
            f"🎉 Task completed by {self.agent_id}\n\n"
            "All work has been finished and verified.",
        )
        print(f" ✓ Issue #{issue.number} closed")

    def run_once(self):
        """Execute one task cycle.

        Returns:
            True if a task was found, claimed, executed and completed;
            False if no task was available.
        """
        task = self.find_available_task()
        if not task:
            return False

        self.claim_task(task)
        self.execute_task(task)
        self.complete_task(task)
        return True

    def run_loop(self, max_iterations=None):
        """Run continuously, processing tasks as they become available.

        Args:
            max_iterations: Maximum number of polling cycles to run. ``None``
                (the default) means run until interrupted. ``0`` runs no
                cycles at all.
        """
        print(f"\n🤖 {self.agent_id} starting task execution loop...")
        print(" Press Ctrl+C to stop\n")

        iteration = 0
        while True:
            iteration += 1

            # Compare against None explicitly: a plain truthiness test would
            # treat max_iterations=0 as "unlimited" and loop forever.
            if max_iterations is not None and iteration > max_iterations:
                print(f"\n⚠️ Reached maximum iterations ({max_iterations})")
                break

            if self.run_once():
                print(f"\n✓ Completed iteration {iteration}")
            else:
                # Nothing to do: back off before polling again.
                print(f"\n⏸️ No tasks available (iteration {iteration})")
                print(" Waiting 5 seconds before checking again...")
                time.sleep(5)
|
|
|
|
|
|
def main():
    """Main entry point.

    Creates the agent, connects it, and then dispatches on command-line flags:

    * ``--once``: process at most one task and return.
    * ``--max-iterations N``: run the polling loop for at most N cycles.
    * no flag: run the polling loop until interrupted.

    Exits with status 0 on Ctrl+C and status 1 on any other error.
    """
    # NOTE: this function relies on the module-level ``sys`` import. Do not
    # import ``sys`` inside the function: that makes ``sys`` a local name for
    # the whole body, so an exception raised before the import statement runs
    # (e.g. connect() failing) would turn ``sys.exit`` in the handlers below
    # into an UnboundLocalError.
    try:
        # Create and initialize agent
        agent = SimpleTaskExecutor(agent_id="agent-executor")
        agent.connect()

        # Run once or in loop
        if '--once' in sys.argv:
            agent.run_once()
        elif '--max-iterations' in sys.argv:
            value_index = sys.argv.index('--max-iterations') + 1
            if value_index >= len(sys.argv):
                raise ValueError("--max-iterations requires an integer value")
            raw_value = sys.argv[value_index]
            try:
                max_iter = int(raw_value)
            except ValueError:
                raise ValueError(
                    f"--max-iterations expects an integer, got {raw_value!r}"
                ) from None
            agent.run_loop(max_iterations=max_iter)
        else:
            agent.run_loop()

    except KeyboardInterrupt:
        print("\n\n⚠️ Interrupted by user")
        sys.exit(0)
    except Exception as e:
        # Top-level boundary: report the failure and signal it via exit code.
        print(f"\n❌ Error: {e}")
        sys.exit(1)
|
|
|
|
|
|
# Script entry point: start the agent only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|