Comprehensive fix for test suite warnings across multiple issue test files: ### SQLite3 Date Adapter Warnings (Python 3.12) - Fixed 101 warnings in Issue 113 (activity_tracker.py) - Fixed 55 warnings in Issue 114 (allocation_engine.py) - Fixed 148 warnings in Issue 122 (worktime_tracker.py + test file) - Fixed 18 warnings in Issue 124 (day_wrapup_commands.py + worktime_tracker.py) ### Pytest-asyncio Configuration - Added asyncio_default_fixture_loop_scope = function to pytest.ini - Eliminates pytest-asyncio deprecation warning ### Runtime Warnings for Unawaited Coroutines - Fixed 2 warnings in Issue 59 (gitea plugin async mocking) - Enhanced AsyncTestCase with better coroutine cleanup - Improved async mock management in test utilities ### Technical Changes - Convert Python date/datetime objects to ISO strings before SQLite queries - Use .isoformat() with defensive hasattr() checks for backward compatibility - Simplified async test mocking to avoid coroutine creation - Enhanced cleanup_async_mocks() function for comprehensive cleanup ### Results - Before: ~324 warnings across test suite - After: 0 warnings - completely clean test suite - All 216+ tests pass with zero warning noise 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
566 lines
21 KiB
Python
566 lines
21 KiB
Python
"""
|
|
Cost Allocation Engine for MarkiTect Issue Cost Distribution.
|
|
|
|
This module implements the core allocation engine that distributes operational
|
|
costs across active issues using the defined algorithm from Issue #88.
|
|
|
|
The engine handles:
|
|
- Equal distribution of costs across active issues in a period
|
|
- Loss carried forward when no active issues exist
|
|
- Transaction audit trail creation
|
|
- Edge case handling and validation
|
|
- Integration with period management and activity tracking
|
|
"""
|
|
|
|
import sqlite3
|
|
from datetime import date, datetime
|
|
from decimal import Decimal
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
|
|
from .models import FinanceModels
|
|
from .cost_manager import CostItemManager
|
|
from .period_manager import PeriodManager, Period, PeriodStatus
|
|
from ..issues.activity_tracker import IssueActivityTracker, ActivityType
|
|
|
|
|
|
class AllocationStatus(Enum):
|
|
"""Status enumeration for allocation operations."""
|
|
SUCCESS = "success"
|
|
NO_ACTIVE_ISSUES = "no_active_issues"
|
|
NO_COSTS_TO_ALLOCATE = "no_costs_to_allocate"
|
|
PERIOD_CLOSED = "period_closed"
|
|
ERROR = "error"
|
|
|
|
|
|
@dataclass
|
|
class AllocationResult:
|
|
"""Result of a cost allocation operation."""
|
|
status: AllocationStatus
|
|
period_id: int
|
|
total_costs: Decimal = Decimal('0.00')
|
|
active_issues: List[int] = None
|
|
cost_per_issue: Decimal = Decimal('0.00')
|
|
allocations_created: int = 0
|
|
transactions_created: int = 0
|
|
loss_carried_forward: Decimal = Decimal('0.00')
|
|
message: str = ""
|
|
|
|
def __post_init__(self):
|
|
if self.active_issues is None:
|
|
self.active_issues = []
|
|
|
|
|
|
@dataclass
|
|
class IssueAllocation:
|
|
"""Represents a cost allocation to a specific issue."""
|
|
issue_id: int
|
|
allocated_amount: Decimal
|
|
allocation_date: date
|
|
period_id: int
|
|
transaction_id: Optional[int] = None
|
|
|
|
|
|
class TransactionManager:
|
|
"""Manages cost transaction audit trails for allocations."""
|
|
|
|
def __init__(self, db_path: str):
|
|
"""
|
|
Initialize transaction manager.
|
|
|
|
Args:
|
|
db_path: Path to the SQLite database
|
|
"""
|
|
self.db_path = db_path
|
|
self.finance_models = FinanceModels(db_path)
|
|
|
|
def create_allocation_transaction(
|
|
self,
|
|
period_id: int,
|
|
amount: Decimal,
|
|
issue_id: int,
|
|
transaction_date: date,
|
|
description: str
|
|
) -> int:
|
|
"""
|
|
Create a cost allocation transaction record.
|
|
|
|
Args:
|
|
period_id: ID of the cost period
|
|
amount: Amount allocated to the issue
|
|
issue_id: ID of the issue receiving allocation
|
|
transaction_date: Date of the transaction
|
|
description: Description of the allocation
|
|
|
|
Returns:
|
|
ID of the created transaction
|
|
"""
|
|
with self.finance_models.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('''
|
|
INSERT INTO cost_transactions
|
|
(period_id, transaction_type, amount_eur, issue_id,
|
|
transaction_date, description)
|
|
VALUES (?, 'cost_allocated', ?, ?, ?, ?)
|
|
''', (period_id, float(amount), issue_id, transaction_date.isoformat() if hasattr(transaction_date, 'isoformat') else transaction_date, description))
|
|
|
|
return cursor.lastrowid
|
|
|
|
def create_loss_forward_transaction(
|
|
self,
|
|
from_period_id: int,
|
|
to_period_id: int,
|
|
amount: Decimal,
|
|
transaction_date: date,
|
|
description: str
|
|
) -> int:
|
|
"""
|
|
Create a loss carried forward transaction.
|
|
|
|
Args:
|
|
from_period_id: Source period ID
|
|
to_period_id: Destination period ID
|
|
amount: Amount being carried forward
|
|
transaction_date: Date of the transaction
|
|
description: Description of the carry forward
|
|
|
|
Returns:
|
|
ID of the created transaction
|
|
"""
|
|
with self.finance_models.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('''
|
|
INSERT INTO cost_transactions
|
|
(period_id, transaction_type, amount_eur, transaction_date, description)
|
|
VALUES (?, 'loss_forward', ?, ?, ?)
|
|
''', (to_period_id, float(amount), transaction_date.isoformat() if hasattr(transaction_date, 'isoformat') else transaction_date, description))
|
|
|
|
return cursor.lastrowid
|
|
|
|
|
|
class AllocationEngine:
|
|
"""
|
|
Core cost allocation engine for distributing operational costs to active issues.
|
|
|
|
Implements the algorithm defined in Issue #88:
|
|
1. Calculate total costs for the period (monthly + one-time + carried forward)
|
|
2. Identify active issues (created/modified during period)
|
|
3. Distribute costs equally among active issues
|
|
4. Handle edge cases (no active issues -> carry forward loss)
|
|
5. Create audit trail transactions
|
|
6. Update period statistics
|
|
"""
|
|
|
|
def __init__(self, db_path: str = "markitect.db"):
|
|
"""
|
|
Initialize the allocation engine.
|
|
|
|
Args:
|
|
db_path: Path to the SQLite database
|
|
"""
|
|
self.db_path = db_path
|
|
self.finance_models = FinanceModels(db_path)
|
|
self.cost_manager = CostItemManager(db_path)
|
|
self.period_manager = PeriodManager(db_path)
|
|
self.activity_tracker = IssueActivityTracker(db_path)
|
|
self.transaction_manager = TransactionManager(db_path)
|
|
|
|
# Ensure database schema is initialized
|
|
self.finance_models.initialize_finance_schema()
|
|
|
|
def allocate_period_costs(self, period_id: int) -> AllocationResult:
|
|
"""
|
|
Allocate costs for a specific period to active issues.
|
|
|
|
Args:
|
|
period_id: ID of the period to process
|
|
|
|
Returns:
|
|
AllocationResult with operation details and status
|
|
"""
|
|
try:
|
|
# Get period details
|
|
period = self._get_period(period_id)
|
|
if not period:
|
|
return AllocationResult(
|
|
status=AllocationStatus.ERROR,
|
|
period_id=period_id,
|
|
message=f"Period {period_id} not found"
|
|
)
|
|
|
|
# Check if period is already closed
|
|
if period.status == PeriodStatus.CLOSED.value:
|
|
return AllocationResult(
|
|
status=AllocationStatus.PERIOD_CLOSED,
|
|
period_id=period_id,
|
|
message=f"Period {period_id} is already closed"
|
|
)
|
|
|
|
# Set period status to calculating
|
|
self._update_period_status(period_id, PeriodStatus.CALCULATING)
|
|
|
|
# Step 1: Calculate total costs for period
|
|
total_costs = self._calculate_period_total_costs(period)
|
|
|
|
if total_costs == Decimal('0.00'):
|
|
self._update_period_status(period_id, PeriodStatus.CLOSED)
|
|
return AllocationResult(
|
|
status=AllocationStatus.NO_COSTS_TO_ALLOCATE,
|
|
period_id=period_id,
|
|
total_costs=total_costs,
|
|
message="No costs to allocate for this period"
|
|
)
|
|
|
|
# Step 2: Identify active issues for the period
|
|
active_issues = self._get_active_issues_for_period(period)
|
|
|
|
if not active_issues:
|
|
# No active issues - carry forward loss to next period
|
|
next_period_id = self._get_or_create_next_period(period)
|
|
if next_period_id:
|
|
self._carry_forward_loss(period_id, next_period_id, total_costs)
|
|
|
|
# Update period and close
|
|
self._update_period_totals(period_id, total_costs, 0, Decimal('0.00'), total_costs)
|
|
self._update_period_status(period_id, PeriodStatus.CLOSED)
|
|
|
|
return AllocationResult(
|
|
status=AllocationStatus.NO_ACTIVE_ISSUES,
|
|
period_id=period_id,
|
|
total_costs=total_costs,
|
|
active_issues=[],
|
|
loss_carried_forward=total_costs,
|
|
message=f"No active issues found. Carried forward €{total_costs:.2f} to next period"
|
|
)
|
|
|
|
# Step 3: Calculate cost per issue (equal distribution)
|
|
cost_per_issue = total_costs / len(active_issues)
|
|
|
|
# Step 4: Create allocations and transactions
|
|
allocations_created = 0
|
|
transactions_created = 0
|
|
allocation_date = date.today()
|
|
|
|
for issue_id in active_issues:
|
|
# Create allocation record
|
|
allocation_id = self._create_issue_allocation(
|
|
issue_id, period_id, cost_per_issue, allocation_date
|
|
)
|
|
|
|
if allocation_id:
|
|
allocations_created += 1
|
|
|
|
# Create audit transaction
|
|
transaction_id = self.transaction_manager.create_allocation_transaction(
|
|
period_id=period_id,
|
|
amount=cost_per_issue,
|
|
issue_id=issue_id,
|
|
transaction_date=allocation_date,
|
|
description=f"Cost allocation for period {period.period_start} to {period.period_end}"
|
|
)
|
|
|
|
if transaction_id:
|
|
transactions_created += 1
|
|
# Link transaction to allocation
|
|
self._update_allocation_transaction_id(allocation_id, transaction_id)
|
|
|
|
# Step 5: Update period totals
|
|
self._update_period_totals(
|
|
period_id, total_costs, len(active_issues), cost_per_issue, Decimal('0.00')
|
|
)
|
|
|
|
# Step 6: Close the period
|
|
self._update_period_status(period_id, PeriodStatus.CLOSED)
|
|
|
|
return AllocationResult(
|
|
status=AllocationStatus.SUCCESS,
|
|
period_id=period_id,
|
|
total_costs=total_costs,
|
|
active_issues=active_issues,
|
|
cost_per_issue=cost_per_issue,
|
|
allocations_created=allocations_created,
|
|
transactions_created=transactions_created,
|
|
message=f"Successfully allocated €{total_costs:.2f} across {len(active_issues)} issues"
|
|
)
|
|
|
|
except Exception as e:
|
|
# Reset period status on error
|
|
self._update_period_status(period_id, PeriodStatus.OPEN)
|
|
return AllocationResult(
|
|
status=AllocationStatus.ERROR,
|
|
period_id=period_id,
|
|
message=f"Allocation failed: {str(e)}"
|
|
)
|
|
|
|
def get_issue_allocations(self, issue_id: int) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get all cost allocations for a specific issue.
|
|
|
|
Args:
|
|
issue_id: ID of the issue
|
|
|
|
Returns:
|
|
List of allocation records
|
|
"""
|
|
with self.finance_models.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('''
|
|
SELECT
|
|
ica.id,
|
|
ica.issue_id,
|
|
ica.period_id,
|
|
ica.allocated_amount,
|
|
ica.allocation_date,
|
|
ica.transaction_id,
|
|
cp.period_start,
|
|
cp.period_end,
|
|
cp.period_type
|
|
FROM issue_cost_allocations ica
|
|
JOIN cost_periods cp ON ica.period_id = cp.id
|
|
WHERE ica.issue_id = ?
|
|
ORDER BY ica.allocation_date DESC
|
|
''', (issue_id,))
|
|
|
|
rows = cursor.fetchall()
|
|
allocations = []
|
|
|
|
for row in rows:
|
|
allocation = {
|
|
'id': row[0],
|
|
'issue_id': row[1],
|
|
'period_id': row[2],
|
|
'allocated_amount': float(row[3]),
|
|
'allocation_date': row[4],
|
|
'transaction_id': row[5],
|
|
'period_start': row[6],
|
|
'period_end': row[7],
|
|
'period_type': row[8]
|
|
}
|
|
allocations.append(allocation)
|
|
|
|
return allocations
|
|
|
|
def get_period_allocations(self, period_id: int) -> List[Dict[str, Any]]:
|
|
"""
|
|
Get all allocations for a specific period.
|
|
|
|
Args:
|
|
period_id: ID of the period
|
|
|
|
Returns:
|
|
List of allocation records
|
|
"""
|
|
with self.finance_models.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute('''
|
|
SELECT
|
|
ica.id,
|
|
ica.issue_id,
|
|
ica.allocated_amount,
|
|
ica.allocation_date,
|
|
ica.transaction_id
|
|
FROM issue_cost_allocations ica
|
|
WHERE ica.period_id = ?
|
|
ORDER BY ica.issue_id
|
|
''', (period_id,))
|
|
|
|
rows = cursor.fetchall()
|
|
allocations = []
|
|
|
|
for row in rows:
|
|
allocation = {
|
|
'id': row[0],
|
|
'issue_id': row[1],
|
|
'allocated_amount': float(row[2]),
|
|
'allocation_date': row[3],
|
|
'transaction_id': row[4]
|
|
}
|
|
allocations.append(allocation)
|
|
|
|
return allocations
|
|
|
|
def reverse_allocation(self, allocation_id: int) -> bool:
|
|
"""
|
|
Reverse a cost allocation (for corrections).
|
|
|
|
Args:
|
|
allocation_id: ID of the allocation to reverse
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
try:
|
|
with self.finance_models.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
# Get allocation details
|
|
cursor.execute('''
|
|
SELECT issue_id, period_id, allocated_amount, transaction_id
|
|
FROM issue_cost_allocations
|
|
WHERE id = ?
|
|
''', (allocation_id,))
|
|
|
|
result = cursor.fetchone()
|
|
if not result:
|
|
return False
|
|
|
|
issue_id, period_id, amount, transaction_id = result
|
|
|
|
# Create reversal transaction using adjustment type (allows negative amounts)
|
|
with self.finance_models.get_connection() as conn2:
|
|
cursor2 = conn2.cursor()
|
|
cursor2.execute('''
|
|
INSERT INTO cost_transactions
|
|
(period_id, transaction_type, amount_eur, issue_id,
|
|
transaction_date, description)
|
|
VALUES (?, 'adjustment', ?, ?, ?, ?)
|
|
''', (period_id, float(-amount), issue_id, date.today().isoformat(), f"Reversal of allocation #{allocation_id}"))
|
|
|
|
reversal_transaction_id = cursor2.lastrowid
|
|
|
|
# Only delete if reversal transaction was created successfully
|
|
if reversal_transaction_id:
|
|
cursor.execute('DELETE FROM issue_cost_allocations WHERE id = ?', (allocation_id,))
|
|
return cursor.rowcount > 0
|
|
else:
|
|
return False
|
|
|
|
except Exception as e:
|
|
# Log the exception for debugging in tests
|
|
print(f"Reversal failed with exception: {e}")
|
|
return False
|
|
|
|
def _get_period(self, period_id: int) -> Optional[Period]:
|
|
"""Get period details by ID."""
|
|
period_data = self.period_manager.get_period_by_id(period_id)
|
|
if not period_data:
|
|
return None
|
|
|
|
# Convert dict to Period object
|
|
return Period(
|
|
id=period_data['id'],
|
|
period_start=datetime.strptime(period_data['period_start'], '%Y-%m-%d').date() if period_data['period_start'] else None,
|
|
period_end=datetime.strptime(period_data['period_end'], '%Y-%m-%d').date() if period_data['period_end'] else None,
|
|
period_type=period_data['period_type'],
|
|
status=period_data['status'],
|
|
total_costs=Decimal(str(period_data['total_costs'])),
|
|
active_issues_count=period_data['active_issues_count'],
|
|
cost_per_issue=Decimal(str(period_data['cost_per_issue'])),
|
|
loss_carried_forward=Decimal(str(period_data['loss_carried_forward'] or 0))
|
|
)
|
|
|
|
def _update_period_status(self, period_id: int, status: PeriodStatus):
|
|
"""Update period status."""
|
|
with self.finance_models.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(
|
|
'UPDATE cost_periods SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?',
|
|
(status.value, period_id)
|
|
)
|
|
|
|
def _calculate_period_total_costs(self, period: Period) -> Decimal:
|
|
"""Calculate total costs for a period including carried forward amounts."""
|
|
calculations = self.cost_manager.calculate_period_costs(
|
|
period.period_start, period.period_end
|
|
)
|
|
|
|
period_costs = calculations['total_period']
|
|
carried_forward = period.loss_carried_forward or Decimal('0.00')
|
|
|
|
return Decimal(str(period_costs)) + carried_forward
|
|
|
|
def _get_active_issues_for_period(self, period: Period) -> List[int]:
|
|
"""Get list of active issue IDs for a period."""
|
|
activities = self.activity_tracker.get_activities_by_period(
|
|
period.id,
|
|
activity_types=[
|
|
ActivityType.CREATED,
|
|
ActivityType.MODIFIED,
|
|
ActivityType.COMMENTED,
|
|
ActivityType.STATUS_CHANGED
|
|
]
|
|
)
|
|
|
|
# Get unique issue IDs
|
|
active_issues = list(set(activity.issue_id for activity in activities))
|
|
return active_issues
|
|
|
|
def _get_or_create_next_period(self, current_period: Period) -> Optional[int]:
|
|
"""Get or create the next period for loss carry forward."""
|
|
# For now, return None - next period creation will be handled separately
|
|
# This is a placeholder for future automatic period creation
|
|
return None
|
|
|
|
def _carry_forward_loss(self, from_period_id: int, to_period_id: int, amount: Decimal):
|
|
"""Carry forward loss to next period."""
|
|
# Update the destination period's carried forward amount
|
|
with self.finance_models.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('''
|
|
UPDATE cost_periods
|
|
SET loss_carried_forward = loss_carried_forward + ?
|
|
WHERE id = ?
|
|
''', (float(amount), to_period_id))
|
|
|
|
# Create audit transaction
|
|
self.transaction_manager.create_loss_forward_transaction(
|
|
from_period_id=from_period_id,
|
|
to_period_id=to_period_id,
|
|
amount=amount,
|
|
transaction_date=date.today(),
|
|
description=f"Loss carried forward from period {from_period_id}"
|
|
)
|
|
|
|
def _create_issue_allocation(
|
|
self, issue_id: int, period_id: int, amount: Decimal, allocation_date: date
|
|
) -> Optional[int]:
|
|
"""Create an issue cost allocation record."""
|
|
try:
|
|
with self.finance_models.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('''
|
|
INSERT INTO issue_cost_allocations
|
|
(issue_id, period_id, allocated_amount, allocation_date)
|
|
VALUES (?, ?, ?, ?)
|
|
''', (issue_id, period_id, float(amount), allocation_date.isoformat() if hasattr(allocation_date, 'isoformat') else allocation_date))
|
|
|
|
return cursor.lastrowid
|
|
except sqlite3.IntegrityError:
|
|
# Allocation already exists for this issue/period
|
|
return None
|
|
|
|
def _update_allocation_transaction_id(self, allocation_id: int, transaction_id: int):
|
|
"""Link allocation to its audit transaction."""
|
|
with self.finance_models.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('''
|
|
UPDATE issue_cost_allocations
|
|
SET transaction_id = ?
|
|
WHERE id = ?
|
|
''', (transaction_id, allocation_id))
|
|
|
|
def _update_period_totals(
|
|
self,
|
|
period_id: int,
|
|
total_costs: Decimal,
|
|
active_issues_count: int,
|
|
cost_per_issue: Decimal,
|
|
loss_carried_forward: Decimal
|
|
):
|
|
"""Update period summary statistics."""
|
|
with self.finance_models.get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute('''
|
|
UPDATE cost_periods
|
|
SET total_costs = ?,
|
|
active_issues_count = ?,
|
|
cost_per_issue = ?,
|
|
loss_carried_forward = ?,
|
|
updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = ?
|
|
''', (float(total_costs), active_issues_count, float(cost_per_issue), float(loss_carried_forward), period_id)) |