#!/usr/bin/env python3 """ Monitoring Agent This agent monitors issue health and sends alerts: - Detects stale issues (not updated in N days) - Finds blocked issues waiting too long - Identifies high-priority issues without assignees - Reports on project velocity and bottlenecks Usage: export GITEA_URL=https://gitea.example.com export GITEA_TOKEN=your-token export GITEA_OWNER=your-org export GITEA_REPO=your-repo python monitoring_agent.py [--stale-days=7] [--check-interval=3600] """ import os import sys import time import argparse from datetime import datetime, timezone, timedelta from pathlib import Path from collections import defaultdict from typing import List, Dict sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from issue_tracker.backends.gitea import GiteaBackend from issue_tracker.core.models import Issue, Label, User, Comment, IssueState from issue_tracker.core.interfaces import IssueFilter class MonitoringAgent: """Agent that monitors issue tracker health and alerts on problems.""" def __init__(self, agent_id: str = "agent-monitor", stale_days: int = 7): self.agent_id = agent_id self.stale_threshold = timedelta(days=stale_days) self.backend = None def connect(self): """Connect to backend.""" base_url = os.environ['GITEA_URL'] token = os.environ['GITEA_TOKEN'] owner = os.environ['GITEA_OWNER'] repo = os.environ['GITEA_REPO'] self.backend = GiteaBackend() self.backend.connect({ 'base_url': base_url, 'token': token, 'owner': owner, 'repo': repo }) print(f"βœ“ Connected to {base_url}/{owner}/{repo}") def check_stale_issues(self) -> List[Issue]: """Find issues that haven't been updated recently.""" print("\nπŸ” Checking for stale issues...") cutoff = datetime.now(timezone.utc) - self.stale_threshold # Get all open issues all_issues = self.backend.list_issues(IssueFilter(state='open')) # Filter to stale ones stale = [issue for issue in all_issues if issue.updated_at < cutoff] if stale: print(f" ⚠️ Found {len(stale)} stale issue(s)") for issue in stale: days_stale = (datetime.now(timezone.utc) - issue.updated_at).days print(f" #{issue.number}: {issue.title} " f"(stale for {days_stale} days)") else: print(f" βœ“ No stale issues found") return stale def check_blocked_issues(self) -> List[Issue]: """Find blocked issues.""" print("\nπŸ” Checking for blocked issues...") blocked = self.backend.list_issues(IssueFilter( state='blocked' )) if blocked: print(f" ⚠️ Found {len(blocked)} blocked issue(s)") for issue in blocked: days_blocked = (datetime.now(timezone.utc) - issue.updated_at).days print(f" #{issue.number}: {issue.title} " f"(blocked for {days_blocked} days)") else: print(f" βœ“ No blocked issues") return blocked def check_unassigned_priority(self) -> List[Issue]: """Find high-priority issues without assignees.""" print("\nπŸ” Checking for unassigned priority issues...") # Get high priority issues high_priority = self.backend.list_issues(IssueFilter( state='open', labels=['priority:high'] )) critical = self.backend.list_issues(IssueFilter( state='open', labels=['priority:critical'] )) all_priority = high_priority + critical unassigned = [issue for issue in all_priority if not issue.assignees] if unassigned: print(f" ⚠️ Found {len(unassigned)} unassigned priority issue(s)") for issue in unassigned: priority = next((l.name for l in issue.labels if l.name.startswith('priority:')), 'unknown') print(f" #{issue.number}: {issue.title} [{priority}]") else: print(f" βœ“ All priority issues are assigned") return unassigned def analyze_velocity(self) -> Dict[str, int]: """Analyze project velocity (issues opened vs closed).""" print("\nπŸ“Š Analyzing project velocity...") # Get recent activity week_ago = datetime.now(timezone.utc) - timedelta(days=7) # Count recently created all_issues = self.backend.list_issues(IssueFilter(limit=1000)) recent_created = len([i for i in all_issues if i.created_at >= week_ago]) # Count recently closed closed_issues = self.backend.list_issues(IssueFilter(state='closed')) recent_closed = len([i for i in closed_issues if i.closed_at and i.closed_at >= week_ago]) # Count open open_issues = len(self.backend.list_issues(IssueFilter(state='open'))) print(f" Last 7 days:") print(f" Created: {recent_created} issues") print(f" Closed: {recent_closed} issues") print(f" Net change: {recent_created - recent_closed:+d}") print(f" Currently open: {open_issues}") if recent_created > recent_closed * 1.5: print(f" ⚠️ Issues are piling up faster than being resolved!") elif recent_closed > recent_created: print(f" βœ“ Good progress - closing more than opening") return { 'created': recent_created, 'closed': recent_closed, 'open': open_issues } def analyze_bottlenecks(self) -> Dict[str, int]: """Identify bottlenecks in the workflow.""" print("\nπŸ” Analyzing workflow bottlenecks...") # Count issues by state states = defaultdict(int) all_open = self.backend.list_issues(IssueFilter(state='open')) for issue in all_open: states[issue.state.value] += 1 # Count issues by label labels = defaultdict(int) for issue in all_open: for label in issue.labels: if label.name.startswith('needs-'): labels[label.name] += 1 print(f" Issues by state:") for state, count in sorted(states.items(), key=lambda x: -x[1]): print(f" {state}: {count}") if labels: print(f" Issues by stage:") for label, count in sorted(labels.items(), key=lambda x: -x[1]): print(f" {label}: {count}") if count > 10: print(f" ⚠️ Potential bottleneck!") return dict(labels) def send_alert(self, title: str, details: List[str]): """Send an alert by creating an issue.""" print(f"\n🚨 Sending alert: {title}") body = f"**Monitoring Alert**\n\n" body += f"The monitoring agent detected potential issues:\n\n" for detail in details: body += f"- {detail}\n" body += f"\n*Generated by {self.agent_id} at {datetime.now(timezone.utc).isoformat()}*" alert_issue = Issue( id=None, number=0, title=f"🚨 Monitor Alert: {title}", description=body, state=IssueState.OPEN, created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), labels=[ Label(name='monitoring'), Label(name='alert'), Label(name='agent-generated') ] ) created = self.backend.create_issue(alert_issue) print(f" βœ“ Alert created as issue #{created.number}") def run_health_check(self, send_alerts: bool = True): """Run complete health check.""" print(f"\nπŸ€– Running issue tracker health check...") print(f" Timestamp: {datetime.now(timezone.utc).isoformat()}\n") alerts = [] # Check for problems stale = self.check_stale_issues() if stale: alerts.append(f"{len(stale)} stale issues (>= {self.stale_threshold.days} days)") blocked = self.check_blocked_issues() if blocked: alerts.append(f"{len(blocked)} blocked issues") unassigned = self.check_unassigned_priority() if unassigned: alerts.append(f"{len(unassigned)} unassigned priority issues") # Analyze metrics velocity = self.analyze_velocity() if velocity['created'] > velocity['closed'] * 2: alerts.append(f"Issue backlog growing rapidly " f"({velocity['created']} created vs {velocity['closed']} closed)") bottlenecks = self.analyze_bottlenecks() for stage, count in bottlenecks.items(): if count > 10: alerts.append(f"{count} issues stuck in stage: {stage}") # Send alert if needed if alerts and send_alerts: self.send_alert("Issue Tracker Health Check", alerts) elif not alerts: print(f"\nβœ… All checks passed - no issues found!") return len(alerts) == 0 def main(): """Main entry point.""" parser = argparse.ArgumentParser(description='Monitoring agent for issue tracker') parser.add_argument('--stale-days', type=int, default=7, help='Days before an issue is considered stale') parser.add_argument('--check-interval', type=int, default=3600, help='Interval between checks in seconds (default: 1 hour)') parser.add_argument('--once', action='store_true', help='Run once and exit') parser.add_argument('--no-alerts', action='store_true', help='Do not create alert issues') args = parser.parse_args() try: agent = MonitoringAgent( agent_id="agent-monitor", stale_days=args.stale_days ) agent.connect() if args.once: agent.run_health_check(send_alerts=not args.no_alerts) else: print(f"\nπŸ€– Monitoring agent starting...") print(f" Check interval: {args.check_interval}s") print(f" Stale threshold: {args.stale_days} days") print(f" Press Ctrl+C to stop\n") while True: agent.run_health_check(send_alerts=not args.no_alerts) print(f"\n⏸️ Waiting {args.check_interval}s until next check...") time.sleep(args.check_interval) except KeyboardInterrupt: print("\n\n⚠️ Interrupted by user") sys.exit(0) except Exception as e: print(f"\n❌ Error: {e}") import traceback traceback.print_exc() sys.exit(1) if __name__ == '__main__': main()