"""
Single Command Issue Wrap-Up functionality.

This module provides comprehensive issue completion automation including:
- Requirement validation and verification
- Test execution and validation
- Cost note creation and database updates
- Git operations (add, commit with cost notes)
- Comprehensive completion summary

The system automates the entire issue closure workflow in a single command.
"""

import json
import os
import subprocess
import sys
from datetime import datetime, date
from decimal import Decimal
from pathlib import Path
from typing import Optional, Dict, Any, List

import click
from tabulate import tabulate

from ..finance.worktime_tracker import WorktimeTracker
from ..finance.session_tracker import SessionCostTracker
from ..finance.cost_manager import CostItemManager
from .activity_tracker import IssueActivityTracker
from .manager import IssuePluginManager


class IssueWrapUpService:
    """Service for comprehensive issue wrap-up functionality.

    Each workflow step is implemented as a private method that returns a
    result dictionary carrying at least a boolean ``'success'`` key, so the
    orchestrator (:meth:`wrap_up_issue`) and the formatter
    (:meth:`format_summary`) can treat all steps uniformly.
    """

    # Approximate EUR value of one USD. Used both for the conversion and for
    # the rate printed in the cost note, so the two can never disagree.
    EUR_PER_USD = 0.92

    # Upper bound for a single test command; prevents a hung test run from
    # blocking the wrap-up forever.
    TEST_TIMEOUT_SECONDS = 300

    def __init__(self, db_path: str = "markitect.db"):
        """Initialize the issue wrap-up service.

        Args:
            db_path: Path of the database file handed to every tracker.
        """
        self.db_path = db_path
        self.worktime_tracker = WorktimeTracker(db_path)
        self.activity_tracker = IssueActivityTracker(db_path)
        self.session_tracker = SessionCostTracker(db_path)
        self.cost_manager = CostItemManager(db_path)
        self.issue_manager = IssuePluginManager()

    def wrap_up_issue(self, issue_number: int, force: bool = False) -> Dict[str, Any]:
        """
        Perform comprehensive issue wrap-up.

        Args:
            issue_number: Issue number to wrap up
            force: Skip validation checks if True

        Returns:
            Dictionary containing wrap-up results. Keys: ``issue_number``,
            ``timestamp``, ``issue_details`` and ``steps`` (one result
            dictionary per executed step).
        """
        wrap_up_results = {
            'issue_number': issue_number,
            'timestamp': datetime.now(),
            'steps': {}
        }
        steps = wrap_up_results['steps']

        # Step 1: Get issue details
        click.echo(f"🔍 Retrieving issue #{issue_number} details...")
        issue_details = self._get_issue_details(issue_number)
        wrap_up_results['issue_details'] = issue_details
        steps['issue_retrieval'] = {'success': bool(issue_details)}

        # A missing issue is the only condition that aborts the workflow.
        if not issue_details and not force:
            steps['issue_retrieval']['error'] = "Issue not found"
            return wrap_up_results

        # Step 2: Review requirements (placeholder - would need issue analysis)
        click.echo("📋 Reviewing requirements...")
        steps['requirement_review'] = self._review_requirements(
            issue_number, issue_details, force
        )

        # Step 3: Run associated tests
        click.echo("🧪 Running associated tests...")
        steps['test_execution'] = self._run_issue_tests(issue_number, force)

        # Step 4: Run full test suite
        click.echo("🔬 Running full test suite...")
        steps['full_test_execution'] = self._run_full_tests(force)

        # NOTE(review): failing tests do not stop the workflow; the commit and
        # the issue closure below still run and the summary reports PARTIAL.
        # Confirm this best-effort behavior is intended.

        # Step 5: Calculate and update costs
        click.echo("💰 Calculating and updating costs...")
        cost_results = self._update_cost_tracking(issue_number, issue_details)
        steps['cost_tracking'] = cost_results

        # Step 6: Create/update cost note
        click.echo("📄 Creating/updating cost note...")
        steps['cost_note'] = self._create_cost_note(
            issue_number, issue_details, cost_results
        )

        # Step 7: Git operations
        click.echo("📦 Adding and committing changes...")
        steps['git_operations'] = self._git_operations(issue_number, issue_details)

        # Step 8: Close issue
        click.echo("🔒 Closing issue...")
        steps['issue_closure'] = self._close_issue(issue_number)

        return wrap_up_results

    def _get_issue_details(self, issue_number: int) -> Optional[Dict[str, Any]]:
        """Retrieve issue details from the backend.

        Currently returns simulated data; the backend is only resolved so
        that a missing/misconfigured backend is reported as "not found".

        Returns:
            A dictionary with ``number``, ``title``, ``status`` and
            ``description``, or ``None`` when the backend cannot be obtained.
        """
        try:
            # Result intentionally unused: this would call the actual backend
            # API. For now, simulate with basic info.
            self.issue_manager.get_backend()
            return {
                'number': issue_number,
                'title': f"Issue #{issue_number}",
                'status': 'open',
                'description': 'Issue description would be retrieved from backend'
            }
        except Exception:
            return None

    def _review_requirements(self, issue_number: int, issue_details: Optional[Dict],
                             force: bool) -> Dict[str, Any]:
        """Review that requirements have been met.

        Args:
            issue_number: Issue whose activities are inspected.
            issue_details: Issue data (currently unused by the check).
            force: When True the review is skipped and reported as successful.

        Returns:
            Result dictionary; succeeds when at least one activity has been
            recorded for the issue.
        """
        if force:
            return {'success': True, 'forced': True}

        # This would implement actual requirement checking logic.
        # For now, check if there are recent activities.
        activities = self.activity_tracker.get_issue_activities(
            issue_id=issue_number,
            limit=10
        )

        has_implementation = any(
            activity.has_implementation_activity()
            for activity in activities
        )

        return {
            'success': has_implementation or len(activities) > 0,
            'activities_count': len(activities),
            'has_implementation_activity': has_implementation
        }

    @staticmethod
    def _python_executable() -> str:
        """Return the project venv interpreter if present, else the running one."""
        venv_python = Path('.venv/bin/python')
        return str(venv_python) if venv_python.exists() else sys.executable

    def _run_issue_tests(self, issue_number: int, force: bool) -> Dict[str, Any]:
        """Run tests associated with the issue.

        Test files are discovered by name: ``tests/test_issue_<n>_*.py`` and
        ``tests/test_issue_<n>.py``.

        Args:
            issue_number: Issue whose test files are looked up.
            force: When True the files are listed but not executed.

        Returns:
            Result dictionary with ``success``, ``test_files`` (paths found)
            and ``output`` (one entry per file, or an informational string).
        """
        test_patterns = [
            f"tests/test_issue_{issue_number}_*.py",
            f"tests/test_issue_{issue_number}.py"
        ]

        results = {
            'success': True,
            'test_files': [],
            'output': []
        }

        for test_pattern in test_patterns:
            # Sorted so that execution order and reported output are stable.
            for test_file in sorted(Path('.').glob(test_pattern)):
                results['test_files'].append(str(test_file))

                if force:
                    results['output'].append(
                        f"FORCED: Skipping test execution for {test_file}"
                    )
                    continue

                try:
                    # Run the specific test
                    cmd = [self._python_executable(), '-m', 'pytest', str(test_file), '-v']
                    result = subprocess.run(
                        cmd,
                        capture_output=True,
                        text=True,
                        cwd='.',
                        timeout=self.TEST_TIMEOUT_SECONDS
                    )
                except Exception as e:
                    # Includes subprocess.TimeoutExpired and a missing interpreter.
                    results['success'] = False
                    results['output'].append({
                        'file': str(test_file),
                        'error': str(e)
                    })
                    continue

                results['output'].append({
                    'file': str(test_file),
                    'returncode': result.returncode,
                    'stdout': result.stdout,
                    'stderr': result.stderr
                })
                if result.returncode != 0:
                    results['success'] = False

        if not results['test_files']:
            results['output'].append(
                f"No specific test files found for issue #{issue_number}"
            )

        return results

    def _run_full_tests(self, force: bool) -> Dict[str, Any]:
        """Run the full test suite to ensure no regressions.

        Candidate commands are tried in order; the first one that can be
        started and finishes within the timeout determines the result.

        Args:
            force: When True the suite is skipped and reported as successful.

        Returns:
            Result dictionary with ``success`` plus either the command
            details (``command``, ``returncode``, ``stdout``, ``stderr``) or
            an ``error`` message.
        """
        if force:
            return {
                'success': True,
                'forced': True,
                'output': 'FORCED: Skipped full test suite execution'
            }

        # Try to determine the test command from Makefile or common patterns
        test_commands = [
            ['make', 'test'],
            ['.venv/bin/python', '-m', 'pytest', '-v'],
            ['python', '-m', 'pytest', '-v'],
            ['pytest', '-v']
        ]
        timed_out = []

        try:
            for cmd in test_commands:
                try:
                    result = subprocess.run(
                        cmd,
                        capture_output=True,
                        text=True,
                        cwd='.',
                        timeout=self.TEST_TIMEOUT_SECONDS
                    )
                except FileNotFoundError:
                    # Executable not installed: try the next candidate.
                    continue
                except subprocess.TimeoutExpired:
                    # Remember it so the final error is not misreported as
                    # "no command found".
                    timed_out.append(' '.join(cmd))
                    continue

                return {
                    'success': result.returncode == 0,
                    'command': ' '.join(cmd),
                    'returncode': result.returncode,
                    'stdout': result.stdout,
                    'stderr': result.stderr
                }

            if timed_out:
                error = (
                    f"Test command(s) timed out after "
                    f"{self.TEST_TIMEOUT_SECONDS}s: {', '.join(timed_out)}"
                )
            else:
                error = 'No suitable test command found'
            return {
                'success': False,
                'error': error
            }

        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def _update_cost_tracking(self, issue_number: int,
                              issue_details: Optional[Dict]) -> Dict[str, Any]:
        """Calculate and register time and cost data in database.

        The tracker APIs are probed with ``hasattr`` because their method
        names may vary; a failing tracker degrades to empty/zero values
        instead of failing the step.

        Returns:
            ``{'success': True, 'cost_data': {...}}`` with ``issue_number``,
            ``total_minutes``, ``total_hours``, ``total_cost_eur``,
            ``activity_count`` and ``session_count``; or
            ``{'success': False, 'error': ...}``.
        """
        try:
            # Get activity data
            activities = self.activity_tracker.get_issue_activities(issue_id=issue_number)

            # Get session cost data - method may not exist
            session_costs = []
            try:
                if hasattr(self.session_tracker, 'get_issue_costs'):
                    session_costs = self.session_tracker.get_issue_costs(issue_number)
                elif hasattr(self.session_tracker, 'get_costs_for_issue'):
                    session_costs = self.session_tracker.get_costs_for_issue(issue_number)
            except Exception:
                # If session cost tracking fails, continue with empty list
                session_costs = []
            # A tracker returning None means "no costs", not a failure.
            session_costs = session_costs or []

            # Try to get worktime data - method name may vary
            total_minutes = 0
            try:
                # Try different possible methods for getting worktime data
                if hasattr(self.worktime_tracker, 'get_issue_summary'):
                    worktime_summary = self.worktime_tracker.get_issue_summary(issue_number)
                    total_minutes = worktime_summary.get('total_minutes', 0) if worktime_summary else 0
                elif hasattr(self.worktime_tracker, 'get_issue_worktime'):
                    worktime_data = self.worktime_tracker.get_issue_worktime(issue_number)
                    total_minutes = worktime_data.get('total_minutes', 0) if worktime_data else 0
                # If no specific method available, try to calculate from entries
                elif hasattr(self.worktime_tracker, 'get_entries'):
                    entries = self.worktime_tracker.get_entries()
                    total_minutes = sum(
                        entry.duration_minutes
                        for entry in entries
                        if hasattr(entry, 'issue_id') and entry.issue_id == issue_number
                    )
            except Exception:
                # If worktime tracking fails, continue with 0
                total_minutes = 0

            # Calculate totals
            total_cost = sum(cost.get('cost_eur', 0) for cost in session_costs)

            cost_data = {
                'issue_number': issue_number,
                'total_minutes': total_minutes,
                'total_hours': total_minutes / 60 if total_minutes else 0,
                'total_cost_eur': total_cost,
                'activity_count': len(activities),
                'session_count': len(session_costs)
            }

            # This would register in a centralized cost tracking system.
            # For now, just return the calculated data.
            return {
                'success': True,
                'cost_data': cost_data
            }

        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def _create_cost_note(self, issue_number: int, issue_details: Optional[Dict],
                          cost_results: Dict) -> Dict[str, Any]:
        """Create or update cost note for the issue.

        Writes ``cost_notes/issue_<n>_cost_<YYYY-MM-DD>.md`` relative to the
        current working directory, overwriting a note from the same day.

        Returns:
            ``{'success': True, 'cost_note_path': ...}`` or
            ``{'success': False, 'error': ...}``.
        """
        try:
            cost_data = cost_results.get('cost_data', {})

            # Create cost note content
            cost_note_content = self._generate_cost_note_content(
                issue_number, issue_details, cost_data
            )

            # Write cost note file
            cost_note_path = Path(
                f"cost_notes/issue_{issue_number}_cost_{date.today().isoformat()}.md"
            )
            cost_note_path.parent.mkdir(parents=True, exist_ok=True)

            # Explicit UTF-8: the note contains non-ASCII characters (e.g. the
            # euro sign) and must not depend on the platform default encoding.
            with open(cost_note_path, 'w', encoding='utf-8') as f:
                f.write(cost_note_content)

            return {
                'success': True,
                'cost_note_path': str(cost_note_path)
            }

        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def _generate_cost_note_content(self, issue_number: int, issue_details: Optional[Dict],
                                    cost_data: Dict) -> str:
        """Generate cost note content.

        Args:
            issue_number: Issue the note belongs to.
            issue_details: Issue data; only ``title`` is read. May be None.
            cost_data: Totals as produced by ``_update_cost_tracking``;
                missing keys default to 0.

        Returns:
            Markdown text with a YAML front matter block.
        """
        default_title = f'Issue #{issue_number}'
        title = issue_details.get('title', default_title) if issue_details else default_title
        # Escape for the double-quoted YAML scalar in the front matter so a
        # title containing quotes or backslashes cannot break the metadata.
        yaml_title = title.replace('\\', '\\\\').replace('"', '\\"')

        total_cost_eur = cost_data.get('total_cost_eur', 0)
        # Approximate conversion
        total_cost_usd = total_cost_eur / self.EUR_PER_USD if total_cost_eur else 0
        total_minutes = cost_data.get('total_minutes', 0)
        # Evaluate once so every date in the note agrees even across midnight.
        today = date.today().isoformat()

        content = f"""---
note_type: "issue_cost_tracking"
issue_id: {issue_number}
issue_title: "{yaml_title}"
session_date: "{today}"
claude_model: "claude-sonnet-4"
total_cost_eur: {total_cost_eur:.4f}
total_cost_usd: {total_cost_usd:.3f}
total_minutes: {total_minutes}
implementation_time_minutes: {total_minutes}
generated_at: "{datetime.now().isoformat()}"
---

# Issue #{issue_number} Implementation Cost

**Issue**: {title}
**Date**: {today}
**Claude Model**: claude-sonnet-4

## Cost Summary

- **Total Cost**: €{total_cost_eur:.4f} (${total_cost_usd:.4f} USD)
- **Implementation Time**: {cost_data.get('total_hours', 0):.1f} hours ({total_minutes} minutes)
- **Activities Tracked**: {cost_data.get('activity_count', 0)} activities
- **Sessions**: {cost_data.get('session_count', 0)} cost sessions

## Implementation Summary

Issue #{issue_number} "{title}" has been completed and wrapped up through automated process.

## Cost Allocation

This cost has been allocated to issue #{issue_number} implementation.

## Notes

- Currency conversion rate: 1 USD = {self.EUR_PER_USD:.3f} EUR
- Pricing based on claude-sonnet-4 rates as of {today}
- Implementation time includes design, coding, testing, and validation
"""
        return content

    def _git_operations(self, issue_number: int,
                        issue_details: Optional[Dict]) -> Dict[str, Any]:
        """Perform git add and commit operations.

        Stages everything in the current working directory (``git add .``)
        and commits it with a generated message.

        Returns:
            Result dictionary; ``success`` reflects the commit's exit code.
            On a failed ``git add`` only ``success`` and ``error`` are set.
        """
        try:
            # Add all changes including cost notes
            result_add = subprocess.run(['git', 'add', '.'], capture_output=True, text=True)
            if result_add.returncode != 0:
                return {
                    'success': False,
                    'error': f'Git add failed: {result_add.stderr}'
                }

            # Create commit message
            default_title = f'Issue #{issue_number}'
            title = issue_details.get('title', default_title) if issue_details else default_title
            commit_message = f"""feat: complete issue #{issue_number} - {title}

Automated issue wrap-up including:
- Implementation completion verification
- Test execution and validation
- Cost tracking and note generation
- Repository state commit

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude
"""

            # Commit changes (argument list, no shell: the title is passed verbatim)
            result_commit = subprocess.run(
                ['git', 'commit', '-m', commit_message],
                capture_output=True,
                text=True
            )

            committed = result_commit.returncode == 0
            return {
                'success': committed,
                'add_output': result_add.stdout,
                'commit_output': result_commit.stdout,
                'commit_error': None if committed else result_commit.stderr
            }

        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def _close_issue(self, issue_number: int) -> Dict[str, Any]:
        """Close the issue using the issue management system.

        Logs a "close" activity first, then runs ``make close-issue
        NUM=<n>``. If ``make`` cannot be started, the backend fallback is
        attempted, which is not implemented yet and therefore reports failure.

        Returns:
            Result dictionary with ``success``, usually ``method`` ('make' or
            'backend') and ``output``/``error`` details.
        """
        try:
            # Log closing activity
            # NOTE(review): the activity is recorded before the close is
            # confirmed, so it is logged even when closing fails.
            self.activity_tracker.log_activity(
                issue_id=issue_number,
                activity_type="close",
                description=f"Issue #{issue_number} completed via automated wrap-up process"
            )

            # Try to close via make command (most reliable method)
            try:
                result = subprocess.run(
                    ['make', 'close-issue', f'NUM={issue_number}'],
                    capture_output=True,
                    text=True,
                    cwd='.'
                )
            except (OSError, subprocess.SubprocessError):
                # `make` unavailable: fall back to direct backend call
                try:
                    # Result intentionally unused: this would call
                    # backend.close_issue(issue_number)
                    self.issue_manager.get_backend()
                    return {
                        'success': False,
                        'method': 'backend',
                        'error': 'Backend closure not implemented'
                    }
                except Exception as e:
                    return {
                        'success': False,
                        'method': 'backend',
                        'error': str(e)
                    }

            closed = result.returncode == 0
            return {
                'success': closed,
                'method': 'make',
                'output': result.stdout,
                'error': None if closed else result.stderr
            }

        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def format_summary(self, results: Dict[str, Any]) -> str:
        """Format wrap-up results as a readable summary.

        Args:
            results: Dictionary returned by :meth:`wrap_up_issue`.

        Returns:
            Multi-line text: header, one status line per executed step, an
            optional cost section and the overall status.
        """
        issue_num = results['issue_number']
        timestamp = results['timestamp'].strftime('%Y-%m-%d %H:%M:%S')

        summary = [
            f"\n🎉 Issue #{issue_num} Wrap-Up Complete",
            f"📅 Completed: {timestamp}",
            "=" * 50
        ]

        # Step-by-step results
        steps = results.get('steps', {})
        step_names = {
            'issue_retrieval': '🔍 Issue Retrieval',
            'requirement_review': '📋 Requirement Review',
            'test_execution': '🧪 Associated Tests',
            'full_test_execution': '🔬 Full Test Suite',
            'cost_tracking': '💰 Cost Tracking',
            'cost_note': '📄 Cost Note',
            'git_operations': '📦 Git Operations',
            'issue_closure': '🔒 Issue Closure'
        }

        for step_key, step_name in step_names.items():
            # Steps that never ran (early abort) are simply not listed.
            if step_key not in steps:
                continue
            step_result = steps[step_key]
            success = step_result.get('success', False)
            status = "✅ SUCCESS" if success else "❌ FAILED"
            summary.append(f"{step_name}: {status}")

            if not success and 'error' in step_result:
                summary.append(f"   Error: {step_result['error']}")

        # Cost information
        cost_step = steps.get('cost_tracking', {})
        if cost_step.get('success'):
            cost_data = cost_step.get('cost_data', {})
            if cost_data:
                summary.extend([
                    "",
                    "💰 Cost Summary:",
                    f"   Time: {cost_data.get('total_hours', 0):.1f} hours",
                    f"   Cost: €{cost_data.get('total_cost_eur', 0):.4f}",
                    f"   Activities: {cost_data.get('activity_count', 0)}"
                ])

        # Overall status: only tests and the commit are considered critical.
        all_critical_success = all(
            steps.get(step, {}).get('success', False)
            for step in ['test_execution', 'full_test_execution', 'git_operations']
        )

        summary.extend([
            "",
            "🎯 Overall Status:",
            "✅ SUCCESS - Issue wrap-up completed successfully!" if all_critical_success
            else "⚠️ PARTIAL - Some steps had issues, please review above"
        ])

        return "\n".join(summary)


@click.group()
def issue_wrapup():
    """Issue wrap-up commands for comprehensive issue completion."""
    pass


@issue_wrapup.command()
@click.argument('issue_number', type=int)
@click.option('--force', is_flag=True, help='Skip validation checks and force completion')
@click.option('--format', 'output_format', type=click.Choice(['summary', 'detailed', 'json']),
              default='summary', help='Output format')
def complete(issue_number: int, force: bool, output_format: str):
    """Complete comprehensive wrap-up for an issue.

    Performs all steps needed to properly close an issue:
    - Verifies requirements have been met
    - Runs associated tests and full test suite
    - Calculates and updates cost tracking
    - Creates/updates cost notes
    - Commits changes to repository
    - Closes the issue
    - Provides completion summary
    """
    service = IssueWrapUpService()

    try:
        results = service.wrap_up_issue(issue_number, force=force)

        if output_format == 'json':
            # default=str converts datetime objects to strings for JSON serialization
            click.echo(json.dumps(results, indent=2, default=str))
        elif output_format == 'detailed':
            click.echo(service.format_summary(results))
            # Add detailed step information
            for step_name, step_data in results.get('steps', {}).items():
                if 'output' in step_data:
                    click.echo(f"\n--- {step_name.title()} Details ---")
                    click.echo(json.dumps(step_data['output'], indent=2, default=str))
        else:  # summary
            click.echo(service.format_summary(results))

    except Exception as e:
        click.echo(f"❌ Error during issue wrap-up: {str(e)}", err=True)
        # Chain the cause so the original traceback is preserved for debugging.
        raise click.ClickException(f"Issue wrap-up failed: {str(e)}") from e


if __name__ == '__main__':
    issue_wrapup()