#!/usr/bin/env python3 """ Simple Task Executor Agent This agent demonstrates a basic workflow: 1. Query for available tasks (open issues with specific labels) 2. Claim a task by assigning it to itself 3. Execute the task (simulated) 4. Report completion and close the issue Usage: export GITEA_URL=https://gitea.example.com export GITEA_TOKEN=your-token export GITEA_OWNER=your-org export GITEA_REPO=your-repo python simple_task_executor.py """ import os import sys import time from datetime import datetime, timezone from pathlib import Path # Add parent directory to path for imports sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from issue_tracker.backends.gitea import GiteaBackend from issue_tracker.core.models import Issue, Label, User, Comment, IssueState from issue_tracker.core.interfaces import IssueFilter class SimpleTaskExecutor: """A simple agent that claims and executes tasks from issue tracker.""" def __init__(self, agent_id: str = "agent-executor"): self.agent_id = agent_id self.backend = None def connect(self): """Connect to Gitea backend from environment variables.""" base_url = os.environ.get('GITEA_URL') token = os.environ.get('GITEA_TOKEN') owner = os.environ.get('GITEA_OWNER') repo = os.environ.get('GITEA_REPO') if not all([base_url, token, owner, repo]): raise ValueError( "Missing required environment variables: " "GITEA_URL, GITEA_TOKEN, GITEA_OWNER, GITEA_REPO" ) self.backend = GiteaBackend() self.backend.connect({ 'base_url': base_url, 'token': token, 'owner': owner, 'repo': repo }) print(f"βœ“ Connected to {base_url}/{owner}/{repo}") def find_available_task(self): """Find an open task that's ready to be worked on.""" print("\nπŸ” Searching for available tasks...") # Look for issues labeled as ready and not assigned issues = self.backend.list_issues(IssueFilter( state='open', labels=['ready'], limit=10 )) # Filter to unassigned issues available = [issue for issue in issues if not issue.assignees] if available: print(f" Found {len(available)} available task(s)") return available[0] else: print(" No available tasks found") return None def claim_task(self, issue: Issue): """Claim a task by assigning it to this agent.""" print(f"\nπŸ“Œ Claiming issue #{issue.number}: {issue.title}") # Update issue state and assignee issue.state = IssueState.IN_PROGRESS issue.assignees = [User(id=self.agent_id, username=self.agent_id)] self.backend.update_issue(issue) # Add comment announcing claim comment = Comment( id=None, body=f"πŸ€– Task claimed by {self.agent_id}\n\nStarting work...", author=User(id=self.agent_id, username=self.agent_id), created_at=datetime.now(timezone.utc) ) self.backend.add_comment(issue.id, comment) print(f" βœ“ Issue #{issue.number} claimed") def execute_task(self, issue: Issue): """Execute the task (simulated with sleep).""" print(f"\nβš™οΈ Executing task #{issue.number}...") # Simulate work by sleeping work_time = 2 for i in range(work_time): time.sleep(1) print(f" Working... ({i+1}/{work_time}s)") # Add progress comment comment = Comment( id=None, body=f"Implementation complete.\n\n" f"**Changes made:**\n" f"- Analyzed requirements\n" f"- Implemented solution\n" f"- Verified functionality\n\n" f"Ready for review.", author=User(id=self.agent_id, username=self.agent_id), created_at=datetime.now(timezone.utc) ) self.backend.add_comment(issue.id, comment) print(f" βœ“ Task #{issue.number} completed") def complete_task(self, issue: Issue): """Mark task as complete by closing the issue.""" print(f"\nβœ… Completing issue #{issue.number}...") # Close the issue issue.state = IssueState.CLOSED issue.closed_at = datetime.now(timezone.utc) self.backend.update_issue(issue) # Add completion comment comment = Comment( id=None, body=f"πŸŽ‰ Task completed by {self.agent_id}\n\n" f"All work has been finished and verified.", author=User(id=self.agent_id, username=self.agent_id), created_at=datetime.now(timezone.utc) ) self.backend.add_comment(issue.id, comment) print(f" βœ“ Issue #{issue.number} closed") def run_once(self): """Execute one task cycle.""" # Find available task task = self.find_available_task() if not task: return False # Claim and execute self.claim_task(task) self.execute_task(task) self.complete_task(task) return True def run_loop(self, max_iterations=None): """Run continuously, processing tasks as they become available.""" print(f"\nπŸ€– {self.agent_id} starting task execution loop...") print(f" Press Ctrl+C to stop\n") iteration = 0 while True: iteration += 1 if max_iterations and iteration > max_iterations: print(f"\n⚠️ Reached maximum iterations ({max_iterations})") break # Try to process one task processed = self.run_once() if processed: print(f"\nβœ“ Completed iteration {iteration}") else: print(f"\n⏸️ No tasks available (iteration {iteration})") print(" Waiting 5 seconds before checking again...") time.sleep(5) def main(): """Main entry point.""" try: # Create and initialize agent agent = SimpleTaskExecutor(agent_id="agent-executor") agent.connect() # Run once or in loop import sys if '--once' in sys.argv: agent.run_once() elif '--max-iterations' in sys.argv: idx = sys.argv.index('--max-iterations') max_iter = int(sys.argv[idx + 1]) agent.run_loop(max_iterations=max_iter) else: agent.run_loop() except KeyboardInterrupt: print("\n\n⚠️ Interrupted by user") sys.exit(0) except Exception as e: print(f"\n❌ Error: {e}") sys.exit(1) if __name__ == '__main__': main()