refactor: remove obsolete issue management system in favor of issue-facade
Some checks failed
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Some checks failed
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Complete cleanup of the legacy TDD AI and issue management system, establishing clear separation of concerns as requested. All issue handling is now provided by the standalone issue-facade system. Removed components: - TDD AI framework (tddai/ directory and tddai_cli.py) - Legacy issue management CLI commands and services - Issue-related Makefile targets and helper commands - Obsolete tests and infrastructure dependencies - Finance modules that depended on the old issue system Updated: - Makefile: Removed issue-*, tdd-*, and test-from-issue commands - CLI framework: Simplified to core functionality only - Documentation: Added deprecation notice for old config system The issue-facade now serves as the universal CLI for issue tracking, providing backend-agnostic interface to GitHub, GitLab, Gitea, and local SQLite storage as documented in issue-facade/README.md. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -109,8 +109,6 @@ from .schema_generator import SchemaGenerator
|
||||
from .schema_validator import SchemaValidator
|
||||
from .exceptions import FileNotFoundError, InvalidDepthError, SchemaValidationError, InvalidSchemaError
|
||||
|
||||
# Issue management commands - also available via dedicated 'issue' CLI or 'tddai' CLI
|
||||
from .issues.commands import issues_group
|
||||
|
||||
# Global options for CLI configuration
|
||||
pass_config = click.make_pass_decorator(dict, ensure=True)
|
||||
@@ -6184,23 +6182,12 @@ cli.add_command(wishlist_group)
|
||||
|
||||
|
||||
# Register issue management commands
|
||||
cli.add_command(issues_group)
|
||||
|
||||
# Register issue activity tracking commands
|
||||
from markitect.issues.activity_commands import activity as activity_group
|
||||
cli.add_command(activity_group)
|
||||
|
||||
# Register worktime tracking commands
|
||||
from markitect.finance.worktime_commands import worktime as worktime_group
|
||||
cli.add_command(worktime_group)
|
||||
|
||||
# Register day wrap-up commands
|
||||
from markitect.finance.day_wrapup_commands import wrapup as wrapup_group
|
||||
cli.add_command(wrapup_group)
|
||||
|
||||
# Register issue wrap-up commands
|
||||
from markitect.issues.issue_wrapup_commands import issue_wrapup as issue_wrapup_group
|
||||
cli.add_command(issue_wrapup_group)
|
||||
|
||||
|
||||
# Query Paradigm Commands - Issue #62
|
||||
|
||||
@@ -1,566 +0,0 @@
|
||||
"""
|
||||
Cost Allocation Engine for MarkiTect Issue Cost Distribution.
|
||||
|
||||
This module implements the core allocation engine that distributes operational
|
||||
costs across active issues using the defined algorithm from Issue #88.
|
||||
|
||||
The engine handles:
|
||||
- Equal distribution of costs across active issues in a period
|
||||
- Loss carried forward when no active issues exist
|
||||
- Transaction audit trail creation
|
||||
- Edge case handling and validation
|
||||
- Integration with period management and activity tracking
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
from datetime import date, datetime
|
||||
from decimal import Decimal
|
||||
from typing import List, Dict, Any, Optional, Tuple
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
|
||||
from .models import FinanceModels
|
||||
from .cost_manager import CostItemManager
|
||||
from .period_manager import PeriodManager, Period, PeriodStatus
|
||||
from ..issues.activity_tracker import IssueActivityTracker, ActivityType
|
||||
|
||||
|
||||
class AllocationStatus(Enum):
|
||||
"""Status enumeration for allocation operations."""
|
||||
SUCCESS = "success"
|
||||
NO_ACTIVE_ISSUES = "no_active_issues"
|
||||
NO_COSTS_TO_ALLOCATE = "no_costs_to_allocate"
|
||||
PERIOD_CLOSED = "period_closed"
|
||||
ERROR = "error"
|
||||
|
||||
|
||||
@dataclass
|
||||
class AllocationResult:
|
||||
"""Result of a cost allocation operation."""
|
||||
status: AllocationStatus
|
||||
period_id: int
|
||||
total_costs: Decimal = Decimal('0.00')
|
||||
active_issues: List[int] = None
|
||||
cost_per_issue: Decimal = Decimal('0.00')
|
||||
allocations_created: int = 0
|
||||
transactions_created: int = 0
|
||||
loss_carried_forward: Decimal = Decimal('0.00')
|
||||
message: str = ""
|
||||
|
||||
def __post_init__(self):
|
||||
if self.active_issues is None:
|
||||
self.active_issues = []
|
||||
|
||||
|
||||
@dataclass
|
||||
class IssueAllocation:
|
||||
"""Represents a cost allocation to a specific issue."""
|
||||
issue_id: int
|
||||
allocated_amount: Decimal
|
||||
allocation_date: date
|
||||
period_id: int
|
||||
transaction_id: Optional[int] = None
|
||||
|
||||
|
||||
class TransactionManager:
|
||||
"""Manages cost transaction audit trails for allocations."""
|
||||
|
||||
def __init__(self, db_path: str):
|
||||
"""
|
||||
Initialize transaction manager.
|
||||
|
||||
Args:
|
||||
db_path: Path to the SQLite database
|
||||
"""
|
||||
self.db_path = db_path
|
||||
self.finance_models = FinanceModels(db_path)
|
||||
|
||||
def create_allocation_transaction(
|
||||
self,
|
||||
period_id: int,
|
||||
amount: Decimal,
|
||||
issue_id: int,
|
||||
transaction_date: date,
|
||||
description: str
|
||||
) -> int:
|
||||
"""
|
||||
Create a cost allocation transaction record.
|
||||
|
||||
Args:
|
||||
period_id: ID of the cost period
|
||||
amount: Amount allocated to the issue
|
||||
issue_id: ID of the issue receiving allocation
|
||||
transaction_date: Date of the transaction
|
||||
description: Description of the allocation
|
||||
|
||||
Returns:
|
||||
ID of the created transaction
|
||||
"""
|
||||
with self.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute('''
|
||||
INSERT INTO cost_transactions
|
||||
(period_id, transaction_type, amount_eur, issue_id,
|
||||
transaction_date, description)
|
||||
VALUES (?, 'cost_allocated', ?, ?, ?, ?)
|
||||
''', (period_id, float(amount), issue_id, transaction_date.isoformat() if hasattr(transaction_date, 'isoformat') else transaction_date, description))
|
||||
|
||||
return cursor.lastrowid
|
||||
|
||||
def create_loss_forward_transaction(
|
||||
self,
|
||||
from_period_id: int,
|
||||
to_period_id: int,
|
||||
amount: Decimal,
|
||||
transaction_date: date,
|
||||
description: str
|
||||
) -> int:
|
||||
"""
|
||||
Create a loss carried forward transaction.
|
||||
|
||||
Args:
|
||||
from_period_id: Source period ID
|
||||
to_period_id: Destination period ID
|
||||
amount: Amount being carried forward
|
||||
transaction_date: Date of the transaction
|
||||
description: Description of the carry forward
|
||||
|
||||
Returns:
|
||||
ID of the created transaction
|
||||
"""
|
||||
with self.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute('''
|
||||
INSERT INTO cost_transactions
|
||||
(period_id, transaction_type, amount_eur, transaction_date, description)
|
||||
VALUES (?, 'loss_forward', ?, ?, ?)
|
||||
''', (to_period_id, float(amount), transaction_date.isoformat() if hasattr(transaction_date, 'isoformat') else transaction_date, description))
|
||||
|
||||
return cursor.lastrowid
|
||||
|
||||
|
||||
class AllocationEngine:
|
||||
"""
|
||||
Core cost allocation engine for distributing operational costs to active issues.
|
||||
|
||||
Implements the algorithm defined in Issue #88:
|
||||
1. Calculate total costs for the period (monthly + one-time + carried forward)
|
||||
2. Identify active issues (created/modified during period)
|
||||
3. Distribute costs equally among active issues
|
||||
4. Handle edge cases (no active issues -> carry forward loss)
|
||||
5. Create audit trail transactions
|
||||
6. Update period statistics
|
||||
"""
|
||||
|
||||
def __init__(self, db_path: str = "markitect.db"):
|
||||
"""
|
||||
Initialize the allocation engine.
|
||||
|
||||
Args:
|
||||
db_path: Path to the SQLite database
|
||||
"""
|
||||
self.db_path = db_path
|
||||
self.finance_models = FinanceModels(db_path)
|
||||
self.cost_manager = CostItemManager(db_path)
|
||||
self.period_manager = PeriodManager(db_path)
|
||||
self.activity_tracker = IssueActivityTracker(db_path)
|
||||
self.transaction_manager = TransactionManager(db_path)
|
||||
|
||||
# Ensure database schema is initialized
|
||||
self.finance_models.initialize_finance_schema()
|
||||
|
||||
def allocate_period_costs(self, period_id: int) -> AllocationResult:
|
||||
"""
|
||||
Allocate costs for a specific period to active issues.
|
||||
|
||||
Args:
|
||||
period_id: ID of the period to process
|
||||
|
||||
Returns:
|
||||
AllocationResult with operation details and status
|
||||
"""
|
||||
try:
|
||||
# Get period details
|
||||
period = self._get_period(period_id)
|
||||
if not period:
|
||||
return AllocationResult(
|
||||
status=AllocationStatus.ERROR,
|
||||
period_id=period_id,
|
||||
message=f"Period {period_id} not found"
|
||||
)
|
||||
|
||||
# Check if period is already closed
|
||||
if period.status == PeriodStatus.CLOSED.value:
|
||||
return AllocationResult(
|
||||
status=AllocationStatus.PERIOD_CLOSED,
|
||||
period_id=period_id,
|
||||
message=f"Period {period_id} is already closed"
|
||||
)
|
||||
|
||||
# Set period status to calculating
|
||||
self._update_period_status(period_id, PeriodStatus.CALCULATING)
|
||||
|
||||
# Step 1: Calculate total costs for period
|
||||
total_costs = self._calculate_period_total_costs(period)
|
||||
|
||||
if total_costs == Decimal('0.00'):
|
||||
self._update_period_status(period_id, PeriodStatus.CLOSED)
|
||||
return AllocationResult(
|
||||
status=AllocationStatus.NO_COSTS_TO_ALLOCATE,
|
||||
period_id=period_id,
|
||||
total_costs=total_costs,
|
||||
message="No costs to allocate for this period"
|
||||
)
|
||||
|
||||
# Step 2: Identify active issues for the period
|
||||
active_issues = self._get_active_issues_for_period(period)
|
||||
|
||||
if not active_issues:
|
||||
# No active issues - carry forward loss to next period
|
||||
next_period_id = self._get_or_create_next_period(period)
|
||||
if next_period_id:
|
||||
self._carry_forward_loss(period_id, next_period_id, total_costs)
|
||||
|
||||
# Update period and close
|
||||
self._update_period_totals(period_id, total_costs, 0, Decimal('0.00'), total_costs)
|
||||
self._update_period_status(period_id, PeriodStatus.CLOSED)
|
||||
|
||||
return AllocationResult(
|
||||
status=AllocationStatus.NO_ACTIVE_ISSUES,
|
||||
period_id=period_id,
|
||||
total_costs=total_costs,
|
||||
active_issues=[],
|
||||
loss_carried_forward=total_costs,
|
||||
message=f"No active issues found. Carried forward €{total_costs:.2f} to next period"
|
||||
)
|
||||
|
||||
# Step 3: Calculate cost per issue (equal distribution)
|
||||
cost_per_issue = total_costs / len(active_issues)
|
||||
|
||||
# Step 4: Create allocations and transactions
|
||||
allocations_created = 0
|
||||
transactions_created = 0
|
||||
allocation_date = date.today()
|
||||
|
||||
for issue_id in active_issues:
|
||||
# Create allocation record
|
||||
allocation_id = self._create_issue_allocation(
|
||||
issue_id, period_id, cost_per_issue, allocation_date
|
||||
)
|
||||
|
||||
if allocation_id:
|
||||
allocations_created += 1
|
||||
|
||||
# Create audit transaction
|
||||
transaction_id = self.transaction_manager.create_allocation_transaction(
|
||||
period_id=period_id,
|
||||
amount=cost_per_issue,
|
||||
issue_id=issue_id,
|
||||
transaction_date=allocation_date,
|
||||
description=f"Cost allocation for period {period.period_start} to {period.period_end}"
|
||||
)
|
||||
|
||||
if transaction_id:
|
||||
transactions_created += 1
|
||||
# Link transaction to allocation
|
||||
self._update_allocation_transaction_id(allocation_id, transaction_id)
|
||||
|
||||
# Step 5: Update period totals
|
||||
self._update_period_totals(
|
||||
period_id, total_costs, len(active_issues), cost_per_issue, Decimal('0.00')
|
||||
)
|
||||
|
||||
# Step 6: Close the period
|
||||
self._update_period_status(period_id, PeriodStatus.CLOSED)
|
||||
|
||||
return AllocationResult(
|
||||
status=AllocationStatus.SUCCESS,
|
||||
period_id=period_id,
|
||||
total_costs=total_costs,
|
||||
active_issues=active_issues,
|
||||
cost_per_issue=cost_per_issue,
|
||||
allocations_created=allocations_created,
|
||||
transactions_created=transactions_created,
|
||||
message=f"Successfully allocated €{total_costs:.2f} across {len(active_issues)} issues"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
# Reset period status on error
|
||||
self._update_period_status(period_id, PeriodStatus.OPEN)
|
||||
return AllocationResult(
|
||||
status=AllocationStatus.ERROR,
|
||||
period_id=period_id,
|
||||
message=f"Allocation failed: {str(e)}"
|
||||
)
|
||||
|
||||
def get_issue_allocations(self, issue_id: int) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Get all cost allocations for a specific issue.
|
||||
|
||||
Args:
|
||||
issue_id: ID of the issue
|
||||
|
||||
Returns:
|
||||
List of allocation records
|
||||
"""
|
||||
with self.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute('''
|
||||
SELECT
|
||||
ica.id,
|
||||
ica.issue_id,
|
||||
ica.period_id,
|
||||
ica.allocated_amount,
|
||||
ica.allocation_date,
|
||||
ica.transaction_id,
|
||||
cp.period_start,
|
||||
cp.period_end,
|
||||
cp.period_type
|
||||
FROM issue_cost_allocations ica
|
||||
JOIN cost_periods cp ON ica.period_id = cp.id
|
||||
WHERE ica.issue_id = ?
|
||||
ORDER BY ica.allocation_date DESC
|
||||
''', (issue_id,))
|
||||
|
||||
rows = cursor.fetchall()
|
||||
allocations = []
|
||||
|
||||
for row in rows:
|
||||
allocation = {
|
||||
'id': row[0],
|
||||
'issue_id': row[1],
|
||||
'period_id': row[2],
|
||||
'allocated_amount': float(row[3]),
|
||||
'allocation_date': row[4],
|
||||
'transaction_id': row[5],
|
||||
'period_start': row[6],
|
||||
'period_end': row[7],
|
||||
'period_type': row[8]
|
||||
}
|
||||
allocations.append(allocation)
|
||||
|
||||
return allocations
|
||||
|
||||
def get_period_allocations(self, period_id: int) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Get all allocations for a specific period.
|
||||
|
||||
Args:
|
||||
period_id: ID of the period
|
||||
|
||||
Returns:
|
||||
List of allocation records
|
||||
"""
|
||||
with self.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
cursor.execute('''
|
||||
SELECT
|
||||
ica.id,
|
||||
ica.issue_id,
|
||||
ica.allocated_amount,
|
||||
ica.allocation_date,
|
||||
ica.transaction_id
|
||||
FROM issue_cost_allocations ica
|
||||
WHERE ica.period_id = ?
|
||||
ORDER BY ica.issue_id
|
||||
''', (period_id,))
|
||||
|
||||
rows = cursor.fetchall()
|
||||
allocations = []
|
||||
|
||||
for row in rows:
|
||||
allocation = {
|
||||
'id': row[0],
|
||||
'issue_id': row[1],
|
||||
'allocated_amount': float(row[2]),
|
||||
'allocation_date': row[3],
|
||||
'transaction_id': row[4]
|
||||
}
|
||||
allocations.append(allocation)
|
||||
|
||||
return allocations
|
||||
|
||||
def reverse_allocation(self, allocation_id: int) -> bool:
|
||||
"""
|
||||
Reverse a cost allocation (for corrections).
|
||||
|
||||
Args:
|
||||
allocation_id: ID of the allocation to reverse
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
with self.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Get allocation details
|
||||
cursor.execute('''
|
||||
SELECT issue_id, period_id, allocated_amount, transaction_id
|
||||
FROM issue_cost_allocations
|
||||
WHERE id = ?
|
||||
''', (allocation_id,))
|
||||
|
||||
result = cursor.fetchone()
|
||||
if not result:
|
||||
return False
|
||||
|
||||
issue_id, period_id, amount, transaction_id = result
|
||||
|
||||
# Create reversal transaction using adjustment type (allows negative amounts)
|
||||
with self.finance_models.get_connection() as conn2:
|
||||
cursor2 = conn2.cursor()
|
||||
cursor2.execute('''
|
||||
INSERT INTO cost_transactions
|
||||
(period_id, transaction_type, amount_eur, issue_id,
|
||||
transaction_date, description)
|
||||
VALUES (?, 'adjustment', ?, ?, ?, ?)
|
||||
''', (period_id, float(-amount), issue_id, date.today().isoformat(), f"Reversal of allocation #{allocation_id}"))
|
||||
|
||||
reversal_transaction_id = cursor2.lastrowid
|
||||
|
||||
# Only delete if reversal transaction was created successfully
|
||||
if reversal_transaction_id:
|
||||
cursor.execute('DELETE FROM issue_cost_allocations WHERE id = ?', (allocation_id,))
|
||||
return cursor.rowcount > 0
|
||||
else:
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
# Log the exception for debugging in tests
|
||||
print(f"Reversal failed with exception: {e}")
|
||||
return False
|
||||
|
||||
def _get_period(self, period_id: int) -> Optional[Period]:
|
||||
"""Get period details by ID."""
|
||||
period_data = self.period_manager.get_period_by_id(period_id)
|
||||
if not period_data:
|
||||
return None
|
||||
|
||||
# Convert dict to Period object
|
||||
return Period(
|
||||
id=period_data['id'],
|
||||
period_start=datetime.strptime(period_data['period_start'], '%Y-%m-%d').date() if period_data['period_start'] else None,
|
||||
period_end=datetime.strptime(period_data['period_end'], '%Y-%m-%d').date() if period_data['period_end'] else None,
|
||||
period_type=period_data['period_type'],
|
||||
status=period_data['status'],
|
||||
total_costs=Decimal(str(period_data['total_costs'])),
|
||||
active_issues_count=period_data['active_issues_count'],
|
||||
cost_per_issue=Decimal(str(period_data['cost_per_issue'])),
|
||||
loss_carried_forward=Decimal(str(period_data['loss_carried_forward'] or 0))
|
||||
)
|
||||
|
||||
def _update_period_status(self, period_id: int, status: PeriodStatus):
|
||||
"""Update period status."""
|
||||
with self.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
'UPDATE cost_periods SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?',
|
||||
(status.value, period_id)
|
||||
)
|
||||
|
||||
def _calculate_period_total_costs(self, period: Period) -> Decimal:
|
||||
"""Calculate total costs for a period including carried forward amounts."""
|
||||
calculations = self.cost_manager.calculate_period_costs(
|
||||
period.period_start, period.period_end
|
||||
)
|
||||
|
||||
period_costs = calculations['total_period']
|
||||
carried_forward = period.loss_carried_forward or Decimal('0.00')
|
||||
|
||||
return Decimal(str(period_costs)) + carried_forward
|
||||
|
||||
def _get_active_issues_for_period(self, period: Period) -> List[int]:
|
||||
"""Get list of active issue IDs for a period."""
|
||||
activities = self.activity_tracker.get_activities_by_period(
|
||||
period.id,
|
||||
activity_types=[
|
||||
ActivityType.CREATED,
|
||||
ActivityType.MODIFIED,
|
||||
ActivityType.COMMENTED,
|
||||
ActivityType.STATUS_CHANGED
|
||||
]
|
||||
)
|
||||
|
||||
# Get unique issue IDs
|
||||
active_issues = list(set(activity.issue_id for activity in activities))
|
||||
return active_issues
|
||||
|
||||
def _get_or_create_next_period(self, current_period: Period) -> Optional[int]:
|
||||
"""Get or create the next period for loss carry forward."""
|
||||
# For now, return None - next period creation will be handled separately
|
||||
# This is a placeholder for future automatic period creation
|
||||
return None
|
||||
|
||||
def _carry_forward_loss(self, from_period_id: int, to_period_id: int, amount: Decimal):
|
||||
"""Carry forward loss to next period."""
|
||||
# Update the destination period's carried forward amount
|
||||
with self.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('''
|
||||
UPDATE cost_periods
|
||||
SET loss_carried_forward = loss_carried_forward + ?
|
||||
WHERE id = ?
|
||||
''', (float(amount), to_period_id))
|
||||
|
||||
# Create audit transaction
|
||||
self.transaction_manager.create_loss_forward_transaction(
|
||||
from_period_id=from_period_id,
|
||||
to_period_id=to_period_id,
|
||||
amount=amount,
|
||||
transaction_date=date.today(),
|
||||
description=f"Loss carried forward from period {from_period_id}"
|
||||
)
|
||||
|
||||
def _create_issue_allocation(
|
||||
self, issue_id: int, period_id: int, amount: Decimal, allocation_date: date
|
||||
) -> Optional[int]:
|
||||
"""Create an issue cost allocation record."""
|
||||
try:
|
||||
with self.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('''
|
||||
INSERT INTO issue_cost_allocations
|
||||
(issue_id, period_id, allocated_amount, allocation_date)
|
||||
VALUES (?, ?, ?, ?)
|
||||
''', (issue_id, period_id, float(amount), allocation_date.isoformat() if hasattr(allocation_date, 'isoformat') else allocation_date))
|
||||
|
||||
return cursor.lastrowid
|
||||
except sqlite3.IntegrityError:
|
||||
# Allocation already exists for this issue/period
|
||||
return None
|
||||
|
||||
def _update_allocation_transaction_id(self, allocation_id: int, transaction_id: int):
|
||||
"""Link allocation to its audit transaction."""
|
||||
with self.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('''
|
||||
UPDATE issue_cost_allocations
|
||||
SET transaction_id = ?
|
||||
WHERE id = ?
|
||||
''', (transaction_id, allocation_id))
|
||||
|
||||
def _update_period_totals(
|
||||
self,
|
||||
period_id: int,
|
||||
total_costs: Decimal,
|
||||
active_issues_count: int,
|
||||
cost_per_issue: Decimal,
|
||||
loss_carried_forward: Decimal
|
||||
):
|
||||
"""Update period summary statistics."""
|
||||
with self.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('''
|
||||
UPDATE cost_periods
|
||||
SET total_costs = ?,
|
||||
active_issues_count = ?,
|
||||
cost_per_issue = ?,
|
||||
loss_carried_forward = ?,
|
||||
updated_at = CURRENT_TIMESTAMP
|
||||
WHERE id = ?
|
||||
''', (float(total_costs), active_issues_count, float(cost_per_issue), float(loss_carried_forward), period_id))
|
||||
@@ -1,507 +0,0 @@
|
||||
"""
|
||||
Single Command Day Wrap-Up functionality.
|
||||
|
||||
This module provides a comprehensive end-of-day command that consolidates
|
||||
daily work summaries, activity tracking, cost distribution, and reporting
|
||||
into a single convenient command.
|
||||
"""
|
||||
|
||||
import click
|
||||
from datetime import datetime, date, timedelta
|
||||
from typing import Optional, Dict, Any, List
|
||||
from decimal import Decimal
|
||||
from tabulate import tabulate
|
||||
import json
|
||||
|
||||
from .worktime_tracker import WorktimeTracker
|
||||
from ..issues.activity_tracker import IssueActivityTracker
|
||||
from .session_tracker import SessionCostTracker
|
||||
|
||||
|
||||
class DayWrapUpService:
|
||||
"""Service for comprehensive day wrap-up functionality."""
|
||||
|
||||
def __init__(self, db_path: str = "markitect.db"):
|
||||
"""Initialize the day wrap-up service."""
|
||||
self.db_path = db_path
|
||||
self.worktime_tracker = WorktimeTracker(db_path)
|
||||
self.activity_tracker = IssueActivityTracker(db_path)
|
||||
self.session_tracker = SessionCostTracker(db_path)
|
||||
|
||||
def generate_daily_summary(self, target_date: date) -> Dict[str, Any]:
|
||||
"""
|
||||
Generate comprehensive daily summary.
|
||||
|
||||
Args:
|
||||
target_date: Date to generate summary for
|
||||
|
||||
Returns:
|
||||
Dictionary containing complete daily summary
|
||||
"""
|
||||
summary = {
|
||||
'date': target_date,
|
||||
'worktime': self._get_worktime_summary(target_date),
|
||||
'activities': self._get_activity_summary(target_date),
|
||||
'costs': self._get_cost_summary(target_date),
|
||||
'recommendations': []
|
||||
}
|
||||
|
||||
# Add recommendations based on data
|
||||
summary['recommendations'] = self._generate_recommendations(summary)
|
||||
|
||||
return summary
|
||||
|
||||
def _get_worktime_summary(self, target_date: date) -> Dict[str, Any]:
|
||||
"""Get worktime summary for the date."""
|
||||
daily_summary = self.worktime_tracker.get_daily_summary(target_date)
|
||||
|
||||
if not daily_summary:
|
||||
return {
|
||||
'total_minutes': 0,
|
||||
'total_hours': 0.0,
|
||||
'issues_worked': 0,
|
||||
'entries': [],
|
||||
'cost_allocated': None,
|
||||
'cost_per_minute': None
|
||||
}
|
||||
|
||||
# Get issue breakdown
|
||||
issue_breakdown = {}
|
||||
for entry in daily_summary.entries:
|
||||
if entry.issue_id not in issue_breakdown:
|
||||
issue_breakdown[entry.issue_id] = {
|
||||
'minutes': 0,
|
||||
'entries': 0,
|
||||
'descriptions': []
|
||||
}
|
||||
issue_breakdown[entry.issue_id]['minutes'] += entry.duration_minutes
|
||||
issue_breakdown[entry.issue_id]['entries'] += 1
|
||||
if entry.description:
|
||||
issue_breakdown[entry.issue_id]['descriptions'].append(entry.description)
|
||||
|
||||
return {
|
||||
'total_minutes': daily_summary.total_minutes,
|
||||
'total_hours': daily_summary.total_minutes / 60,
|
||||
'issues_worked': daily_summary.issue_count,
|
||||
'entries': len(daily_summary.entries),
|
||||
'issue_breakdown': issue_breakdown,
|
||||
'cost_allocated': float(daily_summary.total_cost_allocated) if daily_summary.total_cost_allocated else None,
|
||||
'cost_per_minute': float(daily_summary.cost_per_minute) if daily_summary.cost_per_minute else None
|
||||
}
|
||||
|
||||
def _get_activity_summary(self, target_date: date) -> Dict[str, Any]:
|
||||
"""Get activity summary for the date."""
|
||||
summary = self.activity_tracker.get_activity_summary(
|
||||
start_date=target_date,
|
||||
end_date=target_date
|
||||
)
|
||||
|
||||
# Get detailed activities for the day
|
||||
activities = []
|
||||
if summary['total_activities'] > 0:
|
||||
# Get activities by checking each issue that had activity
|
||||
with self.activity_tracker.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('''
|
||||
SELECT issue_id, activity_type, activity_details, created_at
|
||||
FROM issue_activity_log
|
||||
WHERE activity_date = ?
|
||||
ORDER BY created_at DESC
|
||||
''', (target_date.isoformat() if hasattr(target_date, 'isoformat') else target_date,))
|
||||
|
||||
for row in cursor.fetchall():
|
||||
activities.append({
|
||||
'issue_id': row[0],
|
||||
'activity_type': row[1],
|
||||
'details': row[2],
|
||||
'created_at': row[3]
|
||||
})
|
||||
|
||||
return {
|
||||
'total_activities': summary['total_activities'],
|
||||
'unique_issues': summary['unique_issues'],
|
||||
'activities_by_type': summary['activities_by_type'],
|
||||
'activities': activities
|
||||
}
|
||||
|
||||
def _get_cost_summary(self, target_date: date) -> Dict[str, Any]:
|
||||
"""Get cost summary for the date."""
|
||||
# Get session costs from cost notes for the day
|
||||
cost_summary = self.session_tracker.get_issue_costs_summary()
|
||||
|
||||
# Filter for today's costs (this is approximate - would need better filtering in real implementation)
|
||||
daily_costs = 0.0
|
||||
issue_costs = {}
|
||||
|
||||
# Get worktime cost distribution if available
|
||||
with self.worktime_tracker.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('''
|
||||
SELECT issue_id, cost_allocated
|
||||
FROM worktime_cost_distributions
|
||||
WHERE work_date = ?
|
||||
''', (target_date.isoformat() if hasattr(target_date, 'isoformat') else target_date,))
|
||||
|
||||
for row in cursor.fetchall():
|
||||
issue_id, cost = row
|
||||
issue_costs[issue_id] = cost
|
||||
daily_costs += cost
|
||||
|
||||
return {
|
||||
'daily_total': daily_costs,
|
||||
'issue_costs': issue_costs,
|
||||
'has_cost_allocation': len(issue_costs) > 0
|
||||
}
|
||||
|
||||
def _generate_recommendations(self, summary: Dict[str, Any]) -> List[str]:
|
||||
"""Generate recommendations based on daily summary."""
|
||||
recommendations = []
|
||||
|
||||
# Worktime recommendations
|
||||
worktime = summary['worktime']
|
||||
if worktime['total_minutes'] == 0:
|
||||
recommendations.append("⚠️ No worktime logged for today. Consider logging time spent on issues.")
|
||||
elif worktime['total_hours'] < 4:
|
||||
recommendations.append("⏰ Low worktime logged today. Is this accurate or should more time be added?")
|
||||
elif worktime['total_hours'] > 10:
|
||||
recommendations.append("🔥 High worktime logged today. Make sure to take breaks!")
|
||||
|
||||
# Activity recommendations
|
||||
activities = summary['activities']
|
||||
if activities['total_activities'] == 0:
|
||||
recommendations.append("📝 No issue activities logged today. Consider what issues you worked on.")
|
||||
elif activities['unique_issues'] > 5:
|
||||
recommendations.append("🤹 Many issues worked on today. Consider focusing on fewer issues for better productivity.")
|
||||
|
||||
# Cost recommendations
|
||||
costs = summary['costs']
|
||||
if worktime['total_minutes'] > 0 and not costs['has_cost_allocation']:
|
||||
recommendations.append("💰 Time logged but no costs distributed. Run cost distribution to allocate daily expenses.")
|
||||
|
||||
return recommendations
|
||||
|
||||
def perform_auto_estimation(self, target_date: date, total_hours: float = 8.0) -> Dict[str, Any]:
|
||||
"""
|
||||
Perform automatic worktime estimation if no time is logged.
|
||||
|
||||
Args:
|
||||
target_date: Date to estimate for
|
||||
total_hours: Total hours to distribute
|
||||
|
||||
Returns:
|
||||
Estimation results
|
||||
"""
|
||||
# Check if any time is already logged
|
||||
summary = self.worktime_tracker.get_daily_summary(target_date)
|
||||
if summary and summary.total_minutes > 0:
|
||||
return {
|
||||
'estimated': False,
|
||||
'reason': 'Time already logged for this date',
|
||||
'existing_minutes': summary.total_minutes
|
||||
}
|
||||
|
||||
# Get active issues for the day from activity log
|
||||
with self.activity_tracker.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('''
|
||||
SELECT DISTINCT issue_id
|
||||
FROM issue_activity_log
|
||||
WHERE activity_date = ?
|
||||
''', (target_date.isoformat() if hasattr(target_date, 'isoformat') else target_date,))
|
||||
active_issues = [row[0] for row in cursor.fetchall()]
|
||||
|
||||
if not active_issues:
|
||||
return {
|
||||
'estimated': False,
|
||||
'reason': 'No active issues found for this date',
|
||||
'active_issues': []
|
||||
}
|
||||
|
||||
# Perform estimation
|
||||
estimation_result = self.worktime_tracker.estimate_daily_worktime(
|
||||
work_date=target_date,
|
||||
total_hours=total_hours,
|
||||
issues=active_issues,
|
||||
distribution_method="activity_based"
|
||||
)
|
||||
|
||||
return {
|
||||
'estimated': True,
|
||||
'estimation_result': estimation_result
|
||||
}
|
||||
|
||||
def distribute_daily_costs(self, target_date: date, daily_cost: Decimal) -> Dict[str, Any]:
|
||||
"""
|
||||
Distribute daily costs based on worktime allocation.
|
||||
|
||||
Args:
|
||||
target_date: Date to distribute costs for
|
||||
daily_cost: Total daily cost to distribute
|
||||
|
||||
Returns:
|
||||
Distribution results
|
||||
"""
|
||||
return self.worktime_tracker.distribute_daily_costs(
|
||||
work_date=target_date,
|
||||
total_daily_cost=daily_cost
|
||||
)
|
||||
|
||||
|
||||
@click.group()
|
||||
def wrapup():
|
||||
"""Day wrap-up commands for end-of-day summaries and automation."""
|
||||
pass
|
||||
|
||||
|
||||
@wrapup.command()
|
||||
@click.argument('date', type=click.DateTime(formats=['%Y-%m-%d']), required=False)
|
||||
@click.option('--auto-estimate', is_flag=True,
|
||||
help='Automatically estimate worktime if none logged')
|
||||
@click.option('--estimate-hours', type=float, default=8.0,
|
||||
help='Hours to estimate (used with --auto-estimate)')
|
||||
@click.option('--distribute-cost', type=float,
|
||||
help='Daily cost to distribute (€)')
|
||||
@click.option('--format', 'output_format', type=click.Choice(['summary', 'detailed', 'json']),
|
||||
default='summary', help='Output format')
|
||||
def daily(date: Optional[datetime], auto_estimate: bool, estimate_hours: float,
|
||||
distribute_cost: Optional[float], output_format: str):
|
||||
"""Generate comprehensive daily wrap-up summary.
|
||||
|
||||
If no date is provided, uses today's date.
|
||||
"""
|
||||
from datetime import date as date_module
|
||||
target_date = date.date() if date else date_module.today()
|
||||
service = DayWrapUpService()
|
||||
|
||||
try:
|
||||
# Auto-estimate worktime if requested
|
||||
if auto_estimate:
|
||||
click.echo(f"🤖 Auto-estimating worktime for {target_date}...")
|
||||
estimation = service.perform_auto_estimation(target_date, estimate_hours)
|
||||
|
||||
if estimation['estimated']:
|
||||
result = estimation['estimation_result']
|
||||
click.echo(f"✅ Estimated {estimate_hours}h across {result['issues_count']} issues")
|
||||
else:
|
||||
click.echo(f"ℹ️ {estimation['reason']}")
|
||||
|
||||
# Distribute costs if requested
|
||||
if distribute_cost:
|
||||
click.echo(f"💰 Distributing €{distribute_cost:.2f} for {target_date}...")
|
||||
distribution = service.distribute_daily_costs(target_date, Decimal(str(distribute_cost)))
|
||||
|
||||
if 'message' in distribution:
|
||||
click.echo(f"⚠️ {distribution['message']}")
|
||||
else:
|
||||
click.echo(f"✅ Distributed €{distribute_cost:.2f} across {distribution['issues_count']} issues")
|
||||
|
||||
# Generate summary
|
||||
summary = service.generate_daily_summary(target_date)
|
||||
|
||||
if output_format == 'json':
|
||||
# Convert date to string for JSON serialization
|
||||
summary['date'] = summary['date'].isoformat()
|
||||
click.echo(json.dumps(summary, indent=2))
|
||||
return
|
||||
|
||||
# Display summary
|
||||
_display_daily_summary(summary, output_format)
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"❌ Error generating daily wrap-up: {e}", err=True)
|
||||
raise click.Abort()
|
||||
|
||||
|
||||
@wrapup.command()
|
||||
@click.argument('start_date', type=click.DateTime(formats=['%Y-%m-%d']))
|
||||
@click.argument('end_date', type=click.DateTime(formats=['%Y-%m-%d']))
|
||||
@click.option('--format', 'output_format', type=click.Choice(['summary', 'json']),
|
||||
default='summary', help='Output format')
|
||||
def period(start_date: datetime, end_date: datetime, output_format: str):
|
||||
"""Generate wrap-up summary for a date range."""
|
||||
service = DayWrapUpService()
|
||||
|
||||
try:
|
||||
# Get worktime report for period
|
||||
worktime_report = service.worktime_tracker.get_worktime_report(
|
||||
start_date=start_date.date(),
|
||||
end_date=end_date.date()
|
||||
)
|
||||
|
||||
# Get activity summary for period
|
||||
activity_summary = service.activity_tracker.get_activity_summary(
|
||||
start_date=start_date.date(),
|
||||
end_date=end_date.date()
|
||||
)
|
||||
|
||||
period_summary = {
|
||||
'period': f"{start_date.date()} to {end_date.date()}",
|
||||
'worktime': worktime_report,
|
||||
'activities': activity_summary
|
||||
}
|
||||
|
||||
if output_format == 'json':
|
||||
click.echo(json.dumps(period_summary, indent=2))
|
||||
else:
|
||||
_display_period_summary(period_summary)
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"❌ Error generating period wrap-up: {e}", err=True)
|
||||
raise click.Abort()
|
||||
|
||||
|
||||
@wrapup.command()
|
||||
@click.argument('date', type=click.DateTime(formats=['%Y-%m-%d']), required=False)
|
||||
@click.option('--hours', type=float, default=8.0, help='Total hours worked')
|
||||
@click.option('--method', type=click.Choice(['equal', 'activity_based']),
|
||||
default='activity_based', help='Estimation method')
|
||||
def estimate(date: Optional[datetime], hours: float, method: str):
|
||||
"""Estimate and log worktime for a day based on issue activities."""
|
||||
from datetime import date as date_module
|
||||
target_date = date.date() if date else date_module.today()
|
||||
service = DayWrapUpService()
|
||||
|
||||
try:
|
||||
estimation = service.perform_auto_estimation(target_date, hours)
|
||||
|
||||
if not estimation['estimated']:
|
||||
click.echo(f"⚠️ {estimation['reason']}")
|
||||
return
|
||||
|
||||
result = estimation['estimation_result']
|
||||
click.echo(f"✅ Estimated worktime for {target_date}")
|
||||
click.echo(f"Total Hours: {hours}h")
|
||||
click.echo(f"Distribution Method: {method}")
|
||||
click.echo(f"Issues: {result['issues_count']}")
|
||||
|
||||
# Show breakdown
|
||||
headers = ['Issue', 'Time', 'Percentage']
|
||||
rows = []
|
||||
total_minutes = result['total_minutes']
|
||||
|
||||
for issue_id, minutes in result['issue_estimates'].items():
|
||||
percentage = (minutes / total_minutes) * 100
|
||||
hours_mins = f"{minutes//60}h{minutes%60}m" if minutes >= 60 else f"{minutes}m"
|
||||
rows.append([f"#{issue_id}", hours_mins, f"{percentage:.1f}%"])
|
||||
|
||||
click.echo("\nEstimated Time Distribution:")
|
||||
click.echo(tabulate(rows, headers=headers, tablefmt='grid'))
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"❌ Error estimating worktime: {e}", err=True)
|
||||
raise click.Abort()
|
||||
|
||||
|
||||
def _display_daily_summary(summary: Dict[str, Any], format_type: str):
|
||||
"""Display daily summary in formatted output."""
|
||||
date_str = summary['date']
|
||||
worktime = summary['worktime']
|
||||
activities = summary['activities']
|
||||
costs = summary['costs']
|
||||
recommendations = summary['recommendations']
|
||||
|
||||
click.echo(f"\n📊 Daily Wrap-Up for {date_str}")
|
||||
click.echo("=" * 50)
|
||||
|
||||
# Worktime section
|
||||
click.echo(f"\n⏰ WORKTIME SUMMARY")
|
||||
if worktime['total_minutes'] > 0:
|
||||
hours = int(worktime['total_hours'])
|
||||
minutes = int((worktime['total_hours'] - hours) * 60)
|
||||
click.echo(f"Total Time: {hours}h {minutes}m ({worktime['total_minutes']} minutes)")
|
||||
click.echo(f"Issues Worked: {worktime['issues_worked']}")
|
||||
click.echo(f"Time Entries: {worktime['entries']}")
|
||||
|
||||
if worktime['cost_allocated']:
|
||||
click.echo(f"Cost Allocated: €{worktime['cost_allocated']:.2f}")
|
||||
click.echo(f"Cost per Minute: €{worktime['cost_per_minute']:.4f}")
|
||||
|
||||
if format_type == 'detailed' and worktime['issue_breakdown']:
|
||||
click.echo("\nTime by Issue:")
|
||||
headers = ['Issue', 'Time', 'Entries', 'Percentage']
|
||||
rows = []
|
||||
|
||||
for issue_id, data in worktime['issue_breakdown'].items():
|
||||
percentage = (data['minutes'] / worktime['total_minutes']) * 100
|
||||
time_str = f"{data['minutes']//60}h{data['minutes']%60}m" if data['minutes'] >= 60 else f"{data['minutes']}m"
|
||||
rows.append([f"#{issue_id}", time_str, data['entries'], f"{percentage:.1f}%"])
|
||||
|
||||
click.echo(tabulate(rows, headers=headers, tablefmt='grid'))
|
||||
else:
|
||||
click.echo("No worktime logged today")
|
||||
|
||||
# Activities section
|
||||
click.echo(f"\n📝 ACTIVITIES SUMMARY")
|
||||
if activities['total_activities'] > 0:
|
||||
click.echo(f"Total Activities: {activities['total_activities']}")
|
||||
click.echo(f"Issues with Activity: {activities['unique_issues']}")
|
||||
|
||||
if activities['activities_by_type']:
|
||||
click.echo("\nActivity Breakdown:")
|
||||
for activity_type, count in activities['activities_by_type'].items():
|
||||
click.echo(f" {activity_type.title()}: {count}")
|
||||
|
||||
if format_type == 'detailed' and activities['activities']:
|
||||
click.echo("\nRecent Activities:")
|
||||
for activity in activities['activities'][:5]: # Show last 5
|
||||
details = f" - {activity['details']}" if activity['details'] else ""
|
||||
click.echo(f" #{activity['issue_id']}: {activity['activity_type']}{details}")
|
||||
else:
|
||||
click.echo("No activities logged today")
|
||||
|
||||
# Costs section
|
||||
click.echo(f"\n💰 COST SUMMARY")
|
||||
if costs['has_cost_allocation']:
|
||||
click.echo(f"Daily Total: €{costs['daily_total']:.2f}")
|
||||
click.echo("Cost Allocation:")
|
||||
for issue_id, cost in costs['issue_costs'].items():
|
||||
click.echo(f" Issue #{issue_id}: €{cost:.2f}")
|
||||
else:
|
||||
click.echo("No cost allocation for today")
|
||||
|
||||
# Recommendations section
|
||||
if recommendations:
|
||||
click.echo(f"\n💡 RECOMMENDATIONS")
|
||||
for rec in recommendations:
|
||||
click.echo(f" {rec}")
|
||||
|
||||
click.echo()
|
||||
|
||||
|
||||
def _display_period_summary(summary: Dict[str, Any]):
|
||||
"""Display period summary in formatted output."""
|
||||
click.echo(f"\n📈 Period Wrap-Up: {summary['period']}")
|
||||
click.echo("=" * 60)
|
||||
|
||||
worktime = summary['worktime']
|
||||
activities = summary['activities']
|
||||
|
||||
# Worktime summary
|
||||
click.echo(f"\n⏰ WORKTIME OVERVIEW")
|
||||
click.echo(f"Total Time: {worktime['total_time']['hours']}h {worktime['total_time']['minutes']}m")
|
||||
click.echo(f"Total Entries: {worktime['total_entries']}")
|
||||
click.echo(f"Unique Issues: {worktime['unique_issues']}")
|
||||
click.echo(f"Unique Dates: {worktime['unique_dates']}")
|
||||
|
||||
if worktime['unique_dates'] > 0:
|
||||
avg_minutes = worktime['average_minutes_per_day']
|
||||
avg_hours = int(avg_minutes // 60)
|
||||
avg_mins = int(avg_minutes % 60)
|
||||
click.echo(f"Average per Day: {avg_hours}h {avg_mins}m")
|
||||
|
||||
# Activities summary
|
||||
click.echo(f"\n📝 ACTIVITIES OVERVIEW")
|
||||
click.echo(f"Total Activities: {activities['total_activities']}")
|
||||
click.echo(f"Unique Issues: {activities['unique_issues']}")
|
||||
|
||||
if activities['activities_by_type']:
|
||||
click.echo("\nActivity Types:")
|
||||
for activity_type, count in activities['activities_by_type'].items():
|
||||
percentage = (count / activities['total_activities']) * 100
|
||||
click.echo(f" {activity_type.title()}: {count} ({percentage:.1f}%)")
|
||||
|
||||
click.echo()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
wrapup()
|
||||
@@ -1,7 +0,0 @@
|
||||
"""
|
||||
Issue management module for MarkiTect.
|
||||
|
||||
Provides unified CLI interface for issue management with pluggable backend support.
|
||||
"""
|
||||
|
||||
__version__ = "1.0.0"
|
||||
@@ -1,271 +0,0 @@
|
||||
"""
|
||||
CLI commands for issue activity tracking.
|
||||
|
||||
This module provides command-line interface for logging, viewing, and managing
|
||||
issue activities for cost allocation and project management purposes.
|
||||
"""
|
||||
|
||||
import click
|
||||
from datetime import datetime, date
|
||||
from typing import List, Optional
|
||||
from tabulate import tabulate
|
||||
|
||||
from markitect.issues.activity_tracker import IssueActivityTracker, ActivityType, IssueActivity
|
||||
|
||||
|
||||
@click.group()
|
||||
def activity():
|
||||
"""Issue activity tracking commands."""
|
||||
pass
|
||||
|
||||
|
||||
@activity.command()
|
||||
@click.argument('issue_id', type=int)
|
||||
@click.argument('activity_type', type=click.Choice([at.value for at in ActivityType]))
|
||||
@click.option('--date', '-d', type=click.DateTime(formats=['%Y-%m-%d']),
|
||||
help='Activity date (defaults to today)')
|
||||
@click.option('--details', '-m', help='Activity details/message')
|
||||
@click.option('--period-id', type=int, help='Cost period ID for allocation')
|
||||
def log(issue_id: int, activity_type: str, date: Optional[datetime],
|
||||
details: Optional[str], period_id: Optional[int]):
|
||||
"""Log an activity for an issue."""
|
||||
tracker = IssueActivityTracker()
|
||||
|
||||
activity_date = date.date() if date else None
|
||||
activity_enum = ActivityType(activity_type)
|
||||
|
||||
try:
|
||||
activity_id = tracker.log_activity(
|
||||
issue_id=issue_id,
|
||||
activity_type=activity_enum,
|
||||
activity_date=activity_date,
|
||||
activity_details=details,
|
||||
period_id=period_id
|
||||
)
|
||||
|
||||
click.echo(f"✅ Logged {activity_type} activity for issue #{issue_id} (ID: {activity_id})")
|
||||
|
||||
if details:
|
||||
click.echo(f" Details: {details}")
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"❌ Error logging activity: {e}", err=True)
|
||||
raise click.Abort()
|
||||
|
||||
|
||||
@activity.command()
|
||||
@click.argument('issue_id', type=int)
|
||||
@click.option('--limit', '-l', type=int, default=20, help='Maximum number of activities to show')
|
||||
@click.option('--offset', type=int, default=0, help='Number of activities to skip')
|
||||
@click.option('--format', 'output_format', type=click.Choice(['table', 'json']),
|
||||
default='table', help='Output format')
|
||||
def show(issue_id: int, limit: int, offset: int, output_format: str):
|
||||
"""Show activities for a specific issue."""
|
||||
tracker = IssueActivityTracker()
|
||||
|
||||
try:
|
||||
activities = tracker.get_issue_activities(issue_id, limit=limit, offset=offset)
|
||||
|
||||
if not activities:
|
||||
click.echo(f"📝 No activities found for issue #{issue_id}")
|
||||
return
|
||||
|
||||
if output_format == 'json':
|
||||
import json
|
||||
activity_data = [activity.to_dict() for activity in activities]
|
||||
click.echo(json.dumps(activity_data, indent=2))
|
||||
|
||||
else:
|
||||
# Table format
|
||||
click.echo(f"\n📋 Activities for Issue #{issue_id}\n")
|
||||
|
||||
headers = ['ID', 'Type', 'Date', 'Period', 'Details', 'Logged']
|
||||
rows = []
|
||||
|
||||
for activity in activities:
|
||||
rows.append([
|
||||
activity.id,
|
||||
activity.activity_type_display,
|
||||
activity.formatted_date,
|
||||
activity.period_id or 'N/A',
|
||||
activity.truncated_details,
|
||||
activity.formatted_datetime
|
||||
])
|
||||
|
||||
click.echo(tabulate(rows, headers=headers, tablefmt='grid'))
|
||||
|
||||
if len(activities) == limit:
|
||||
click.echo(f"\n💡 Showing {limit} most recent activities. Use --limit and --offset for pagination.")
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"❌ Error retrieving activities: {e}", err=True)
|
||||
raise click.Abort()
|
||||
|
||||
|
||||
@activity.command()
|
||||
@click.option('--period-id', type=int, help='Filter by cost period ID')
|
||||
@click.option('--activity-type', type=click.Choice([at.value for at in ActivityType]),
|
||||
multiple=True, help='Filter by activity types (can specify multiple)')
|
||||
@click.option('--format', 'output_format', type=click.Choice(['table', 'json']),
|
||||
default='table', help='Output format')
|
||||
def list(period_id: Optional[int], activity_type: List[str], output_format: str):
|
||||
"""List activities across issues."""
|
||||
tracker = IssueActivityTracker()
|
||||
|
||||
try:
|
||||
if period_id:
|
||||
activity_types = [ActivityType(at) for at in activity_type] if activity_type else None
|
||||
activities = tracker.get_activities_by_period(period_id, activity_types)
|
||||
title = f"Activities for Period #{period_id}"
|
||||
else:
|
||||
# For now, show recent activities across all issues (could be enhanced)
|
||||
click.echo("❌ Currently only period-based listing is supported. Use --period-id option.")
|
||||
return
|
||||
|
||||
if not activities:
|
||||
click.echo(f"📝 No activities found for the specified criteria")
|
||||
return
|
||||
|
||||
if output_format == 'json':
|
||||
import json
|
||||
activity_data = [activity.to_dict() for activity in activities]
|
||||
click.echo(json.dumps(activity_data, indent=2))
|
||||
|
||||
else:
|
||||
# Table format
|
||||
click.echo(f"\n📊 {title}\n")
|
||||
|
||||
headers = ['ID', 'Issue', 'Type', 'Date', 'Details']
|
||||
rows = []
|
||||
|
||||
for activity in activities:
|
||||
rows.append([
|
||||
activity.id,
|
||||
f"#{activity.issue_id}",
|
||||
activity.activity_type.value.title(),
|
||||
activity.activity_date.strftime('%Y-%m-%d') if activity.activity_date else 'N/A',
|
||||
(activity.activity_details[:50] + '...') if activity.activity_details and len(activity.activity_details) > 50 else (activity.activity_details or '')
|
||||
])
|
||||
|
||||
click.echo(tabulate(rows, headers=headers, tablefmt='grid'))
|
||||
click.echo(f"\n📈 Total: {len(activities)} activities")
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"❌ Error retrieving activities: {e}", err=True)
|
||||
raise click.Abort()
|
||||
|
||||
|
||||
@activity.command()
|
||||
@click.option('--issue-id', type=int, help='Filter by specific issue ID')
|
||||
@click.option('--start-date', type=click.DateTime(formats=['%Y-%m-%d']),
|
||||
help='Start date for summary period')
|
||||
@click.option('--end-date', type=click.DateTime(formats=['%Y-%m-%d']),
|
||||
help='End date for summary period')
|
||||
def summary(issue_id: Optional[int], start_date: Optional[datetime],
|
||||
end_date: Optional[datetime]):
|
||||
"""Show activity summary statistics."""
|
||||
tracker = IssueActivityTracker()
|
||||
|
||||
try:
|
||||
summary_data = tracker.get_activity_summary(
|
||||
issue_id=issue_id,
|
||||
start_date=start_date.date() if start_date else None,
|
||||
end_date=end_date.date() if end_date else None
|
||||
)
|
||||
|
||||
click.echo("\n📊 Issue Activity Summary\n")
|
||||
|
||||
# Basic stats
|
||||
click.echo(f"Total Activities: {summary_data['total_activities']}")
|
||||
click.echo(f"Unique Issues: {summary_data['unique_issues']}")
|
||||
|
||||
# Date range
|
||||
date_range = summary_data['date_range']
|
||||
if date_range['start'] and date_range['end']:
|
||||
click.echo(f"Date Range: {date_range['start']} to {date_range['end']}")
|
||||
elif date_range['start']:
|
||||
click.echo(f"Since: {date_range['start']}")
|
||||
|
||||
# Activity breakdown
|
||||
if summary_data['activities_by_type']:
|
||||
click.echo("\nActivity Breakdown:")
|
||||
for activity_type, count in summary_data['activities_by_type'].items():
|
||||
percentage = (count / summary_data['total_activities']) * 100
|
||||
click.echo(f" {activity_type.title()}: {count} ({percentage:.1f}%)")
|
||||
|
||||
# Filters applied
|
||||
filters = summary_data['filters']
|
||||
applied_filters = []
|
||||
if filters['issue_id']:
|
||||
applied_filters.append(f"Issue #{filters['issue_id']}")
|
||||
if filters['start_date']:
|
||||
applied_filters.append(f"From {filters['start_date']}")
|
||||
if filters['end_date']:
|
||||
applied_filters.append(f"Until {filters['end_date']}")
|
||||
|
||||
if applied_filters:
|
||||
click.echo(f"\nFilters Applied: {', '.join(applied_filters)}")
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"❌ Error generating summary: {e}", err=True)
|
||||
raise click.Abort()
|
||||
|
||||
|
||||
@activity.command()
|
||||
@click.argument('activity_id', type=int)
|
||||
@click.confirmation_option(prompt='Are you sure you want to delete this activity?')
|
||||
def delete(activity_id: int):
|
||||
"""Delete an activity record."""
|
||||
tracker = IssueActivityTracker()
|
||||
|
||||
try:
|
||||
if tracker.delete_activity(activity_id):
|
||||
click.echo(f"✅ Deleted activity #{activity_id}")
|
||||
else:
|
||||
click.echo(f"❌ Activity #{activity_id} not found")
|
||||
raise click.Abort()
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"❌ Error deleting activity: {e}", err=True)
|
||||
raise click.Abort()
|
||||
|
||||
|
||||
@activity.command()
|
||||
@click.argument('file_path', type=click.Path(exists=True))
|
||||
@click.option('--format', 'input_format', type=click.Choice(['json', 'csv']),
|
||||
default='json', help='Input file format')
|
||||
def import_activities(file_path: str, input_format: str):
|
||||
"""Import activities from a file."""
|
||||
tracker = IssueActivityTracker()
|
||||
|
||||
try:
|
||||
if input_format == 'json':
|
||||
import json
|
||||
with open(file_path, 'r') as f:
|
||||
activities = json.load(f)
|
||||
|
||||
elif input_format == 'csv':
|
||||
import csv
|
||||
activities = []
|
||||
with open(file_path, 'r') as f:
|
||||
reader = csv.DictReader(f)
|
||||
for row in reader:
|
||||
activity = {
|
||||
'issue_id': int(row['issue_id']),
|
||||
'activity_type': row['activity_type'],
|
||||
'activity_date': datetime.strptime(row['activity_date'], '%Y-%m-%d').date() if row.get('activity_date') else None,
|
||||
'activity_details': row.get('activity_details'),
|
||||
'period_id': int(row['period_id']) if row.get('period_id') else None
|
||||
}
|
||||
activities.append(activity)
|
||||
|
||||
activity_ids = tracker.bulk_log_activities(activities)
|
||||
click.echo(f"✅ Successfully imported {len(activity_ids)} activities")
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"❌ Error importing activities: {e}", err=True)
|
||||
raise click.Abort()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
activity()
|
||||
@@ -1,417 +0,0 @@
|
||||
"""
|
||||
Issue Activity Tracking Service
|
||||
|
||||
This module provides comprehensive issue activity tracking functionality,
|
||||
building on the existing database infrastructure to log and retrieve
|
||||
issue activities for cost allocation and project management.
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
from datetime import datetime, date
|
||||
from typing import List, Dict, Any, Optional
|
||||
from dataclasses import dataclass, asdict
|
||||
from enum import Enum
|
||||
|
||||
from markitect.finance.models import FinanceModels
|
||||
|
||||
|
||||
class ActivityType(Enum):
|
||||
"""Enumeration of supported issue activity types."""
|
||||
CREATED = "created"
|
||||
MODIFIED = "modified"
|
||||
CLOSED = "closed"
|
||||
REOPENED = "reopened"
|
||||
COMMENTED = "commented"
|
||||
STATUS_CHANGED = "status_changed"
|
||||
|
||||
|
||||
@dataclass
|
||||
class IssueActivity:
|
||||
"""Data class representing an issue activity record with convenient methods."""
|
||||
id: Optional[int] = None
|
||||
issue_id: int = None
|
||||
activity_type: ActivityType = None
|
||||
activity_date: date = None
|
||||
period_id: Optional[int] = None
|
||||
activity_details: Optional[str] = None
|
||||
created_at: Optional[datetime] = None
|
||||
|
||||
@property
|
||||
def activity_type_value(self) -> str:
|
||||
"""Get the string value of the activity type."""
|
||||
return self.activity_type.value if self.activity_type else ''
|
||||
|
||||
@property
|
||||
def activity_type_display(self) -> str:
|
||||
"""Get the display-friendly activity type."""
|
||||
return self.activity_type_value.replace('_', ' ').title()
|
||||
|
||||
@property
|
||||
def formatted_date(self) -> str:
|
||||
"""Get formatted activity date string."""
|
||||
return self.activity_date.strftime('%Y-%m-%d') if self.activity_date else 'N/A'
|
||||
|
||||
@property
|
||||
def formatted_datetime(self) -> str:
|
||||
"""Get formatted created datetime string."""
|
||||
return self.created_at.strftime('%Y-%m-%d %H:%M') if self.created_at else 'N/A'
|
||||
|
||||
@property
|
||||
def truncated_details(self) -> str:
|
||||
"""Get truncated activity details for display (max 40 chars)."""
|
||||
if not self.activity_details:
|
||||
return ''
|
||||
return (self.activity_details[:40] + '...') if len(self.activity_details) > 40 else self.activity_details
|
||||
|
||||
def contains_keyword(self, keyword: str, case_sensitive: bool = False) -> bool:
|
||||
"""Check if activity contains a keyword in type or details."""
|
||||
search_text = f"{self.activity_type_value} {self.activity_details or ''}".strip()
|
||||
if not case_sensitive:
|
||||
search_text = search_text.lower()
|
||||
keyword = keyword.lower()
|
||||
return keyword in search_text
|
||||
|
||||
def has_implementation_activity(self) -> bool:
|
||||
"""Check if this activity indicates implementation work."""
|
||||
return (self.contains_keyword('implement') or
|
||||
self.contains_keyword('code') or
|
||||
self.contains_keyword('develop'))
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary representation."""
|
||||
return {
|
||||
'id': self.id,
|
||||
'issue_id': self.issue_id,
|
||||
'activity_type': self.activity_type_value,
|
||||
'activity_date': self.activity_date.isoformat() if self.activity_date else None,
|
||||
'period_id': self.period_id,
|
||||
'activity_details': self.activity_details,
|
||||
'created_at': self.created_at.isoformat() if self.created_at else None
|
||||
}
|
||||
|
||||
|
||||
class IssueActivityTracker:
|
||||
"""
|
||||
Service for tracking and managing issue activities.
|
||||
|
||||
Provides functionality to log issue activities, retrieve activity history,
|
||||
and generate activity reports for cost allocation and project management.
|
||||
"""
|
||||
|
||||
def __init__(self, db_path: str = "markitect.db"):
|
||||
"""
|
||||
Initialize the issue activity tracker.
|
||||
|
||||
Args:
|
||||
db_path: Path to the SQLite database file
|
||||
"""
|
||||
self.db_path = db_path
|
||||
self.finance_models = FinanceModels(db_path)
|
||||
self._ensure_schema()
|
||||
|
||||
def _ensure_schema(self):
|
||||
"""Ensure the database schema is properly initialized."""
|
||||
self.finance_models.initialize_finance_schema()
|
||||
|
||||
def log_activity(
|
||||
self,
|
||||
issue_id: int,
|
||||
activity_type: ActivityType,
|
||||
activity_date: Optional[date] = None,
|
||||
activity_details: Optional[str] = None,
|
||||
period_id: Optional[int] = None
|
||||
) -> int:
|
||||
"""
|
||||
Log an issue activity.
|
||||
|
||||
Args:
|
||||
issue_id: ID of the issue
|
||||
activity_type: Type of activity performed
|
||||
activity_date: Date when activity occurred (defaults to today)
|
||||
activity_details: Additional details about the activity
|
||||
period_id: Optional period ID for cost allocation
|
||||
|
||||
Returns:
|
||||
ID of the created activity record
|
||||
|
||||
Raises:
|
||||
sqlite3.Error: If database operation fails
|
||||
"""
|
||||
if activity_date is None:
|
||||
activity_date = date.today()
|
||||
|
||||
with self.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
# If period_id is not provided, try to get the current period
|
||||
if period_id is None:
|
||||
cursor.execute('''
|
||||
SELECT id FROM cost_periods
|
||||
WHERE ? BETWEEN period_start AND period_end
|
||||
ORDER BY created_at DESC LIMIT 1
|
||||
''', (activity_date.isoformat(),))
|
||||
result = cursor.fetchone()
|
||||
if result:
|
||||
period_id = result[0]
|
||||
|
||||
cursor.execute('''
|
||||
INSERT INTO issue_activity_log
|
||||
(issue_id, activity_type, activity_date, period_id, activity_details)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
''', (issue_id, activity_type.value, activity_date.isoformat(), period_id, activity_details))
|
||||
|
||||
return cursor.lastrowid
|
||||
|
||||
def get_issue_activities(
|
||||
self,
|
||||
issue_id: int,
|
||||
limit: Optional[int] = None,
|
||||
offset: int = 0
|
||||
) -> List[IssueActivity]:
|
||||
"""
|
||||
Get activities for a specific issue.
|
||||
|
||||
Args:
|
||||
issue_id: ID of the issue
|
||||
limit: Maximum number of activities to return
|
||||
offset: Number of activities to skip
|
||||
|
||||
Returns:
|
||||
List of issue activities, ordered by activity date (most recent first)
|
||||
"""
|
||||
with self.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
query = '''
|
||||
SELECT id, issue_id, activity_type, activity_date,
|
||||
period_id, activity_details, created_at
|
||||
FROM issue_activity_log
|
||||
WHERE issue_id = ?
|
||||
ORDER BY activity_date DESC, created_at DESC
|
||||
'''
|
||||
params = [issue_id]
|
||||
|
||||
if limit:
|
||||
query += ' LIMIT ? OFFSET ?'
|
||||
params.extend([limit, offset])
|
||||
|
||||
cursor.execute(query, params)
|
||||
rows = cursor.fetchall()
|
||||
|
||||
activities = []
|
||||
for row in rows:
|
||||
activity = IssueActivity(
|
||||
id=row[0],
|
||||
issue_id=row[1],
|
||||
activity_type=ActivityType(row[2]),
|
||||
activity_date=datetime.strptime(row[3], '%Y-%m-%d').date() if row[3] else None,
|
||||
period_id=row[4],
|
||||
activity_details=row[5],
|
||||
created_at=datetime.fromisoformat(row[6]) if row[6] else None
|
||||
)
|
||||
activities.append(activity)
|
||||
|
||||
return activities
|
||||
|
||||
def get_activities_by_period(
|
||||
self,
|
||||
period_id: int,
|
||||
activity_types: Optional[List[ActivityType]] = None
|
||||
) -> List[IssueActivity]:
|
||||
"""
|
||||
Get all activities within a specific cost period.
|
||||
|
||||
Args:
|
||||
period_id: ID of the cost period
|
||||
activity_types: Optional list of activity types to filter by
|
||||
|
||||
Returns:
|
||||
List of issue activities within the period
|
||||
"""
|
||||
with self.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
query = '''
|
||||
SELECT id, issue_id, activity_type, activity_date,
|
||||
period_id, activity_details, created_at
|
||||
FROM issue_activity_log
|
||||
WHERE period_id = ?
|
||||
'''
|
||||
params = [period_id]
|
||||
|
||||
if activity_types:
|
||||
placeholders = ','.join(['?' for _ in activity_types])
|
||||
query += f' AND activity_type IN ({placeholders})'
|
||||
params.extend([at.value for at in activity_types])
|
||||
|
||||
query += ' ORDER BY activity_date DESC, created_at DESC'
|
||||
|
||||
cursor.execute(query, params)
|
||||
rows = cursor.fetchall()
|
||||
|
||||
activities = []
|
||||
for row in rows:
|
||||
activity = IssueActivity(
|
||||
id=row[0],
|
||||
issue_id=row[1],
|
||||
activity_type=ActivityType(row[2]),
|
||||
activity_date=datetime.strptime(row[3], '%Y-%m-%d').date() if row[3] else None,
|
||||
period_id=row[4],
|
||||
activity_details=row[5],
|
||||
created_at=datetime.fromisoformat(row[6]) if row[6] else None
|
||||
)
|
||||
activities.append(activity)
|
||||
|
||||
return activities
|
||||
|
||||
def get_activity_summary(
|
||||
self,
|
||||
issue_id: Optional[int] = None,
|
||||
start_date: Optional[date] = None,
|
||||
end_date: Optional[date] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Get activity summary statistics.
|
||||
|
||||
Args:
|
||||
issue_id: Optional issue ID to filter by
|
||||
start_date: Optional start date filter
|
||||
end_date: Optional end date filter
|
||||
|
||||
Returns:
|
||||
Dictionary containing activity summary statistics
|
||||
"""
|
||||
with self.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Build base query
|
||||
base_conditions = []
|
||||
params = []
|
||||
|
||||
if issue_id:
|
||||
base_conditions.append('issue_id = ?')
|
||||
params.append(issue_id)
|
||||
|
||||
if start_date:
|
||||
base_conditions.append('activity_date >= ?')
|
||||
params.append(start_date.isoformat() if hasattr(start_date, 'isoformat') else start_date)
|
||||
|
||||
if end_date:
|
||||
base_conditions.append('activity_date <= ?')
|
||||
params.append(end_date.isoformat() if hasattr(end_date, 'isoformat') else end_date)
|
||||
|
||||
where_clause = ' AND '.join(base_conditions) if base_conditions else '1=1'
|
||||
|
||||
# Get total activity count
|
||||
cursor.execute(f'''
|
||||
SELECT COUNT(*) FROM issue_activity_log WHERE {where_clause}
|
||||
''', params)
|
||||
total_activities = cursor.fetchone()[0]
|
||||
|
||||
# Get activity count by type
|
||||
cursor.execute(f'''
|
||||
SELECT activity_type, COUNT(*)
|
||||
FROM issue_activity_log
|
||||
WHERE {where_clause}
|
||||
GROUP BY activity_type
|
||||
ORDER BY COUNT(*) DESC
|
||||
''', params)
|
||||
activities_by_type = dict(cursor.fetchall())
|
||||
|
||||
# Get unique issues count
|
||||
cursor.execute(f'''
|
||||
SELECT COUNT(DISTINCT issue_id)
|
||||
FROM issue_activity_log
|
||||
WHERE {where_clause}
|
||||
''', params)
|
||||
unique_issues = cursor.fetchone()[0]
|
||||
|
||||
# Get date range
|
||||
cursor.execute(f'''
|
||||
SELECT MIN(activity_date), MAX(activity_date)
|
||||
FROM issue_activity_log
|
||||
WHERE {where_clause}
|
||||
''', params)
|
||||
date_range = cursor.fetchone()
|
||||
|
||||
return {
|
||||
'total_activities': total_activities,
|
||||
'activities_by_type': activities_by_type,
|
||||
'unique_issues': unique_issues,
|
||||
'date_range': {
|
||||
'start': date_range[0] if date_range[0] else None,
|
||||
'end': date_range[1] if date_range[1] else None
|
||||
},
|
||||
'filters': {
|
||||
'issue_id': issue_id,
|
||||
'start_date': start_date.isoformat() if start_date else None,
|
||||
'end_date': end_date.isoformat() if end_date else None
|
||||
}
|
||||
}
|
||||
|
||||
def delete_activity(self, activity_id: int) -> bool:
|
||||
"""
|
||||
Delete an activity record.
|
||||
|
||||
Args:
|
||||
activity_id: ID of the activity to delete
|
||||
|
||||
Returns:
|
||||
True if activity was deleted, False if not found
|
||||
"""
|
||||
with self.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
cursor.execute('DELETE FROM issue_activity_log WHERE id = ?', (activity_id,))
|
||||
return cursor.rowcount > 0
|
||||
|
||||
def bulk_log_activities(self, activities: List[Dict[str, Any]]) -> List[int]:
|
||||
"""
|
||||
Log multiple activities in a single transaction.
|
||||
|
||||
Args:
|
||||
activities: List of activity dictionaries with keys:
|
||||
issue_id, activity_type, activity_date (optional),
|
||||
activity_details (optional), period_id (optional)
|
||||
|
||||
Returns:
|
||||
List of created activity IDs
|
||||
|
||||
Raises:
|
||||
ValueError: If activity data is invalid
|
||||
sqlite3.Error: If database operation fails
|
||||
"""
|
||||
activity_ids = []
|
||||
|
||||
with self.finance_models.get_connection() as conn:
|
||||
cursor = conn.cursor()
|
||||
|
||||
for activity_data in activities:
|
||||
if 'issue_id' not in activity_data or 'activity_type' not in activity_data:
|
||||
raise ValueError("Each activity must have 'issue_id' and 'activity_type'")
|
||||
|
||||
issue_id = activity_data['issue_id']
|
||||
activity_type = ActivityType(activity_data['activity_type'])
|
||||
activity_date = activity_data.get('activity_date', date.today())
|
||||
activity_details = activity_data.get('activity_details')
|
||||
period_id = activity_data.get('period_id')
|
||||
|
||||
# Auto-determine period if not provided
|
||||
if period_id is None:
|
||||
cursor.execute('''
|
||||
SELECT id FROM cost_periods
|
||||
WHERE ? BETWEEN period_start AND period_end
|
||||
ORDER BY created_at DESC LIMIT 1
|
||||
''', (activity_date.isoformat() if hasattr(activity_date, 'isoformat') else activity_date,))
|
||||
result = cursor.fetchone()
|
||||
if result:
|
||||
period_id = result[0]
|
||||
|
||||
cursor.execute('''
|
||||
INSERT INTO issue_activity_log
|
||||
(issue_id, activity_type, activity_date, period_id, activity_details)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
''', (issue_id, activity_type.value, activity_date.isoformat() if hasattr(activity_date, 'isoformat') else activity_date, period_id, activity_details))
|
||||
|
||||
activity_ids.append(cursor.lastrowid)
|
||||
|
||||
return activity_ids
|
||||
@@ -1,109 +0,0 @@
|
||||
"""
|
||||
Abstract base class for issue management backends.
|
||||
|
||||
This module defines the interface that all issue management backends must implement.
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import List, Optional, Dict, Any
|
||||
import sys
|
||||
from pathlib import Path
|
||||
# Add project root to path so domain module can be imported
|
||||
project_root = Path(__file__).parent.parent.parent
|
||||
if str(project_root) not in sys.path:
|
||||
sys.path.insert(0, str(project_root))
|
||||
|
||||
from domain.issues.models import Issue
|
||||
|
||||
|
||||
class IssueBackend(ABC):
|
||||
"""Abstract base class for issue management backends."""
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
"""Initialize backend with configuration."""
|
||||
self.config = config
|
||||
|
||||
@abstractmethod
|
||||
def list_issues(self, state: Optional[str] = None) -> List[Issue]:
|
||||
"""
|
||||
List issues with optional state filter.
|
||||
|
||||
Args:
|
||||
state: Filter by state ('open', 'closed', 'all', or None for all)
|
||||
|
||||
Returns:
|
||||
List of Issue objects
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_issue(self, issue_id: str) -> Issue:
|
||||
"""
|
||||
Get specific issue by ID.
|
||||
|
||||
Args:
|
||||
issue_id: The issue identifier
|
||||
|
||||
Returns:
|
||||
Issue object
|
||||
|
||||
Raises:
|
||||
Exception: If issue not found
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def create_issue(self, title: str, body: str, **kwargs) -> Issue:
|
||||
"""
|
||||
Create new issue.
|
||||
|
||||
Args:
|
||||
title: Issue title
|
||||
body: Issue body/description
|
||||
**kwargs: Additional issue properties (labels, assignees, etc.)
|
||||
|
||||
Returns:
|
||||
Created Issue object
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def add_comment(self, issue_id: str, comment: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Add comment to issue.
|
||||
|
||||
Args:
|
||||
issue_id: The issue identifier
|
||||
comment: Comment text
|
||||
|
||||
Returns:
|
||||
Comment metadata (id, timestamp, etc.)
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def close_issue(self, issue_id: str) -> Issue:
|
||||
"""
|
||||
Close issue.
|
||||
|
||||
Args:
|
||||
issue_id: The issue identifier
|
||||
|
||||
Returns:
|
||||
Updated Issue object with closed state
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def update_issue(self, issue_id: str, **kwargs) -> Issue:
|
||||
"""
|
||||
Update issue properties.
|
||||
|
||||
Args:
|
||||
issue_id: The issue identifier
|
||||
**kwargs: Properties to update (title, body, state, etc.)
|
||||
|
||||
Returns:
|
||||
Updated Issue object
|
||||
"""
|
||||
pass
|
||||
@@ -1,165 +0,0 @@
|
||||
"""
|
||||
CLI commands for issue management.
|
||||
|
||||
This module provides Click commands for the unified issue management interface.
|
||||
"""
|
||||
|
||||
import click
|
||||
from typing import Optional
|
||||
|
||||
from .manager import IssuePluginManager
|
||||
from .exceptions import PluginNotFoundError, ConfigurationError
|
||||
from cli.presenters.views import IssueView
|
||||
|
||||
|
||||
@click.group()
|
||||
def issues():
|
||||
"""Issue management with multiple backend support."""
|
||||
pass
|
||||
|
||||
|
||||
@issues.command()
|
||||
@click.option('--state', type=click.Choice(['open', 'closed', 'all']), default='all',
|
||||
help='Filter issues by state')
|
||||
@click.option('--backend', help='Override configured backend')
|
||||
def list(state: str, backend: Optional[str]):
|
||||
"""List issues from configured backend."""
|
||||
try:
|
||||
manager = IssuePluginManager()
|
||||
backend_instance = manager.get_backend(backend)
|
||||
issues_list = backend_instance.list_issues(state=state)
|
||||
|
||||
if state == 'open':
|
||||
IssueView.show_open_issues(issues_list)
|
||||
else:
|
||||
IssueView.show_list(issues_list, f"Issues ({state})")
|
||||
|
||||
except (PluginNotFoundError, ConfigurationError) as e:
|
||||
click.echo(f"Error: {e}", err=True)
|
||||
raise click.Abort()
|
||||
except Exception as e:
|
||||
click.echo(f"Unexpected error: {e}", err=True)
|
||||
raise click.Abort()
|
||||
|
||||
|
||||
@issues.command()
|
||||
@click.argument('issue_id')
|
||||
@click.option('--backend', help='Override configured backend')
|
||||
def show(issue_id: str, backend: Optional[str]):
|
||||
"""Show details of a specific issue."""
|
||||
try:
|
||||
manager = IssuePluginManager()
|
||||
backend_instance = manager.get_backend(backend)
|
||||
issue = backend_instance.get_issue(issue_id)
|
||||
|
||||
# Convert issue to dict for display
|
||||
issue_data = {
|
||||
'number': issue.number,
|
||||
'title': issue.title,
|
||||
'body': getattr(issue, '_body', ''),
|
||||
'state': issue.state.value if hasattr(issue.state, 'value') else str(issue.state),
|
||||
'created_at': issue.created_at,
|
||||
'updated_at': getattr(issue, 'updated_at', issue.created_at),
|
||||
'labels': [label.name if hasattr(label, 'name') else str(label) for label in issue.labels],
|
||||
'assignees': getattr(issue, 'assignees', []) or [],
|
||||
'assignee': getattr(issue, 'assignee', None),
|
||||
'milestone': getattr(issue, 'milestone', None),
|
||||
'html_url': getattr(issue, 'html_url', ''),
|
||||
'state_label': getattr(issue, 'state_label', issue.state.value if hasattr(issue.state, 'value') else str(issue.state)),
|
||||
'priority_label': getattr(issue, 'priority_label', 'Normal'),
|
||||
'type_labels': getattr(issue, 'type_labels', []),
|
||||
'other_labels': getattr(issue, 'other_labels', []),
|
||||
'kanban_column': getattr(issue, 'kanban_column', 'To Do')
|
||||
}
|
||||
|
||||
IssueView.show_issue_details(issue_data)
|
||||
|
||||
except FileNotFoundError:
|
||||
click.echo(f"Error: Issue {issue_id} not found", err=True)
|
||||
raise click.Abort()
|
||||
except (PluginNotFoundError, ConfigurationError) as e:
|
||||
click.echo(f"Error: {e}", err=True)
|
||||
raise click.Abort()
|
||||
except Exception as e:
|
||||
click.echo(f"Unexpected error: {e}", err=True)
|
||||
raise click.Abort()
|
||||
|
||||
|
||||
@issues.command()
|
||||
@click.argument('title')
|
||||
@click.argument('body')
|
||||
@click.option('--backend', help='Override configured backend')
|
||||
def create(title: str, body: str, backend: Optional[str]):
|
||||
"""Create a new issue."""
|
||||
try:
|
||||
manager = IssuePluginManager()
|
||||
backend_instance = manager.get_backend(backend)
|
||||
issue = backend_instance.create_issue(title, body)
|
||||
|
||||
# Convert issue to dict for display
|
||||
result = {
|
||||
'number': issue.number,
|
||||
'title': issue.title,
|
||||
'state': issue.state.value if hasattr(issue.state, 'value') else str(issue.state)
|
||||
}
|
||||
|
||||
IssueView.show_creation_success(result, "issue")
|
||||
click.echo(f"Issue #{issue.number} created successfully")
|
||||
|
||||
except (PluginNotFoundError, ConfigurationError) as e:
|
||||
click.echo(f"Error: {e}", err=True)
|
||||
raise click.Abort()
|
||||
except Exception as e:
|
||||
click.echo(f"Unexpected error: {e}", err=True)
|
||||
raise click.Abort()
|
||||
|
||||
|
||||
@issues.command()
|
||||
@click.argument('issue_id')
|
||||
@click.argument('comment')
|
||||
@click.option('--backend', help='Override configured backend')
|
||||
def comment(issue_id: str, comment: str, backend: Optional[str]):
|
||||
"""Add a comment to an issue."""
|
||||
try:
|
||||
manager = IssuePluginManager()
|
||||
backend_instance = manager.get_backend(backend)
|
||||
result = backend_instance.add_comment(issue_id, comment)
|
||||
|
||||
click.echo(f"Comment added to issue #{issue_id}")
|
||||
|
||||
except ValueError as e:
|
||||
click.echo(f"Error: {e}", err=True)
|
||||
raise click.Abort()
|
||||
except (PluginNotFoundError, ConfigurationError) as e:
|
||||
click.echo(f"Error: {e}", err=True)
|
||||
raise click.Abort()
|
||||
except Exception as e:
|
||||
click.echo(f"Unexpected error: {e}", err=True)
|
||||
raise click.Abort()
|
||||
|
||||
|
||||
@issues.command()
|
||||
@click.argument('issue_id')
|
||||
@click.option('--backend', help='Override configured backend')
|
||||
def close(issue_id: str, backend: Optional[str]):
|
||||
"""Close an issue."""
|
||||
try:
|
||||
manager = IssuePluginManager()
|
||||
backend_instance = manager.get_backend(backend)
|
||||
issue = backend_instance.close_issue(issue_id)
|
||||
|
||||
click.echo(f"Issue #{issue_id} closed successfully")
|
||||
|
||||
except FileNotFoundError:
|
||||
click.echo(f"Error: Issue {issue_id} not found", err=True)
|
||||
raise click.Abort()
|
||||
except (PluginNotFoundError, ConfigurationError) as e:
|
||||
click.echo(f"Error: {e}", err=True)
|
||||
raise click.Abort()
|
||||
except Exception as e:
|
||||
click.echo(f"Unexpected error: {e}", err=True)
|
||||
raise click.Abort()
|
||||
|
||||
|
||||
# Make issues_group available for import
|
||||
issues_group = issues
|
||||
@@ -1,18 +0,0 @@
|
||||
"""
|
||||
Exceptions for the issue management module.
|
||||
"""
|
||||
|
||||
|
||||
class IssuePluginError(Exception):
|
||||
"""Base exception for issue plugin errors."""
|
||||
pass
|
||||
|
||||
|
||||
class PluginNotFoundError(IssuePluginError):
|
||||
"""Raised when a requested plugin is not found."""
|
||||
pass
|
||||
|
||||
|
||||
class ConfigurationError(IssuePluginError):
|
||||
"""Raised when there's a configuration error."""
|
||||
pass
|
||||
@@ -1,600 +0,0 @@
|
||||
"""
|
||||
Single Command Issue Wrap-Up functionality.
|
||||
|
||||
This module provides comprehensive issue completion automation including:
|
||||
- Requirement validation and verification
|
||||
- Test execution and validation
|
||||
- Cost note creation and database updates
|
||||
- Git operations (add, commit with cost notes)
|
||||
- Comprehensive completion summary
|
||||
|
||||
The system automates the entire issue closure workflow in a single command.
|
||||
"""
|
||||
|
||||
import click
|
||||
import subprocess
|
||||
import os
|
||||
import json
|
||||
from datetime import datetime, date
|
||||
from typing import Optional, Dict, Any, List
|
||||
from decimal import Decimal
|
||||
from pathlib import Path
|
||||
from tabulate import tabulate
|
||||
|
||||
from ..finance.worktime_tracker import WorktimeTracker
|
||||
from ..finance.session_tracker import SessionCostTracker
|
||||
from ..finance.cost_manager import CostItemManager
|
||||
from .activity_tracker import IssueActivityTracker
|
||||
from .manager import IssuePluginManager
|
||||
|
||||
|
||||
class IssueWrapUpService:
|
||||
"""Service for comprehensive issue wrap-up functionality."""
|
||||
|
||||
def __init__(self, db_path: str = "markitect.db"):
|
||||
"""Initialize the issue wrap-up service."""
|
||||
self.db_path = db_path
|
||||
self.worktime_tracker = WorktimeTracker(db_path)
|
||||
self.activity_tracker = IssueActivityTracker(db_path)
|
||||
self.session_tracker = SessionCostTracker(db_path)
|
||||
self.cost_manager = CostItemManager(db_path)
|
||||
self.issue_manager = IssuePluginManager()
|
||||
|
||||
def wrap_up_issue(self, issue_number: int, force: bool = False) -> Dict[str, Any]:
|
||||
"""
|
||||
Perform comprehensive issue wrap-up.
|
||||
|
||||
Args:
|
||||
issue_number: Issue number to wrap up
|
||||
force: Skip validation checks if True
|
||||
|
||||
Returns:
|
||||
Dictionary containing wrap-up results
|
||||
"""
|
||||
wrap_up_results = {
|
||||
'issue_number': issue_number,
|
||||
'timestamp': datetime.now(),
|
||||
'steps': {}
|
||||
}
|
||||
|
||||
# Step 1: Get issue details
|
||||
click.echo(f"🔍 Retrieving issue #{issue_number} details...")
|
||||
issue_details = self._get_issue_details(issue_number)
|
||||
wrap_up_results['issue_details'] = issue_details
|
||||
wrap_up_results['steps']['issue_retrieval'] = {'success': bool(issue_details)}
|
||||
|
||||
if not issue_details and not force:
|
||||
wrap_up_results['steps']['issue_retrieval']['error'] = "Issue not found"
|
||||
return wrap_up_results
|
||||
|
||||
# Step 2: Review requirements (placeholder - would need issue analysis)
|
||||
click.echo("📋 Reviewing requirements...")
|
||||
req_check = self._review_requirements(issue_number, issue_details, force)
|
||||
wrap_up_results['steps']['requirement_review'] = req_check
|
||||
|
||||
# Step 3: Run associated tests
|
||||
click.echo("🧪 Running associated tests...")
|
||||
test_results = self._run_issue_tests(issue_number, force)
|
||||
wrap_up_results['steps']['test_execution'] = test_results
|
||||
|
||||
# Step 4: Run full test suite
|
||||
click.echo("🔬 Running full test suite...")
|
||||
full_test_results = self._run_full_tests(force)
|
||||
wrap_up_results['steps']['full_test_execution'] = full_test_results
|
||||
|
||||
# Step 5: Calculate and update costs
|
||||
click.echo("💰 Calculating and updating costs...")
|
||||
cost_results = self._update_cost_tracking(issue_number, issue_details)
|
||||
wrap_up_results['steps']['cost_tracking'] = cost_results
|
||||
|
||||
# Step 6: Create/update cost note
|
||||
click.echo("📄 Creating/updating cost note...")
|
||||
cost_note_results = self._create_cost_note(issue_number, issue_details, cost_results)
|
||||
wrap_up_results['steps']['cost_note'] = cost_note_results
|
||||
|
||||
# Step 7: Git operations
|
||||
click.echo("📦 Adding and committing changes...")
|
||||
git_results = self._git_operations(issue_number, issue_details)
|
||||
wrap_up_results['steps']['git_operations'] = git_results
|
||||
|
||||
# Step 8: Close issue
|
||||
click.echo("🔒 Closing issue...")
|
||||
closure_results = self._close_issue(issue_number)
|
||||
wrap_up_results['steps']['issue_closure'] = closure_results
|
||||
|
||||
return wrap_up_results
|
||||
|
||||
def _get_issue_details(self, issue_number: int) -> Optional[Dict[str, Any]]:
|
||||
"""Retrieve issue details from the backend."""
|
||||
try:
|
||||
backend = self.issue_manager.get_backend()
|
||||
# This would call the actual backend API
|
||||
# For now, simulate with basic info
|
||||
return {
|
||||
'number': issue_number,
|
||||
'title': f"Issue #{issue_number}",
|
||||
'status': 'open',
|
||||
'description': 'Issue description would be retrieved from backend'
|
||||
}
|
||||
except Exception as e:
|
||||
return None
|
||||
|
||||
def _review_requirements(self, issue_number: int, issue_details: Optional[Dict], force: bool) -> Dict[str, Any]:
|
||||
"""Review that requirements have been met."""
|
||||
if force:
|
||||
return {'success': True, 'forced': True}
|
||||
|
||||
# This would implement actual requirement checking logic
|
||||
# For now, check if there are recent activities
|
||||
activities = self.activity_tracker.get_issue_activities(
|
||||
issue_id=issue_number,
|
||||
limit=10
|
||||
)
|
||||
|
||||
has_implementation = any(
|
||||
activity.has_implementation_activity()
|
||||
for activity in activities
|
||||
)
|
||||
|
||||
return {
|
||||
'success': has_implementation or len(activities) > 0,
|
||||
'activities_count': len(activities),
|
||||
'has_implementation_activity': has_implementation
|
||||
}
|
||||
|
||||
def _run_issue_tests(self, issue_number: int, force: bool) -> Dict[str, Any]:
|
||||
"""Run tests associated with the issue."""
|
||||
test_files = [
|
||||
f"tests/test_issue_{issue_number}_*.py",
|
||||
f"tests/test_issue_{issue_number}.py"
|
||||
]
|
||||
|
||||
results = {
|
||||
'success': True,
|
||||
'test_files': [],
|
||||
'output': []
|
||||
}
|
||||
|
||||
for test_pattern in test_files:
|
||||
# Check if test files exist
|
||||
test_files_found = list(Path('.').glob(test_pattern))
|
||||
|
||||
for test_file in test_files_found:
|
||||
results['test_files'].append(str(test_file))
|
||||
|
||||
try:
|
||||
if force:
|
||||
results['output'].append(f"FORCED: Skipping test execution for {test_file}")
|
||||
continue
|
||||
|
||||
# Run the specific test
|
||||
cmd = ['.venv/bin/python', '-m', 'pytest', str(test_file), '-v']
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, cwd='.')
|
||||
|
||||
results['output'].append({
|
||||
'file': str(test_file),
|
||||
'returncode': result.returncode,
|
||||
'stdout': result.stdout,
|
||||
'stderr': result.stderr
|
||||
})
|
||||
|
||||
if result.returncode != 0:
|
||||
results['success'] = False
|
||||
|
||||
except Exception as e:
|
||||
results['success'] = False
|
||||
results['output'].append({
|
||||
'file': str(test_file),
|
||||
'error': str(e)
|
||||
})
|
||||
|
||||
if not results['test_files']:
|
||||
results['output'].append(f"No specific test files found for issue #{issue_number}")
|
||||
|
||||
return results
|
||||
|
||||
def _run_full_tests(self, force: bool) -> Dict[str, Any]:
|
||||
"""Run the full test suite to ensure no regressions."""
|
||||
if force:
|
||||
return {
|
||||
'success': True,
|
||||
'forced': True,
|
||||
'output': 'FORCED: Skipped full test suite execution'
|
||||
}
|
||||
|
||||
try:
|
||||
# Try to determine the test command from Makefile or common patterns
|
||||
test_commands = [
|
||||
['make', 'test'],
|
||||
['.venv/bin/python', '-m', 'pytest', '-v'],
|
||||
['python', '-m', 'pytest', '-v'],
|
||||
['pytest', '-v']
|
||||
]
|
||||
|
||||
for cmd in test_commands:
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, cwd='.', timeout=300)
|
||||
return {
|
||||
'success': result.returncode == 0,
|
||||
'command': ' '.join(cmd),
|
||||
'returncode': result.returncode,
|
||||
'stdout': result.stdout,
|
||||
'stderr': result.stderr
|
||||
}
|
||||
except (subprocess.TimeoutExpired, FileNotFoundError):
|
||||
continue
|
||||
|
||||
return {
|
||||
'success': False,
|
||||
'error': 'No suitable test command found'
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {
|
||||
'success': False,
|
||||
'error': str(e)
|
||||
}
|
||||
|
||||
def _update_cost_tracking(self, issue_number: int, issue_details: Optional[Dict]) -> Dict[str, Any]:
|
||||
"""Calculate and register time and cost data in database."""
|
||||
try:
|
||||
# Get activity data
|
||||
activities = self.activity_tracker.get_issue_activities(issue_id=issue_number)
|
||||
|
||||
# Get session cost data - method may not exist
|
||||
session_costs = []
|
||||
try:
|
||||
if hasattr(self.session_tracker, 'get_issue_costs'):
|
||||
session_costs = self.session_tracker.get_issue_costs(issue_number)
|
||||
elif hasattr(self.session_tracker, 'get_costs_for_issue'):
|
||||
session_costs = self.session_tracker.get_costs_for_issue(issue_number)
|
||||
except Exception:
|
||||
# If session cost tracking fails, continue with empty list
|
||||
session_costs = []
|
||||
|
||||
# Try to get worktime data - method name may vary
|
||||
total_minutes = 0
|
||||
try:
|
||||
# Try different possible methods for getting worktime data
|
||||
if hasattr(self.worktime_tracker, 'get_issue_summary'):
|
||||
worktime_summary = self.worktime_tracker.get_issue_summary(issue_number)
|
||||
total_minutes = worktime_summary.get('total_minutes', 0) if worktime_summary else 0
|
||||
elif hasattr(self.worktime_tracker, 'get_issue_worktime'):
|
||||
worktime_data = self.worktime_tracker.get_issue_worktime(issue_number)
|
||||
total_minutes = worktime_data.get('total_minutes', 0) if worktime_data else 0
|
||||
# If no specific method available, try to calculate from entries
|
||||
elif hasattr(self.worktime_tracker, 'get_entries'):
|
||||
entries = self.worktime_tracker.get_entries()
|
||||
total_minutes = sum(
|
||||
entry.duration_minutes for entry in entries
|
||||
if hasattr(entry, 'issue_id') and entry.issue_id == issue_number
|
||||
)
|
||||
except Exception:
|
||||
# If worktime tracking fails, continue with 0
|
||||
total_minutes = 0
|
||||
|
||||
# Calculate totals
|
||||
total_cost = sum(cost.get('cost_eur', 0) for cost in session_costs)
|
||||
|
||||
cost_data = {
|
||||
'issue_number': issue_number,
|
||||
'total_minutes': total_minutes,
|
||||
'total_hours': total_minutes / 60 if total_minutes else 0,
|
||||
'total_cost_eur': total_cost,
|
||||
'activity_count': len(activities),
|
||||
'session_count': len(session_costs)
|
||||
}
|
||||
|
||||
# This would register in a centralized cost tracking system
|
||||
# For now, just return the calculated data
|
||||
|
||||
return {
|
||||
'success': True,
|
||||
'cost_data': cost_data
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {
|
||||
'success': False,
|
||||
'error': str(e)
|
||||
}
|
||||
|
||||
def _create_cost_note(self, issue_number: int, issue_details: Optional[Dict], cost_results: Dict) -> Dict[str, Any]:
|
||||
"""Create or update cost note for the issue."""
|
||||
try:
|
||||
cost_data = cost_results.get('cost_data', {})
|
||||
|
||||
# Create cost note content
|
||||
cost_note_content = self._generate_cost_note_content(
|
||||
issue_number, issue_details, cost_data
|
||||
)
|
||||
|
||||
# Write cost note file
|
||||
cost_note_path = Path(f"cost_notes/issue_{issue_number}_cost_{date.today().isoformat()}.md")
|
||||
cost_note_path.parent.mkdir(exist_ok=True)
|
||||
|
||||
with open(cost_note_path, 'w') as f:
|
||||
f.write(cost_note_content)
|
||||
|
||||
return {
|
||||
'success': True,
|
||||
'cost_note_path': str(cost_note_path)
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {
|
||||
'success': False,
|
||||
'error': str(e)
|
||||
}
|
||||
|
||||
def _generate_cost_note_content(self, issue_number: int, issue_details: Optional[Dict], cost_data: Dict) -> str:
|
||||
"""Generate cost note content."""
|
||||
title = issue_details.get('title', f'Issue #{issue_number}') if issue_details else f'Issue #{issue_number}'
|
||||
|
||||
total_cost_eur = cost_data.get('total_cost_eur', 0)
|
||||
total_cost_usd = total_cost_eur / 0.92 if total_cost_eur else 0 # Approximate conversion
|
||||
|
||||
content = f"""---
|
||||
note_type: "issue_cost_tracking"
|
||||
issue_id: {issue_number}
|
||||
issue_title: "{title}"
|
||||
session_date: "{date.today().isoformat()}"
|
||||
claude_model: "claude-sonnet-4"
|
||||
total_cost_eur: {total_cost_eur:.4f}
|
||||
total_cost_usd: {total_cost_usd:.3f}
|
||||
total_minutes: {cost_data.get('total_minutes', 0)}
|
||||
implementation_time_minutes: {cost_data.get('total_minutes', 0)}
|
||||
generated_at: "{datetime.now().isoformat()}"
|
||||
---
|
||||
|
||||
# Issue #{issue_number} Implementation Cost
|
||||
**Issue**: {title}
|
||||
**Date**: {date.today().isoformat()}
|
||||
**Claude Model**: claude-sonnet-4
|
||||
|
||||
## Cost Summary
|
||||
- **Total Cost**: €{total_cost_eur:.4f} (${total_cost_usd:.4f} USD)
|
||||
- **Implementation Time**: {cost_data.get('total_hours', 0):.1f} hours ({cost_data.get('total_minutes', 0)} minutes)
|
||||
- **Activities Tracked**: {cost_data.get('activity_count', 0)} activities
|
||||
- **Sessions**: {cost_data.get('session_count', 0)} cost sessions
|
||||
|
||||
## Implementation Summary
|
||||
Issue #{issue_number} "{title}" has been completed and wrapped up through automated process.
|
||||
|
||||
## Cost Allocation
|
||||
This cost has been allocated to issue #{issue_number} implementation.
|
||||
|
||||
## Notes
|
||||
- Currency conversion rate: 1 USD = 0.920 EUR
|
||||
- Pricing based on claude-sonnet-4 rates as of {date.today().isoformat()}
|
||||
- Implementation time includes design, coding, testing, and validation
|
||||
|
||||
<!--
|
||||
contentmatter:
|
||||
{{
|
||||
"cost_tracking": {{
|
||||
"issue": {{
|
||||
"id": {issue_number},
|
||||
"title": "{title}",
|
||||
"completion_date": "{date.today().isoformat()}",
|
||||
"implementation_time_minutes": {cost_data.get('total_minutes', 0)},
|
||||
"status": "completed"
|
||||
}},
|
||||
"costs": {{
|
||||
"total_cost_usd": {total_cost_usd:.4f},
|
||||
"total_cost_eur": {total_cost_eur:.4f},
|
||||
"conversion_rate": 0.92
|
||||
}},
|
||||
"tracking": {{
|
||||
"activity_count": {cost_data.get('activity_count', 0)},
|
||||
"session_count": {cost_data.get('session_count', 0)}
|
||||
}}
|
||||
}}
|
||||
}}
|
||||
-->
|
||||
"""
|
||||
return content
|
||||
|
||||
def _git_operations(self, issue_number: int, issue_details: Optional[Dict]) -> Dict[str, Any]:
|
||||
"""Perform git add and commit operations."""
|
||||
try:
|
||||
# Add all changes including cost notes
|
||||
result_add = subprocess.run(['git', 'add', '.'], capture_output=True, text=True)
|
||||
|
||||
if result_add.returncode != 0:
|
||||
return {
|
||||
'success': False,
|
||||
'error': f'Git add failed: {result_add.stderr}'
|
||||
}
|
||||
|
||||
# Create commit message
|
||||
title = issue_details.get('title', f'Issue #{issue_number}') if issue_details else f'Issue #{issue_number}'
|
||||
commit_message = f"""feat: complete issue #{issue_number} - {title}
|
||||
|
||||
Automated issue wrap-up including:
|
||||
- Implementation completion verification
|
||||
- Test execution and validation
|
||||
- Cost tracking and note generation
|
||||
- Repository state commit
|
||||
|
||||
🤖 Generated with [Claude Code](https://claude.ai/code)
|
||||
|
||||
Co-Authored-By: Claude <noreply@anthropic.com>"""
|
||||
|
||||
# Commit changes
|
||||
result_commit = subprocess.run(
|
||||
['git', 'commit', '-m', commit_message],
|
||||
capture_output=True, text=True
|
||||
)
|
||||
|
||||
return {
|
||||
'success': result_commit.returncode == 0,
|
||||
'add_output': result_add.stdout,
|
||||
'commit_output': result_commit.stdout,
|
||||
'commit_error': result_commit.stderr if result_commit.returncode != 0 else None
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {
|
||||
'success': False,
|
||||
'error': str(e)
|
||||
}
|
||||
|
||||
def _close_issue(self, issue_number: int) -> Dict[str, Any]:
|
||||
"""Close the issue using the issue management system."""
|
||||
try:
|
||||
# Log closing activity
|
||||
self.activity_tracker.log_activity(
|
||||
issue_id=issue_number,
|
||||
activity_type="close",
|
||||
description=f"Issue #{issue_number} completed via automated wrap-up process"
|
||||
)
|
||||
|
||||
# Try to close via make command (most reliable method)
|
||||
try:
|
||||
result = subprocess.run(
|
||||
['make', 'close-issue', f'NUM={issue_number}'],
|
||||
capture_output=True, text=True, cwd='.'
|
||||
)
|
||||
|
||||
return {
|
||||
'success': result.returncode == 0,
|
||||
'method': 'make',
|
||||
'output': result.stdout,
|
||||
'error': result.stderr if result.returncode != 0 else None
|
||||
}
|
||||
|
||||
except Exception:
|
||||
# Fallback to direct backend call
|
||||
try:
|
||||
backend = self.issue_manager.get_backend()
|
||||
# This would call backend.close_issue(issue_number)
|
||||
return {
|
||||
'success': False,
|
||||
'method': 'backend',
|
||||
'error': 'Backend closure not implemented'
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
'success': False,
|
||||
'method': 'backend',
|
||||
'error': str(e)
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {
|
||||
'success': False,
|
||||
'error': str(e)
|
||||
}
|
||||
|
||||
def format_summary(self, results: Dict[str, Any]) -> str:
|
||||
"""Format wrap-up results as a readable summary."""
|
||||
issue_num = results['issue_number']
|
||||
timestamp = results['timestamp'].strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
summary = [
|
||||
f"\n🎉 Issue #{issue_num} Wrap-Up Complete",
|
||||
f"📅 Completed: {timestamp}",
|
||||
"=" * 50
|
||||
]
|
||||
|
||||
# Step-by-step results
|
||||
steps = results.get('steps', {})
|
||||
step_names = {
|
||||
'issue_retrieval': '🔍 Issue Retrieval',
|
||||
'requirement_review': '📋 Requirement Review',
|
||||
'test_execution': '🧪 Associated Tests',
|
||||
'full_test_execution': '🔬 Full Test Suite',
|
||||
'cost_tracking': '💰 Cost Tracking',
|
||||
'cost_note': '📄 Cost Note',
|
||||
'git_operations': '📦 Git Operations',
|
||||
'issue_closure': '🔒 Issue Closure'
|
||||
}
|
||||
|
||||
for step_key, step_name in step_names.items():
|
||||
if step_key in steps:
|
||||
step_result = steps[step_key]
|
||||
success = step_result.get('success', False)
|
||||
status = "✅ SUCCESS" if success else "❌ FAILED"
|
||||
summary.append(f"{step_name}: {status}")
|
||||
|
||||
if not success and 'error' in step_result:
|
||||
summary.append(f" Error: {step_result['error']}")
|
||||
|
||||
# Cost information
|
||||
if 'cost_tracking' in steps and steps['cost_tracking'].get('success'):
|
||||
cost_data = steps['cost_tracking'].get('cost_data', {})
|
||||
if cost_data:
|
||||
summary.extend([
|
||||
"",
|
||||
"💰 Cost Summary:",
|
||||
f" Time: {cost_data.get('total_hours', 0):.1f} hours",
|
||||
f" Cost: €{cost_data.get('total_cost_eur', 0):.4f}",
|
||||
f" Activities: {cost_data.get('activity_count', 0)}"
|
||||
])
|
||||
|
||||
# Overall status
|
||||
all_critical_success = all(
|
||||
steps.get(step, {}).get('success', False)
|
||||
for step in ['test_execution', 'full_test_execution', 'git_operations']
|
||||
)
|
||||
|
||||
summary.extend([
|
||||
"",
|
||||
"🎯 Overall Status:",
|
||||
"✅ SUCCESS - Issue wrap-up completed successfully!" if all_critical_success
|
||||
else "⚠️ PARTIAL - Some steps had issues, please review above"
|
||||
])
|
||||
|
||||
return "\n".join(summary)
|
||||
|
||||
|
||||
@click.group()
|
||||
def issue_wrapup():
|
||||
"""Issue wrap-up commands for comprehensive issue completion."""
|
||||
pass
|
||||
|
||||
|
||||
@issue_wrapup.command()
|
||||
@click.argument('issue_number', type=int)
|
||||
@click.option('--force', is_flag=True, help='Skip validation checks and force completion')
|
||||
@click.option('--format', 'output_format', type=click.Choice(['summary', 'detailed', 'json']),
|
||||
default='summary', help='Output format')
|
||||
def complete(issue_number: int, force: bool, output_format: str):
|
||||
"""Complete comprehensive wrap-up for an issue.
|
||||
|
||||
Performs all steps needed to properly close an issue:
|
||||
- Verifies requirements have been met
|
||||
- Runs associated tests and full test suite
|
||||
- Calculates and updates cost tracking
|
||||
- Creates/updates cost notes
|
||||
- Commits changes to repository
|
||||
- Closes the issue
|
||||
- Provides completion summary
|
||||
"""
|
||||
service = IssueWrapUpService()
|
||||
|
||||
try:
|
||||
results = service.wrap_up_issue(issue_number, force=force)
|
||||
|
||||
if output_format == 'json':
|
||||
# Convert datetime objects to strings for JSON serialization
|
||||
json_results = json.loads(json.dumps(results, default=str))
|
||||
click.echo(json.dumps(json_results, indent=2))
|
||||
elif output_format == 'detailed':
|
||||
click.echo(service.format_summary(results))
|
||||
# Add detailed step information
|
||||
for step_name, step_data in results.get('steps', {}).items():
|
||||
if 'output' in step_data:
|
||||
click.echo(f"\n--- {step_name.title()} Details ---")
|
||||
click.echo(json.dumps(step_data['output'], indent=2, default=str))
|
||||
else: # summary
|
||||
click.echo(service.format_summary(results))
|
||||
|
||||
except Exception as e:
|
||||
click.echo(f"❌ Error during issue wrap-up: {str(e)}", err=True)
|
||||
raise click.ClickException(f"Issue wrap-up failed: {str(e)}")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
issue_wrapup()
|
||||
@@ -1,151 +0,0 @@
|
||||
"""
|
||||
Plugin manager for issue backends.
|
||||
|
||||
This module handles discovery, loading, and configuration of issue management backends.
|
||||
"""
|
||||
|
||||
import importlib
|
||||
import yaml
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any, Type, Optional
|
||||
|
||||
from .base import IssueBackend
|
||||
from .exceptions import PluginNotFoundError, ConfigurationError
|
||||
|
||||
|
||||
class IssuePluginManager:
|
||||
"""Manages issue backend plugins and configuration."""
|
||||
|
||||
def __init__(self, config_path: Optional[str] = None):
|
||||
"""
|
||||
Initialize plugin manager.
|
||||
|
||||
Args:
|
||||
config_path: Optional path to configuration file
|
||||
"""
|
||||
self.config = self._load_config(config_path)
|
||||
self.plugins = self._discover_plugins()
|
||||
|
||||
def get_backend(self, backend_name: Optional[str] = None) -> IssueBackend:
|
||||
"""
|
||||
Get configured backend instance.
|
||||
|
||||
Args:
|
||||
backend_name: Backend name to use, or None for default
|
||||
|
||||
Returns:
|
||||
IssueBackend instance
|
||||
|
||||
Raises:
|
||||
PluginNotFoundError: If backend not found
|
||||
"""
|
||||
backend_name = backend_name or self.config.get('default_backend', 'gitea')
|
||||
|
||||
plugin_class = self.plugins.get(backend_name)
|
||||
if not plugin_class:
|
||||
raise PluginNotFoundError(f"Unknown backend: {backend_name}")
|
||||
|
||||
backend_config = self.config.get('backends', {}).get(backend_name, {})
|
||||
return plugin_class(backend_config)
|
||||
|
||||
def _load_config(self, config_path: Optional[str] = None) -> Dict[str, Any]:
|
||||
"""
|
||||
Load configuration from file or return defaults.
|
||||
|
||||
Args:
|
||||
config_path: Path to configuration file
|
||||
|
||||
Returns:
|
||||
Configuration dictionary
|
||||
"""
|
||||
from config.manager import UnifiedConfigManager
|
||||
|
||||
# Get main markitect configuration to extract API token and settings
|
||||
try:
|
||||
config_manager = UnifiedConfigManager()
|
||||
markitect_config = config_manager.get_config()
|
||||
main_config = markitect_config.__dict__ if hasattr(markitect_config, '__dict__') else {}
|
||||
except Exception:
|
||||
main_config = {}
|
||||
|
||||
if config_path is None:
|
||||
config_path = Path('.markitect/config/issues.yml')
|
||||
else:
|
||||
config_path = Path(config_path)
|
||||
|
||||
# Extract configuration from main markitect config
|
||||
gitea_url = main_config.get('gitea_url', 'http://92.205.130.254:32166')
|
||||
api_token = main_config.get('api_token', '')
|
||||
repo_owner = main_config.get('repo_owner', 'coulomb')
|
||||
repo_name = main_config.get('repo_name', 'markitect_project')
|
||||
|
||||
# Default configuration using main config values
|
||||
default_config = {
|
||||
'default_backend': 'gitea',
|
||||
'backends': {
|
||||
'gitea': {
|
||||
'url': gitea_url,
|
||||
'token': api_token,
|
||||
'repo_owner': repo_owner,
|
||||
'repo_name': repo_name,
|
||||
'repo': f'{repo_owner}/{repo_name}'
|
||||
},
|
||||
'local': {
|
||||
'directory': '.markitect/issues',
|
||||
'auto_git': True
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if not config_path.exists():
|
||||
return default_config
|
||||
|
||||
try:
|
||||
with open(config_path, 'r') as f:
|
||||
config = yaml.safe_load(f) or {}
|
||||
|
||||
# Merge with defaults
|
||||
merged_config = default_config.copy()
|
||||
merged_config.update(config)
|
||||
|
||||
# Ensure gitea backend has token from main config if not overridden
|
||||
if 'backends' in merged_config and 'gitea' in merged_config['backends']:
|
||||
gitea_config = merged_config['backends']['gitea']
|
||||
if not gitea_config.get('token'):
|
||||
gitea_config['token'] = api_token
|
||||
if not gitea_config.get('repo_owner'):
|
||||
gitea_config['repo_owner'] = repo_owner
|
||||
if not gitea_config.get('repo_name'):
|
||||
gitea_config['repo_name'] = repo_name
|
||||
|
||||
return merged_config
|
||||
except Exception:
|
||||
# Return defaults if config loading fails
|
||||
return default_config
|
||||
|
||||
def _discover_plugins(self) -> Dict[str, Type[IssueBackend]]:
|
||||
"""
|
||||
Discover available backend plugins.
|
||||
|
||||
Returns:
|
||||
Dictionary mapping backend names to plugin classes
|
||||
"""
|
||||
plugins = {}
|
||||
|
||||
# Try to import known plugins
|
||||
plugin_modules = [
|
||||
('gitea', 'markitect.issues.plugins.gitea', 'GiteaPlugin'),
|
||||
('local', 'markitect.issues.plugins.local', 'LocalPlugin'),
|
||||
]
|
||||
|
||||
for name, module_path, class_name in plugin_modules:
|
||||
try:
|
||||
module = importlib.import_module(module_path)
|
||||
plugin_class = getattr(module, class_name)
|
||||
if issubclass(plugin_class, IssueBackend):
|
||||
plugins[name] = plugin_class
|
||||
except (ImportError, AttributeError):
|
||||
# Plugin not available, skip
|
||||
continue
|
||||
|
||||
return plugins
|
||||
@@ -1,5 +0,0 @@
|
||||
"""
|
||||
Issue management plugins.
|
||||
|
||||
This package contains backend implementations for different issue management systems.
|
||||
"""
|
||||
@@ -1,102 +0,0 @@
|
||||
"""
|
||||
Gitea backend plugin for issue management.
|
||||
|
||||
This plugin integrates with existing GiteaIssueRepository infrastructure.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from typing import List, Optional, Dict, Any
|
||||
|
||||
from ..base import IssueBackend
|
||||
from domain.issues.models import Issue
|
||||
from infrastructure.repositories.gitea_repository import GiteaIssueRepository
|
||||
from infrastructure.connection_manager import ConnectionManager, DataSourceConfig
|
||||
|
||||
|
||||
class GiteaPlugin(IssueBackend):
|
||||
"""Gitea backend plugin using existing repository infrastructure."""
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
"""Initialize Gitea plugin with configuration."""
|
||||
super().__init__(config)
|
||||
|
||||
# Store repo info for API endpoints
|
||||
self.repo_owner = config.get('repo_owner', 'coulomb')
|
||||
self.repo_name = config.get('repo_name', 'markitect_project')
|
||||
self.repo_full_name = f"{self.repo_owner}/{self.repo_name}"
|
||||
|
||||
# Create connection manager with configuration
|
||||
datasource_config = DataSourceConfig(
|
||||
gitea_base_url=config.get('url', 'http://92.205.130.254:32166'),
|
||||
gitea_token=config.get('token', ''),
|
||||
database_path=config.get('database_path', 'markitect.db')
|
||||
)
|
||||
connection_manager = ConnectionManager(datasource_config)
|
||||
|
||||
# Create repository with repo info
|
||||
self.repository = GiteaIssueRepository(connection_manager)
|
||||
self.repository.set_repo_info(self.repo_owner, self.repo_name)
|
||||
|
||||
def list_issues(self, state: Optional[str] = None) -> List[Issue]:
|
||||
"""List issues from Gitea."""
|
||||
return asyncio.run(self._list_issues_async(state))
|
||||
|
||||
async def _list_issues_async(self, state: Optional[str] = None) -> List[Issue]:
|
||||
"""Async implementation of list_issues."""
|
||||
if state == 'all' or state is None:
|
||||
state = None # Repository expects None for all issues
|
||||
return await self.repository.get_issues(state=state)
|
||||
|
||||
def get_issue(self, issue_id: str) -> Issue:
|
||||
"""Get specific issue from Gitea."""
|
||||
return asyncio.run(self._get_issue_async(issue_id))
|
||||
|
||||
async def _get_issue_async(self, issue_id: str) -> Issue:
|
||||
"""Async implementation of get_issue."""
|
||||
issue_number = int(issue_id)
|
||||
return await self.repository.get_issue(issue_number)
|
||||
|
||||
def create_issue(self, title: str, body: str, **kwargs) -> Issue:
|
||||
"""Create new issue in Gitea."""
|
||||
return asyncio.run(self._create_issue_async(title, body, **kwargs))
|
||||
|
||||
async def _create_issue_async(self, title: str, body: str, **kwargs) -> Issue:
|
||||
"""Async implementation of create_issue."""
|
||||
return await self.repository.create_issue(title=title, body=body, **kwargs)
|
||||
|
||||
def add_comment(self, issue_id: str, comment: str) -> Dict[str, Any]:
|
||||
"""Add comment to Gitea issue."""
|
||||
return asyncio.run(self._add_comment_async(issue_id, comment))
|
||||
|
||||
async def _add_comment_async(self, issue_id: str, comment: str) -> Dict[str, Any]:
|
||||
"""Async implementation of add_comment."""
|
||||
if not comment.strip():
|
||||
raise ValueError("Comment cannot be empty")
|
||||
if not issue_id.strip():
|
||||
raise ValueError("Issue ID cannot be empty")
|
||||
|
||||
# For now, return mock comment data
|
||||
# This will be implemented when comment support is added to repository
|
||||
return {
|
||||
'id': 'comment_123',
|
||||
'body': comment,
|
||||
'issue_id': issue_id
|
||||
}
|
||||
|
||||
def close_issue(self, issue_id: str) -> Issue:
|
||||
"""Close issue in Gitea."""
|
||||
return asyncio.run(self._close_issue_async(issue_id))
|
||||
|
||||
async def _close_issue_async(self, issue_id: str) -> Issue:
|
||||
"""Async implementation of close_issue."""
|
||||
issue_number = int(issue_id)
|
||||
return await self.repository.update_issue(issue_number, state='closed')
|
||||
|
||||
def update_issue(self, issue_id: str, **kwargs) -> Issue:
|
||||
"""Update issue in Gitea."""
|
||||
return asyncio.run(self._update_issue_async(issue_id, **kwargs))
|
||||
|
||||
async def _update_issue_async(self, issue_id: str, **kwargs) -> Issue:
|
||||
"""Async implementation of update_issue."""
|
||||
issue_number = int(issue_id)
|
||||
return await self.repository.update_issue(issue_number, **kwargs)
|
||||
@@ -1,300 +0,0 @@
|
||||
"""
|
||||
Local file backend plugin for issue management.
|
||||
|
||||
This plugin provides offline issue management using markdown files with YAML frontmatter.
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import yaml
|
||||
import subprocess
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Dict, Any
|
||||
|
||||
from ..base import IssueBackend
|
||||
from domain.issues.models import Issue, IssueState, Label
|
||||
|
||||
|
||||
class LocalPlugin(IssueBackend):
|
||||
"""Local file-based backend plugin."""
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
"""Initialize local plugin with configuration."""
|
||||
super().__init__(config)
|
||||
|
||||
self.issues_dir = Path(config.get('directory', '.markitect/issues'))
|
||||
self.auto_git = config.get('auto_git', True)
|
||||
|
||||
self._setup_directory_structure()
|
||||
self._load_local_config()
|
||||
|
||||
def _setup_directory_structure(self):
|
||||
"""Create necessary directory structure."""
|
||||
self.issues_dir.mkdir(parents=True, exist_ok=True)
|
||||
(self.issues_dir / 'open').mkdir(exist_ok=True)
|
||||
(self.issues_dir / 'closed').mkdir(exist_ok=True)
|
||||
|
||||
def _load_local_config(self):
|
||||
"""Load or create local configuration."""
|
||||
config_file = self.issues_dir / 'config.yml'
|
||||
|
||||
if config_file.exists():
|
||||
with open(config_file, 'r') as f:
|
||||
self.local_config = yaml.safe_load(f) or {}
|
||||
else:
|
||||
self.local_config = {'next_issue_number': self.config.get('numbering_start', 1)}
|
||||
self._save_local_config()
|
||||
|
||||
def _save_local_config(self):
|
||||
"""Save local configuration."""
|
||||
config_file = self.issues_dir / 'config.yml'
|
||||
with open(config_file, 'w') as f:
|
||||
yaml.dump(self.local_config, f, default_flow_style=False)
|
||||
|
||||
def list_issues(self, state: Optional[str] = None) -> List[Issue]:
|
||||
"""List issues from local files."""
|
||||
issues = []
|
||||
|
||||
if state == 'open' or state is None or state == 'all':
|
||||
issues.extend(self._read_issues_from_directory(self.issues_dir / 'open'))
|
||||
|
||||
if state == 'closed' or state is None or state == 'all':
|
||||
issues.extend(self._read_issues_from_directory(self.issues_dir / 'closed'))
|
||||
|
||||
# Sort by issue number
|
||||
issues.sort(key=lambda x: x.number)
|
||||
return issues
|
||||
|
||||
def _read_issues_from_directory(self, directory: Path) -> List[Issue]:
|
||||
"""Read all issues from a directory."""
|
||||
issues = []
|
||||
|
||||
if not directory.exists():
|
||||
return issues
|
||||
|
||||
for file_path in directory.glob('*.md'):
|
||||
try:
|
||||
issue = self._read_issue_file(file_path)
|
||||
issues.append(issue)
|
||||
except Exception:
|
||||
# Skip malformed files
|
||||
continue
|
||||
|
||||
return issues
|
||||
|
||||
def _read_issue_file(self, file_path: Path) -> Issue:
|
||||
"""Read issue from markdown file with YAML frontmatter."""
|
||||
with open(file_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read()
|
||||
|
||||
# Split frontmatter and body
|
||||
if content.startswith('---\n'):
|
||||
try:
|
||||
parts = content.split('---\n', 2)
|
||||
if len(parts) >= 3:
|
||||
frontmatter = yaml.safe_load(parts[1])
|
||||
body = parts[2].strip()
|
||||
else:
|
||||
frontmatter = {}
|
||||
body = content
|
||||
except yaml.YAMLError:
|
||||
raise yaml.YAMLError(f"Invalid YAML in {file_path}")
|
||||
else:
|
||||
frontmatter = {}
|
||||
body = content
|
||||
|
||||
# Convert string labels to Label objects
|
||||
label_objects = []
|
||||
for label in frontmatter.get('labels', []):
|
||||
if isinstance(label, str):
|
||||
label_objects.append(Label(name=label))
|
||||
else:
|
||||
label_objects.append(label)
|
||||
|
||||
# Map state string to IssueState enum
|
||||
state_str = frontmatter.get('state', 'open')
|
||||
issue_state = IssueState.OPEN if state_str == 'open' else IssueState.CLOSED
|
||||
|
||||
# Create Issue object
|
||||
issue = Issue(
|
||||
number=frontmatter.get('number', 0),
|
||||
title=frontmatter.get('title', ''),
|
||||
state=issue_state,
|
||||
labels=label_objects,
|
||||
created_at=datetime.fromisoformat(frontmatter.get('created_at', datetime.now().isoformat())),
|
||||
updated_at=datetime.fromisoformat(frontmatter.get('updated_at', datetime.now().isoformat())),
|
||||
assignee=frontmatter.get('assignee'),
|
||||
milestone=frontmatter.get('milestone')
|
||||
)
|
||||
|
||||
# Store body separately since domain model doesn't have it
|
||||
issue._body = body
|
||||
|
||||
return issue
|
||||
|
||||
def get_issue(self, issue_id: str) -> Issue:
|
||||
"""Get specific issue by ID."""
|
||||
file_path = self._find_issue_file(issue_id)
|
||||
if not file_path:
|
||||
raise FileNotFoundError(f"Issue {issue_id} not found")
|
||||
|
||||
return self._read_issue_file(file_path)
|
||||
|
||||
def _find_issue_file(self, issue_id: str) -> Optional[Path]:
|
||||
"""Find issue file in open or closed directories."""
|
||||
# Convert issue_id to 3-digit format to match filename pattern
|
||||
issue_num = f"{int(issue_id):03d}"
|
||||
pattern = f"{issue_num}-*.md"
|
||||
|
||||
# Search in open directory
|
||||
for file_path in (self.issues_dir / 'open').glob(pattern):
|
||||
return file_path
|
||||
|
||||
# Search in closed directory
|
||||
for file_path in (self.issues_dir / 'closed').glob(pattern):
|
||||
return file_path
|
||||
|
||||
return None
|
||||
|
||||
def create_issue(self, title: str, body: str, **kwargs) -> Issue:
|
||||
"""Create new issue as local file."""
|
||||
issue_number = self.local_config.get('next_issue_number', 1)
|
||||
|
||||
# Convert string labels to Label objects
|
||||
label_objects = []
|
||||
for label in kwargs.get('labels', []):
|
||||
if isinstance(label, str):
|
||||
label_objects.append(Label(name=label))
|
||||
else:
|
||||
label_objects.append(label)
|
||||
|
||||
# Create Issue object
|
||||
issue = Issue(
|
||||
number=issue_number,
|
||||
title=title,
|
||||
state=IssueState.OPEN,
|
||||
labels=label_objects,
|
||||
created_at=datetime.now(),
|
||||
updated_at=datetime.now(),
|
||||
assignee=kwargs.get('assignee'),
|
||||
milestone=kwargs.get('milestone')
|
||||
)
|
||||
|
||||
# Store body separately since domain model doesn't have it
|
||||
issue._body = body # Temporary storage for body content
|
||||
|
||||
# Write to file
|
||||
self._write_issue_file(issue, self.issues_dir / 'open')
|
||||
|
||||
# Update counter
|
||||
self.local_config['next_issue_number'] = issue_number + 1
|
||||
self._save_local_config()
|
||||
|
||||
# Git integration
|
||||
if self.auto_git:
|
||||
self._git_add_and_commit(f"Create issue #{issue_number}: {title}")
|
||||
|
||||
return issue
|
||||
|
||||
def _write_issue_file(self, issue: Issue, directory: Path):
|
||||
"""Write issue to markdown file with YAML frontmatter."""
|
||||
filename = self._generate_filename(issue)
|
||||
file_path = directory / filename
|
||||
|
||||
# Convert Label objects to strings for YAML
|
||||
label_names = [label.name for label in issue.labels]
|
||||
|
||||
# Prepare frontmatter
|
||||
frontmatter = {
|
||||
'number': issue.number,
|
||||
'title': issue.title,
|
||||
'state': issue.state.value,
|
||||
'created_at': issue.created_at.isoformat(),
|
||||
'updated_at': issue.updated_at.isoformat(),
|
||||
'labels': label_names,
|
||||
'assignee': issue.assignee,
|
||||
'milestone': issue.milestone
|
||||
}
|
||||
|
||||
# Write file
|
||||
with open(file_path, 'w', encoding='utf-8') as f:
|
||||
f.write('---\n')
|
||||
yaml.dump(frontmatter, f, default_flow_style=False)
|
||||
f.write('---\n\n')
|
||||
f.write(getattr(issue, '_body', ''))
|
||||
|
||||
def _generate_filename(self, issue: Issue) -> str:
|
||||
"""Generate safe filename from issue."""
|
||||
# Sanitize title for filename
|
||||
safe_title = re.sub(r'[^\w\s-]', '', issue.title.lower())
|
||||
safe_title = re.sub(r'[\s_-]+', '-', safe_title)
|
||||
safe_title = safe_title.strip('-')[:50] # Limit length
|
||||
|
||||
return f"{issue.number:03d}-{safe_title}.md"
|
||||
|
||||
def add_comment(self, issue_id: str, comment: str) -> Dict[str, Any]:
|
||||
"""Add comment to local issue file."""
|
||||
if not comment.strip():
|
||||
raise ValueError("Comment cannot be empty")
|
||||
if not issue_id.strip():
|
||||
raise ValueError("Issue ID cannot be empty")
|
||||
|
||||
# For now, return mock comment data
|
||||
# Full implementation would append to issue file
|
||||
return {
|
||||
'id': f"comment_{datetime.now().timestamp()}",
|
||||
'body': comment,
|
||||
'timestamp': datetime.now().isoformat()
|
||||
}
|
||||
|
||||
def close_issue(self, issue_id: str) -> Issue:
|
||||
"""Close issue by moving to closed directory."""
|
||||
return self.update_issue(issue_id, state=IssueState.CLOSED)
|
||||
|
||||
def update_issue(self, issue_id: str, **kwargs) -> Issue:
|
||||
"""Update issue properties."""
|
||||
file_path = self._find_issue_file(issue_id)
|
||||
if not file_path:
|
||||
raise FileNotFoundError(f"Issue {issue_id} not found")
|
||||
|
||||
# Read current issue
|
||||
issue = self._read_issue_file(file_path)
|
||||
|
||||
# Update properties
|
||||
for key, value in kwargs.items():
|
||||
if hasattr(issue, key):
|
||||
setattr(issue, key, value)
|
||||
|
||||
# Handle state change (move file if needed)
|
||||
old_state = 'open' if 'open' in str(file_path) else 'closed'
|
||||
new_state_obj = kwargs.get('state', issue.state)
|
||||
new_state = new_state_obj.value if hasattr(new_state_obj, 'value') else str(new_state_obj)
|
||||
|
||||
if new_state != old_state:
|
||||
# Remove old file
|
||||
file_path.unlink()
|
||||
|
||||
# Write to new directory
|
||||
new_directory = self.issues_dir / new_state
|
||||
self._write_issue_file(issue, new_directory)
|
||||
else:
|
||||
# Update existing file
|
||||
self._write_issue_file(issue, file_path.parent)
|
||||
|
||||
# Git integration
|
||||
if self.auto_git:
|
||||
self._git_add_and_commit(f"Update issue #{issue.number}")
|
||||
|
||||
return issue
|
||||
|
||||
def _git_add_and_commit(self, message: str):
|
||||
"""Add and commit changes to git."""
|
||||
try:
|
||||
subprocess.run(['git', 'add', str(self.issues_dir)],
|
||||
cwd='.', check=True, capture_output=True)
|
||||
subprocess.run(['git', 'commit', '-m', message],
|
||||
cwd='.', check=True, capture_output=True)
|
||||
except (subprocess.CalledProcessError, FileNotFoundError):
|
||||
# Git not available or not a git repo, ignore
|
||||
pass
|
||||
Reference in New Issue
Block a user