#!/usr/bin/env python3 """ Human-in-the-Loop Agent This agent demonstrates agent-human collaboration: 1. Agent proposes implementations 2. Human reviews and approves via comments 3. Agent proceeds only after approval 4. Agent can ask questions and wait for answers Usage: export GITEA_URL=https://gitea.example.com export GITEA_TOKEN=your-token export GITEA_OWNER=your-org export GITEA_REPO=your-repo python human_in_loop.py """ import os import sys import time import re from datetime import datetime, timezone, timedelta from pathlib import Path from typing import Optional, List sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from issue_core.backends.gitea import GiteaBackend from issue_core.core.models import Issue, Label, User, Comment, IssueState from issue_core.core.interfaces import IssueFilter class HumanInLoopAgent: """Agent that requires human approval for critical decisions.""" def __init__(self, agent_id: str = "agent-collaborative"): self.agent_id = agent_id self.backend = None def connect(self): """Connect to backend.""" base_url = os.environ['GITEA_URL'] token = os.environ['GITEA_TOKEN'] owner = os.environ['GITEA_OWNER'] repo = os.environ['GITEA_REPO'] self.backend = GiteaBackend() self.backend.connect({ 'base_url': base_url, 'token': token, 'owner': owner, 'repo': repo }) print(f"✓ Connected to {base_url}/{owner}/{repo}") def find_feature_request(self) -> Optional[Issue]: """Find a feature request that needs breaking down.""" print("\n🔍 Looking for feature requests...") issues = self.backend.list_issues(IssueFilter( state='open', labels=['feature', 'needs-breakdown'] )) if issues: print(f" Found {len(issues)} feature request(s)") return issues[0] else: print(" No feature requests found") return None def propose_breakdown(self, feature: Issue) -> List[str]: """Propose breaking down a feature into subtasks.""" print(f"\n💡 Analyzing feature #{feature.number}: {feature.title}") # Simulate AI analysis time.sleep(2) # Generate proposed subtasks subtasks = [ f"Design database schema for {feature.title}", f"Implement backend API for {feature.title}", f"Create frontend components for {feature.title}", f"Write integration tests for {feature.title}", f"Update documentation for {feature.title}" ] print(f" Proposed {len(subtasks)} subtasks") return subtasks def request_approval(self, issue: Issue, subtasks: List[str]): """Post proposal and request human approval.""" print(f"\n📝 Requesting approval for feature breakdown...") # Format proposal proposal = f"🤖 **Agent Proposal: Feature Breakdown**\n\n" proposal += f"I analyzed this feature and propose breaking it into {len(subtasks)} subtasks:\n\n" for i, task in enumerate(subtasks, 1): proposal += f"{i}. {task}\n" proposal += f"\n**Human Review Required:**\n" proposal += f"- Reply with `APPROVED` to create these subtasks\n" proposal += f"- Reply with `REJECTED: reason` to decline\n" proposal += f"- Reply with `MODIFY: suggestions` to request changes\n\n" proposal += f"*Waiting for your response...*" # Post comment comment = Comment( id=None, body=proposal, author=User(id=self.agent_id, username=self.agent_id), created_at=datetime.now(timezone.utc) ) self.backend.add_comment(issue.id, comment) # Add label issue.labels.append(Label(name='awaiting-approval')) self.backend.update_issue(issue) print(f" ✓ Approval requested on issue #{issue.number}") def check_for_approval(self, issue: Issue, since: datetime) -> Optional[str]: """Check if human has approved/rejected.""" comments = self.backend.get_comments(issue.id) # Look for recent human comments for comment in reversed(comments): # Skip agent's own comments if comment.author.username == self.agent_id: continue # Only check comments after proposal if comment.created_at <= since: continue # Check for approval keywords body_upper = comment.body.upper() if 'APPROVED' in body_upper: return 'approved' elif 'REJECTED' in body_upper: return 'rejected' elif 'MODIFY' in body_upper: return 'modify' return None def create_subtasks(self, parent: Issue, subtasks: List[str]): """Create subtask issues.""" print(f"\n✅ Creating {len(subtasks)} subtasks...") for i, task_title in enumerate(subtasks, 1): subtask = Issue( id=None, number=0, title=task_title, description=f"**Subtask of #{parent.number}**: {parent.title}\n\n" f"This is subtask {i} of {len(subtasks)}.\n\n" f"**Parent Issue:** #{parent.number}", state=IssueState.OPEN, created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), labels=[ Label(name='subtask'), Label(name=f'parent:{parent.number}'), Label(name='agent-generated') ] ) created = self.backend.create_issue(subtask) print(f" Created subtask #{created.number}: {task_title}") time.sleep(0.5) # Rate limit # Update parent issue parent.labels = [l for l in parent.labels if l.name not in ['needs-breakdown', 'awaiting-approval']] parent.labels.append(Label(name='broken-down')) self.backend.update_issue(parent) # Add completion comment comment = Comment( id=None, body=f"✅ Subtasks created successfully.\n\n" f"Created {len(subtasks)} subtasks. " f"You can now assign these to team members or agents.", author=User(id=self.agent_id, username=self.agent_id), created_at=datetime.now(timezone.utc) ) self.backend.add_comment(parent.id, comment) print(f" ✓ All subtasks created for #{parent.number}") def wait_for_approval(self, issue: Issue, proposal_time: datetime, timeout_minutes: int = 60): """Wait for human approval with timeout.""" print(f"\n⏳ Waiting for human approval (timeout: {timeout_minutes}min)...") start_time = datetime.now(timezone.utc) timeout = timedelta(minutes=timeout_minutes) check_interval = 30 # Check every 30 seconds while True: elapsed = datetime.now(timezone.utc) - start_time if elapsed > timeout: print(f"\n⏱️ Timeout: No response after {timeout_minutes} minutes") return None # Check for approval decision = self.check_for_approval(issue, proposal_time) if decision: print(f"\n✓ Human response: {decision.upper()}") return decision # Wait before checking again remaining = int((timeout - elapsed).total_seconds() / 60) print(f" Still waiting... ({remaining} minutes remaining)") time.sleep(check_interval) def run_cycle(self): """Run one complete cycle.""" # Find feature to break down feature = self.find_feature_request() if not feature: return False # Propose breakdown subtasks = self.propose_breakdown(feature) # Request approval proposal_time = datetime.now(timezone.utc) self.request_approval(feature, subtasks) # Wait for human decision decision = self.wait_for_approval(feature, proposal_time, timeout_minutes=60) if decision == 'approved': print("\n🎉 Proposal approved! Creating subtasks...") self.create_subtasks(feature, subtasks) return True elif decision == 'rejected': print("\n❌ Proposal rejected. Moving to next feature.") feature.labels = [l for l in feature.labels if l.name != 'awaiting-approval'] feature.labels.append(Label(name='proposal-rejected')) self.backend.update_issue(feature) return False elif decision == 'modify': print("\n🔄 Modification requested. Human will update requirements.") feature.labels = [l for l in feature.labels if l.name != 'awaiting-approval'] feature.labels.append(Label(name='needs-revision')) self.backend.update_issue(feature) return False else: print("\n⏱️ No response. Will retry later.") feature.labels = [l for l in feature.labels if l.name != 'awaiting-approval'] self.backend.update_issue(feature) return False def main(): """Main entry point.""" try: agent = HumanInLoopAgent(agent_id="agent-collaborative") agent.connect() print("\n🤖 Human-in-the-Loop Agent") print(" This agent proposes feature breakdowns and waits for approval\n") if '--once' in sys.argv: agent.run_cycle() else: # Run continuously print(" Running in loop mode (Ctrl+C to stop)\n") while True: if not agent.run_cycle(): print("\n⏸️ No work to do, waiting 60 seconds...") time.sleep(60) except KeyboardInterrupt: print("\n\n⚠️ Interrupted by user") sys.exit(0) except Exception as e: print(f"\n❌ Error: {e}") import traceback traceback.print_exc() sys.exit(1) if __name__ == '__main__': main()