Files
issue-core/examples/agents/monitoring_agent.py
tegwick b605d970e3 feat: rename to issue-core and add task ingestion endpoint
Renames the package, distribution, CLI alias, Makefile targets, and
working directory from issue-facade to issue-core, signalling its
role as the authoritative task lifecycle manager for the Coulomb org
(peer to activity-core, rules-core, project-core).

Adds POST /issues/ ingestion endpoint for activity-core's IssueSink,
under a new optional [api] extra. The endpoint is served by `issue
serve`, authenticates via the ISSUE_CORE_API_KEY env var (Bearer or
X-API-Key header), and routes the TaskSpec payload to the configured
default backend with full traceability metadata embedded in
sync_metadata.

- T01: Python package issue_tracker -> issue_core, dir rename
- T02: registered in state hub under custodian domain
- T03: INTENT.md (what it is, what it isn't, how it fits)
- T04: SCOPE.md (in/out-of-scope, integration boundaries)
- T05: POST /issues/ via FastAPI + Uvicorn, 9 unit tests
- T06: docs/nats-task-ingestion.md design stub

Closes ISSC-WP-0001.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-17 05:16:27 +02:00

315 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Monitoring Agent
This agent monitors issue health and sends alerts:
- Detects stale issues (not updated in N days)
- Finds blocked issues waiting too long
- Identifies high-priority issues without assignees
- Reports on project velocity and bottlenecks
Usage:
export GITEA_URL=https://gitea.example.com
export GITEA_TOKEN=your-token
export GITEA_OWNER=your-org
export GITEA_REPO=your-repo
python monitoring_agent.py [--stale-days=7] [--check-interval=3600]
"""
import os
import sys
import time
import argparse
from datetime import datetime, timezone, timedelta
from pathlib import Path
from collections import defaultdict
from typing import List, Dict
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from issue_core.backends.gitea import GiteaBackend
from issue_core.core.models import Issue, Label, User, Comment, IssueState
from issue_core.core.interfaces import IssueFilter
class MonitoringAgent:
"""Agent that monitors issue tracker health and alerts on problems."""
def __init__(self, agent_id: str = "agent-monitor", stale_days: int = 7):
self.agent_id = agent_id
self.stale_threshold = timedelta(days=stale_days)
self.backend = None
def connect(self):
"""Connect to backend."""
base_url = os.environ['GITEA_URL']
token = os.environ['GITEA_TOKEN']
owner = os.environ['GITEA_OWNER']
repo = os.environ['GITEA_REPO']
self.backend = GiteaBackend()
self.backend.connect({
'base_url': base_url,
'token': token,
'owner': owner,
'repo': repo
})
print(f"✓ Connected to {base_url}/{owner}/{repo}")
def check_stale_issues(self) -> List[Issue]:
"""Find issues that haven't been updated recently."""
print("\n🔍 Checking for stale issues...")
cutoff = datetime.now(timezone.utc) - self.stale_threshold
# Get all open issues
all_issues = self.backend.list_issues(IssueFilter(state='open'))
# Filter to stale ones
stale = [issue for issue in all_issues
if issue.updated_at < cutoff]
if stale:
print(f" ⚠️ Found {len(stale)} stale issue(s)")
for issue in stale:
days_stale = (datetime.now(timezone.utc) - issue.updated_at).days
print(f" #{issue.number}: {issue.title} "
f"(stale for {days_stale} days)")
else:
print(f" ✓ No stale issues found")
return stale
def check_blocked_issues(self) -> List[Issue]:
"""Find blocked issues."""
print("\n🔍 Checking for blocked issues...")
blocked = self.backend.list_issues(IssueFilter(
state='blocked'
))
if blocked:
print(f" ⚠️ Found {len(blocked)} blocked issue(s)")
for issue in blocked:
days_blocked = (datetime.now(timezone.utc) - issue.updated_at).days
print(f" #{issue.number}: {issue.title} "
f"(blocked for {days_blocked} days)")
else:
print(f" ✓ No blocked issues")
return blocked
def check_unassigned_priority(self) -> List[Issue]:
"""Find high-priority issues without assignees."""
print("\n🔍 Checking for unassigned priority issues...")
# Get high priority issues
high_priority = self.backend.list_issues(IssueFilter(
state='open',
labels=['priority:high']
))
critical = self.backend.list_issues(IssueFilter(
state='open',
labels=['priority:critical']
))
all_priority = high_priority + critical
unassigned = [issue for issue in all_priority if not issue.assignees]
if unassigned:
print(f" ⚠️ Found {len(unassigned)} unassigned priority issue(s)")
for issue in unassigned:
priority = next((l.name for l in issue.labels
if l.name.startswith('priority:')), 'unknown')
print(f" #{issue.number}: {issue.title} [{priority}]")
else:
print(f" ✓ All priority issues are assigned")
return unassigned
def analyze_velocity(self) -> Dict[str, int]:
"""Analyze project velocity (issues opened vs closed)."""
print("\n📊 Analyzing project velocity...")
# Get recent activity
week_ago = datetime.now(timezone.utc) - timedelta(days=7)
# Count recently created
all_issues = self.backend.list_issues(IssueFilter(limit=1000))
recent_created = len([i for i in all_issues if i.created_at >= week_ago])
# Count recently closed
closed_issues = self.backend.list_issues(IssueFilter(state='closed'))
recent_closed = len([i for i in closed_issues
if i.closed_at and i.closed_at >= week_ago])
# Count open
open_issues = len(self.backend.list_issues(IssueFilter(state='open')))
print(f" Last 7 days:")
print(f" Created: {recent_created} issues")
print(f" Closed: {recent_closed} issues")
print(f" Net change: {recent_created - recent_closed:+d}")
print(f" Currently open: {open_issues}")
if recent_created > recent_closed * 1.5:
print(f" ⚠️ Issues are piling up faster than being resolved!")
elif recent_closed > recent_created:
print(f" ✓ Good progress - closing more than opening")
return {
'created': recent_created,
'closed': recent_closed,
'open': open_issues
}
def analyze_bottlenecks(self) -> Dict[str, int]:
"""Identify bottlenecks in the workflow."""
print("\n🔍 Analyzing workflow bottlenecks...")
# Count issues by state
states = defaultdict(int)
all_open = self.backend.list_issues(IssueFilter(state='open'))
for issue in all_open:
states[issue.state.value] += 1
# Count issues by label
labels = defaultdict(int)
for issue in all_open:
for label in issue.labels:
if label.name.startswith('needs-'):
labels[label.name] += 1
print(f" Issues by state:")
for state, count in sorted(states.items(), key=lambda x: -x[1]):
print(f" {state}: {count}")
if labels:
print(f" Issues by stage:")
for label, count in sorted(labels.items(), key=lambda x: -x[1]):
print(f" {label}: {count}")
if count > 10:
print(f" ⚠️ Potential bottleneck!")
return dict(labels)
def send_alert(self, title: str, details: List[str]):
"""Send an alert by creating an issue."""
print(f"\n🚨 Sending alert: {title}")
body = f"**Monitoring Alert**\n\n"
body += f"The monitoring agent detected potential issues:\n\n"
for detail in details:
body += f"- {detail}\n"
body += f"\n*Generated by {self.agent_id} at {datetime.now(timezone.utc).isoformat()}*"
alert_issue = Issue(
id=None,
number=0,
title=f"🚨 Monitor Alert: {title}",
description=body,
state=IssueState.OPEN,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
labels=[
Label(name='monitoring'),
Label(name='alert'),
Label(name='agent-generated')
]
)
created = self.backend.create_issue(alert_issue)
print(f" ✓ Alert created as issue #{created.number}")
def run_health_check(self, send_alerts: bool = True):
"""Run complete health check."""
print(f"\n🤖 Running issue tracker health check...")
print(f" Timestamp: {datetime.now(timezone.utc).isoformat()}\n")
alerts = []
# Check for problems
stale = self.check_stale_issues()
if stale:
alerts.append(f"{len(stale)} stale issues (>= {self.stale_threshold.days} days)")
blocked = self.check_blocked_issues()
if blocked:
alerts.append(f"{len(blocked)} blocked issues")
unassigned = self.check_unassigned_priority()
if unassigned:
alerts.append(f"{len(unassigned)} unassigned priority issues")
# Analyze metrics
velocity = self.analyze_velocity()
if velocity['created'] > velocity['closed'] * 2:
alerts.append(f"Issue backlog growing rapidly "
f"({velocity['created']} created vs {velocity['closed']} closed)")
bottlenecks = self.analyze_bottlenecks()
for stage, count in bottlenecks.items():
if count > 10:
alerts.append(f"{count} issues stuck in stage: {stage}")
# Send alert if needed
if alerts and send_alerts:
self.send_alert("Issue Tracker Health Check", alerts)
elif not alerts:
print(f"\n✅ All checks passed - no issues found!")
return len(alerts) == 0
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(description='Monitoring agent for issue tracker')
parser.add_argument('--stale-days', type=int, default=7,
help='Days before an issue is considered stale')
parser.add_argument('--check-interval', type=int, default=3600,
help='Interval between checks in seconds (default: 1 hour)')
parser.add_argument('--once', action='store_true',
help='Run once and exit')
parser.add_argument('--no-alerts', action='store_true',
help='Do not create alert issues')
args = parser.parse_args()
try:
agent = MonitoringAgent(
agent_id="agent-monitor",
stale_days=args.stale_days
)
agent.connect()
if args.once:
agent.run_health_check(send_alerts=not args.no_alerts)
else:
print(f"\n🤖 Monitoring agent starting...")
print(f" Check interval: {args.check_interval}s")
print(f" Stale threshold: {args.stale_days} days")
print(f" Press Ctrl+C to stop\n")
while True:
agent.run_health_check(send_alerts=not args.no_alerts)
print(f"\n⏸️ Waiting {args.check_interval}s until next check...")
time.sleep(args.check_interval)
except KeyboardInterrupt:
print("\n\n⚠️ Interrupted by user")
sys.exit(0)
except Exception as e:
print(f"\n❌ Error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == '__main__':
main()