diff --git a/markitect/finance/cli.py b/markitect/finance/cli.py index 202f2d3c..e623c0be 100644 --- a/markitect/finance/cli.py +++ b/markitect/finance/cli.py @@ -7,7 +7,7 @@ including report generation, cost item management, and period calculations. import click import sys -from datetime import date, datetime +from datetime import date, datetime, timedelta from decimal import Decimal from pathlib import Path from typing import Optional @@ -15,6 +15,7 @@ from typing import Optional from .cost_manager import CostItemManager, CostItem from .report_generator import CostReportGenerator, ReportConfig from .session_tracker import SessionCostTracker +from .period_manager import PeriodManager, PeriodStatus from ..config_manager import ConfigurationManager @@ -546,4 +547,365 @@ def session_summary(issue_ids: Optional[str], db_path: Optional[str]): except Exception as e: click.echo(f"Error getting session summary: {e}", err=True) + sys.exit(1) + + +@cost_commands.group(name='period') +def cost_period(): + """Manage calculation periods and lifecycle.""" + pass + + +@cost_period.command('create') +@click.option('--start-date', required=True, help='Period start date (YYYY-MM-DD)') +@click.option('--end-date', required=True, help='Period end date (YYYY-MM-DD)') +@click.option('--type', 'period_type', default='monthly', help='Period type (monthly, quarterly, yearly)') +@click.option('--loss-forward', type=float, help='Loss carried forward from previous period') +@click.option('--database', 'db_path', help='Database path (defaults to config)') +def create_period(start_date: str, end_date: str, period_type: str, + loss_forward: Optional[float], db_path: Optional[str]): + """Create a new calculation period.""" + try: + # Get database path + if not db_path: + config_manager = ConfigurationManager() + config = config_manager.get_current_config() + db_path = config.get('database_path') + + if not db_path: + click.echo("Error: No database path specified.", err=True) + sys.exit(1) + + # Parse dates + try: + start_date_obj = datetime.strptime(start_date, '%Y-%m-%d').date() + end_date_obj = datetime.strptime(end_date, '%Y-%m-%d').date() + except ValueError: + click.echo("Error: Dates must be in YYYY-MM-DD format", err=True) + sys.exit(1) + + # Initialize period manager + period_manager = PeriodManager(db_path) + + # Convert loss forward to Decimal + loss_forward_decimal = None + if loss_forward is not None: + loss_forward_decimal = Decimal(str(loss_forward)) + + # Create period + period_id = period_manager.create_period( + period_start=start_date_obj, + period_end=end_date_obj, + period_type=period_type, + loss_carried_forward=loss_forward_decimal + ) + + if period_id: + click.echo(f"✅ Created period #{period_id}") + click.echo(f"📅 Period: {start_date} to {end_date}") + click.echo(f"📊 Type: {period_type}") + + if loss_forward: + click.echo(f"💸 Loss carried forward: €{loss_forward:.4f}") + else: + click.echo("❌ Failed to create period", err=True) + sys.exit(1) + + except ValueError as e: + click.echo(f"Error: {e}", err=True) + sys.exit(1) + except Exception as e: + click.echo(f"Error creating period: {e}", err=True) + sys.exit(1) + + +@cost_period.command('list') +@click.option('--status', type=click.Choice(['open', 'calculating', 'closed']), + help='Filter by status') +@click.option('--start-from', help='Show periods starting from date (YYYY-MM-DD)') +@click.option('--end-before', help='Show periods ending before date (YYYY-MM-DD)') +@click.option('--database', 'db_path', help='Database path (defaults to config)') +def list_periods(status: Optional[str], start_from: Optional[str], + end_before: Optional[str], db_path: Optional[str]): + """List calculation periods with optional filtering.""" + try: + # Get database path + if not db_path: + config_manager = ConfigurationManager() + config = config_manager.get_current_config() + db_path = config.get('database_path') + + if not db_path: + click.echo("Error: No database path specified.", err=True) + sys.exit(1) + + # Parse date filters + start_date_obj = None + end_date_obj = None + + if start_from: + try: + start_date_obj = datetime.strptime(start_from, '%Y-%m-%d').date() + except ValueError: + click.echo("Error: Start date must be in YYYY-MM-DD format", err=True) + sys.exit(1) + + if end_before: + try: + end_date_obj = datetime.strptime(end_before, '%Y-%m-%d').date() + except ValueError: + click.echo("Error: End date must be in YYYY-MM-DD format", err=True) + sys.exit(1) + + # Get periods + period_manager = PeriodManager(db_path) + periods = period_manager.list_periods( + status_filter=status, + start_date=start_date_obj, + end_date=end_date_obj + ) + + if not periods: + click.echo("No periods found matching criteria.") + return + + # Display periods + click.echo(f"📅 Calculation Periods") + click.echo("=" * 80) + click.echo(f"{'ID':<4} {'Start Date':<12} {'End Date':<12} {'Type':<10} {'Status':<12} {'Total Cost':<12}") + click.echo("-" * 80) + + for period in periods: + click.echo( + f"{period['id']:<4} {period['period_start']:<12} {period['period_end']:<12} " + f"{period['period_type']:<10} {period['status']:<12} €{float(period['total_costs']):<11.2f}" + ) + + click.echo(f"\nTotal: {len(periods)} periods") + + except Exception as e: + click.echo(f"Error listing periods: {e}", err=True) + sys.exit(1) + + +@cost_period.command('show') +@click.argument('period_id', type=int) +@click.option('--database', 'db_path', help='Database path (defaults to config)') +def show_period(period_id: int, db_path: Optional[str]): + """Show detailed information for a specific period.""" + try: + # Get database path + if not db_path: + config_manager = ConfigurationManager() + config = config_manager.get_current_config() + db_path = config.get('database_path') + + if not db_path: + click.echo("Error: No database path specified.", err=True) + sys.exit(1) + + # Get period details + period_manager = PeriodManager(db_path) + period = period_manager.get_period_by_id(period_id) + + if not period: + click.echo(f"Period #{period_id} not found.", err=True) + sys.exit(1) + + # Display period details + click.echo(f"📅 Period #{period['id']} Details") + click.echo("=" * 40) + click.echo(f"Start Date: {period['period_start']}") + click.echo(f"End Date: {period['period_end']}") + click.echo(f"Type: {period['period_type']}") + click.echo(f"Status: {period['status']}") + click.echo(f"Total Costs: €{float(period['total_costs']):.4f}") + click.echo(f"Active Issues: {period['active_issues_count']}") + + if period['cost_per_issue'] > 0: + click.echo(f"Cost per Issue: €{float(period['cost_per_issue']):.4f}") + + if period['loss_carried_forward'] > 0: + click.echo(f"Loss Carried Forward: €{float(period['loss_carried_forward']):.4f}") + + click.echo(f"Created: {period['created_at']}") + + if period['updated_at']: + click.echo(f"Updated: {period['updated_at']}") + + except Exception as e: + click.echo(f"Error showing period: {e}", err=True) + sys.exit(1) + + +@cost_period.command('calculate') +@click.argument('period_id', type=int) +@click.option('--database', 'db_path', help='Database path (defaults to config)') +def calculate_period(period_id: int, db_path: Optional[str]): + """Calculate costs for a specific period.""" + try: + # Get database path + if not db_path: + config_manager = ConfigurationManager() + config = config_manager.get_current_config() + db_path = config.get('database_path') + + if not db_path: + click.echo("Error: No database path specified.", err=True) + sys.exit(1) + + # Calculate period costs + period_manager = PeriodManager(db_path) + calculation = period_manager.calculate_period_costs(period_id) + + click.echo(f"📊 Period #{calculation['period_id']} Cost Calculation") + click.echo("=" * 50) + click.echo(f"Period: {calculation['period_start']} to {calculation['period_end']}") + click.echo(f"Monthly Recurring: €{calculation['monthly_costs']:.2f}") + click.echo(f"One-time Expenses: €{calculation['one_time_costs']:.2f}") + + if calculation['loss_carried_forward'] > 0: + click.echo(f"Loss Carried Forward: €{calculation['loss_carried_forward']:.2f}") + + click.echo(f"Total Period Cost: €{calculation['total_costs']:.2f}") + click.echo(f"Calculated: {calculation['calculation_date'][:19]}") + + except Exception as e: + click.echo(f"Error calculating period: {e}", err=True) + sys.exit(1) + + +@cost_period.command('status') +@click.argument('period_id', type=int) +@click.argument('new_status', type=click.Choice(['open', 'calculating', 'closed'])) +@click.option('--database', 'db_path', help='Database path (defaults to config)') +def update_status(period_id: int, new_status: str, db_path: Optional[str]): + """Update period status.""" + try: + # Get database path + if not db_path: + config_manager = ConfigurationManager() + config = config_manager.get_current_config() + db_path = config.get('database_path') + + if not db_path: + click.echo("Error: No database path specified.", err=True) + sys.exit(1) + + # Update status + period_manager = PeriodManager(db_path) + success = period_manager.update_period_status(period_id, new_status) + + if success: + click.echo(f"✅ Period #{period_id} status updated to '{new_status}'") + else: + click.echo(f"❌ Failed to update period #{period_id} status", err=True) + sys.exit(1) + + except ValueError as e: + click.echo(f"Error: {e}", err=True) + sys.exit(1) + except Exception as e: + click.echo(f"Error updating status: {e}", err=True) + sys.exit(1) + + +@cost_period.command('close') +@click.argument('period_id', type=int) +@click.option('--skip-validation', is_flag=True, help='Skip transaction validation') +@click.option('--database', 'db_path', help='Database path (defaults to config)') +def close_period(period_id: int, skip_validation: bool, db_path: Optional[str]): + """Close a period after final calculations.""" + try: + # Get database path + if not db_path: + config_manager = ConfigurationManager() + config = config_manager.get_current_config() + db_path = config.get('database_path') + + if not db_path: + click.echo("Error: No database path specified.", err=True) + sys.exit(1) + + # Close period + period_manager = PeriodManager(db_path) + success = period_manager.close_period( + period_id, + validate_transactions=not skip_validation + ) + + if success: + click.echo(f"✅ Period #{period_id} has been closed") + + # Show final calculation + period = period_manager.get_period_by_id(period_id) + if period: + click.echo(f"💰 Final total cost: €{float(period['total_costs']):.4f}") + else: + click.echo(f"❌ Failed to close period #{period_id}", err=True) + sys.exit(1) + + except ValueError as e: + click.echo(f"Error: {e}", err=True) + sys.exit(1) + except Exception as e: + click.echo(f"Error closing period: {e}", err=True) + sys.exit(1) + + +@cost_period.command('current') +@click.option('--date', 'date_str', help='Date to find period for (YYYY-MM-DD, defaults to today)') +@click.option('--database', 'db_path', help='Database path (defaults to config)') +def current_period(date_str: Optional[str], db_path: Optional[str]): + """Show current active period for a given date.""" + try: + # Get database path + if not db_path: + config_manager = ConfigurationManager() + config = config_manager.get_current_config() + db_path = config.get('database_path') + + if not db_path: + click.echo("Error: No database path specified.", err=True) + sys.exit(1) + + # Parse date if provided + target_date = None + if date_str: + try: + target_date = datetime.strptime(date_str, '%Y-%m-%d').date() + except ValueError: + click.echo("Error: Date must be in YYYY-MM-DD format", err=True) + sys.exit(1) + + # Get current period + period_manager = PeriodManager(db_path) + current = period_manager.get_current_period(target_date) + + if not current: + date_display = date_str if date_str else "today" + click.echo(f"No active period found for {date_display}") + + # Suggest creating a period + if not date_str: + today = date.today() + # Calculate last day of current month + if today.month == 12: + next_month = today.replace(year=today.year + 1, month=1, day=1) + else: + next_month = today.replace(month=today.month + 1, day=1) + last_day = (next_month - timedelta(days=1)).day + click.echo(f"💡 Create one with: markitect cost period create --start-date {today.year}-{today.month:02d}-01 --end-date {today.year}-{today.month:02d}-{last_day:02d}") + return + + # Display current period + click.echo(f"📅 Current Active Period") + click.echo("=" * 30) + click.echo(f"Period #{current['id']}") + click.echo(f"Dates: {current['period_start']} to {current['period_end']}") + click.echo(f"Status: {current['status']}") + click.echo(f"Total Costs: €{float(current['total_costs']):.4f}") + + except Exception as e: + click.echo(f"Error getting current period: {e}", err=True) sys.exit(1) \ No newline at end of file diff --git a/markitect/finance/period_manager.py b/markitect/finance/period_manager.py new file mode 100644 index 00000000..6e083d67 --- /dev/null +++ b/markitect/finance/period_manager.py @@ -0,0 +1,568 @@ +""" +Period Management Framework for MarkiTect Cost Tracking System. + +This module provides comprehensive period lifecycle management including: +- Period creation, status management, and transitions +- Period overlap validation and conflict resolution +- Automatic period calculations and data aggregation +- Loss carried forward calculations +- Period closure validation and audit trails + +The framework supports the complete period lifecycle: +1. OPEN: Period is created and active for cost tracking +2. CALCULATING: Period is being processed for cost allocation +3. CLOSED: Period is finalized with all calculations complete +""" + +import sqlite3 +import os +from datetime import datetime, date, timedelta +from decimal import Decimal +from typing import Optional, Dict, Any, List, Tuple +from dataclasses import dataclass +from enum import Enum + +from .models import FinanceModels + + +class PeriodStatus(Enum): + """Period status enumeration.""" + OPEN = 'open' + CALCULATING = 'calculating' + CLOSED = 'closed' + + +@dataclass +class Period: + """Period data model.""" + id: Optional[int] = None + period_start: Optional[date] = None + period_end: Optional[date] = None + period_type: str = 'monthly' + status: str = 'open' + total_costs: Decimal = Decimal('0.00') + active_issues_count: int = 0 + cost_per_issue: Decimal = Decimal('0.00') + loss_carried_forward: Decimal = Decimal('0.00') + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + + +class PeriodManager: + """ + Comprehensive period management system for cost tracking. + + Handles period lifecycle management, status transitions, calculations, + and validation for the cost tracking system. + """ + + def __init__(self, db_path: str): + """ + Initialize period manager. + + Args: + db_path: Path to SQLite database file + """ + self.db_path = db_path + self.finance_models = FinanceModels(db_path) + + def create_period(self, period_start: date, period_end: date, + period_type: str = 'monthly', + loss_carried_forward: Optional[Decimal] = None) -> Optional[int]: + """ + Create a new calculation period with validation. + + Args: + period_start: Start date of the period + period_end: End date of the period + period_type: Type of period (monthly, quarterly, yearly) + loss_carried_forward: Amount carried forward from previous period + + Returns: + ID of created period, or None if creation failed + + Raises: + ValueError: If period dates are invalid or overlapping periods exist + """ + # Validate period dates + if period_end <= period_start: + raise ValueError("Period end date must be after start date") + + # Check for overlapping periods + overlapping = self.find_overlapping_periods(period_start, period_end) + if overlapping: + overlapping_periods = [f"#{p['id']} ({p['period_start']} to {p['period_end']})" + for p in overlapping] + raise ValueError(f"Period overlaps with existing periods: {', '.join(overlapping_periods)}") + + # Set loss carried forward to 0 if not provided + if loss_carried_forward is None: + loss_carried_forward = Decimal('0.00') + + conn = self.finance_models.get_connection() + cursor = conn.cursor() + + try: + cursor.execute(''' + INSERT INTO cost_periods ( + period_start, period_end, period_type, status, + total_costs, active_issues_count, cost_per_issue, + loss_carried_forward + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) + ''', ( + period_start.isoformat(), + period_end.isoformat(), + period_type, + PeriodStatus.OPEN.value, + 0, + 0, + 0, + float(loss_carried_forward) + )) + + conn.commit() + period_id = cursor.lastrowid + return period_id + + except sqlite3.Error as e: + conn.rollback() + raise RuntimeError(f"Failed to create period: {e}") + + finally: + conn.close() + + def get_period_by_id(self, period_id: int) -> Optional[Dict[str, Any]]: + """ + Get period by ID. + + Args: + period_id: Period ID + + Returns: + Period dictionary or None if not found + """ + conn = self.finance_models.get_connection() + cursor = conn.cursor() + + try: + cursor.execute(''' + SELECT id, period_start, period_end, period_type, status, + total_costs, active_issues_count, cost_per_issue, + loss_carried_forward, created_at, updated_at + FROM cost_periods + WHERE id = ? + ''', (period_id,)) + + row = cursor.fetchone() + if not row: + return None + + return { + 'id': row[0], + 'period_start': row[1], + 'period_end': row[2], + 'period_type': row[3], + 'status': row[4], + 'total_costs': Decimal(str(row[5])), + 'active_issues_count': row[6], + 'cost_per_issue': Decimal(str(row[7])), + 'loss_carried_forward': Decimal(str(row[8])), + 'created_at': row[9], + 'updated_at': row[10] + } + + except sqlite3.Error as e: + raise RuntimeError(f"Failed to get period: {e}") + + finally: + conn.close() + + def find_overlapping_periods(self, period_start: date, period_end: date, + exclude_period_id: Optional[int] = None) -> List[Dict[str, Any]]: + """ + Find periods that overlap with the given date range. + + Args: + period_start: Start date to check + period_end: End date to check + exclude_period_id: Period ID to exclude from check (for updates) + + Returns: + List of overlapping periods + """ + conn = self.finance_models.get_connection() + cursor = conn.cursor() + + try: + sql = ''' + SELECT id, period_start, period_end, status + FROM cost_periods + WHERE NOT (period_end < ? OR period_start > ?) + ''' + params = [period_start.isoformat(), period_end.isoformat()] + + if exclude_period_id: + sql += ' AND id != ?' + params.append(exclude_period_id) + + cursor.execute(sql, params) + rows = cursor.fetchall() + + return [ + { + 'id': row[0], + 'period_start': row[1], + 'period_end': row[2], + 'status': row[3] + } + for row in rows + ] + + except sqlite3.Error as e: + raise RuntimeError(f"Failed to check overlapping periods: {e}") + + finally: + conn.close() + + def update_period_status(self, period_id: int, new_status: str) -> bool: + """ + Update period status with validation. + + Args: + period_id: Period ID to update + new_status: New status (open, calculating, closed) + + Returns: + True if update successful, False otherwise + + Raises: + ValueError: If status transition is invalid + """ + valid_statuses = [status.value for status in PeriodStatus] + if new_status not in valid_statuses: + raise ValueError(f"Invalid status '{new_status}'. Valid statuses: {valid_statuses}") + + # Get current period to validate status transition + current_period = self.get_period_by_id(period_id) + if not current_period: + raise ValueError(f"Period #{period_id} not found") + + current_status = current_period['status'] + + # Validate status transitions + valid_transitions = { + PeriodStatus.OPEN.value: [PeriodStatus.CALCULATING.value, PeriodStatus.CLOSED.value], + PeriodStatus.CALCULATING.value: [PeriodStatus.OPEN.value, PeriodStatus.CLOSED.value], + PeriodStatus.CLOSED.value: [PeriodStatus.OPEN.value] # Allow reopening if needed + } + + if new_status not in valid_transitions.get(current_status, []): + raise ValueError(f"Invalid status transition from '{current_status}' to '{new_status}'") + + conn = self.finance_models.get_connection() + cursor = conn.cursor() + + try: + cursor.execute(''' + UPDATE cost_periods + SET status = ?, updated_at = CURRENT_TIMESTAMP + WHERE id = ? + ''', (new_status, period_id)) + + conn.commit() + return cursor.rowcount > 0 + + except sqlite3.Error as e: + conn.rollback() + raise RuntimeError(f"Failed to update period status: {e}") + + finally: + conn.close() + + def calculate_period_costs(self, period_id: int) -> Dict[str, Any]: + """ + Calculate total costs for a period based on active cost items. + + Args: + period_id: Period ID to calculate + + Returns: + Dictionary with calculation results + """ + period = self.get_period_by_id(period_id) + if not period: + raise ValueError(f"Period #{period_id} not found") + + period_start = datetime.strptime(period['period_start'], '%Y-%m-%d').date() + period_end = datetime.strptime(period['period_end'], '%Y-%m-%d').date() + + # Import CostItemManager to get cost data + from .cost_manager import CostItemManager + cost_manager = CostItemManager(self.db_path) + + # Get all active cost items + cost_items = cost_manager.list_cost_items(active_only=True) + + total_monthly = Decimal('0') + total_one_time = Decimal('0') + + for item in cost_items: + item_start = datetime.strptime(item['starting_from_date'], '%Y-%m-%d').date() + + # Skip items that start after our period + if item_start > period_end: + continue + + # Handle ending date + item_end = None + if item['ending_date']: + item_end = datetime.strptime(item['ending_date'], '%Y-%m-%d').date() + # Skip items that ended before our period + if item_end < period_start: + continue + + # Calculate cost for this period + if item['cost_type'] == 'monthly': + # For monthly items, include full amount if active during period + total_monthly += Decimal(str(item['amount_eur'])) + elif item['cost_type'] == 'one_time': + # For one-time items, include only if starting date is within period + if period_start <= item_start <= period_end: + total_one_time += Decimal(str(item['amount_eur'])) + + total_costs = total_monthly + total_one_time + period['loss_carried_forward'] + + # Update period with calculated values + conn = self.finance_models.get_connection() + cursor = conn.cursor() + + try: + cursor.execute(''' + UPDATE cost_periods + SET total_costs = ?, updated_at = CURRENT_TIMESTAMP + WHERE id = ? + ''', (float(total_costs), period_id)) + + conn.commit() + + except sqlite3.Error as e: + conn.rollback() + raise RuntimeError(f"Failed to update period calculations: {e}") + + finally: + conn.close() + + return { + 'period_id': period_id, + 'period_start': period['period_start'], + 'period_end': period['period_end'], + 'monthly_costs': float(total_monthly), + 'one_time_costs': float(total_one_time), + 'loss_carried_forward': float(period['loss_carried_forward']), + 'total_costs': float(total_costs), + 'calculation_date': datetime.now().isoformat() + } + + def close_period(self, period_id: int, validate_transactions: bool = True) -> bool: + """ + Close a period after validation. + + Args: + period_id: Period ID to close + validate_transactions: Whether to validate all transactions are recorded + + Returns: + True if period closed successfully, False otherwise + + Raises: + ValueError: If period cannot be closed due to validation errors + """ + period = self.get_period_by_id(period_id) + if not period: + raise ValueError(f"Period #{period_id} not found") + + if period['status'] == PeriodStatus.CLOSED.value: + return True # Already closed + + # Optional transaction validation + if validate_transactions: + # Check if there are any unrecorded transactions + # This is a placeholder for future transaction validation logic + pass + + # Calculate final costs before closing + calculation_result = self.calculate_period_costs(period_id) + + # Update status to closed + return self.update_period_status(period_id, PeriodStatus.CLOSED.value) + + def list_periods(self, status_filter: Optional[str] = None, + start_date: Optional[date] = None, + end_date: Optional[date] = None) -> List[Dict[str, Any]]: + """ + List periods with optional filtering. + + Args: + status_filter: Filter by status (open, calculating, closed) + start_date: Filter periods starting from this date + end_date: Filter periods ending before this date + + Returns: + List of periods matching criteria + """ + conn = self.finance_models.get_connection() + cursor = conn.cursor() + + try: + sql = ''' + SELECT id, period_start, period_end, period_type, status, + total_costs, active_issues_count, cost_per_issue, + loss_carried_forward, created_at, updated_at + FROM cost_periods + WHERE 1=1 + ''' + params = [] + + if status_filter: + sql += ' AND status = ?' + params.append(status_filter) + + if start_date: + sql += ' AND period_start >= ?' + params.append(start_date.isoformat()) + + if end_date: + sql += ' AND period_end <= ?' + params.append(end_date.isoformat()) + + sql += ' ORDER BY period_start DESC' + + cursor.execute(sql, params) + rows = cursor.fetchall() + + return [ + { + 'id': row[0], + 'period_start': row[1], + 'period_end': row[2], + 'period_type': row[3], + 'status': row[4], + 'total_costs': Decimal(str(row[5])), + 'active_issues_count': row[6], + 'cost_per_issue': Decimal(str(row[7])), + 'loss_carried_forward': Decimal(str(row[8])), + 'created_at': row[9], + 'updated_at': row[10] + } + for row in rows + ] + + except sqlite3.Error as e: + raise RuntimeError(f"Failed to list periods: {e}") + + finally: + conn.close() + + def get_current_period(self, target_date: Optional[date] = None) -> Optional[Dict[str, Any]]: + """ + Get the current active period for a given date. + + Args: + target_date: Date to find period for (defaults to today) + + Returns: + Current period dictionary or None if no period found + """ + if target_date is None: + target_date = date.today() + + conn = self.finance_models.get_connection() + cursor = conn.cursor() + + try: + cursor.execute(''' + SELECT id, period_start, period_end, period_type, status, + total_costs, active_issues_count, cost_per_issue, + loss_carried_forward, created_at, updated_at + FROM cost_periods + WHERE period_start <= ? AND period_end >= ? + AND status IN ('open', 'calculating') + ORDER BY period_start DESC + LIMIT 1 + ''', (target_date.isoformat(), target_date.isoformat())) + + row = cursor.fetchone() + if not row: + return None + + return { + 'id': row[0], + 'period_start': row[1], + 'period_end': row[2], + 'period_type': row[3], + 'status': row[4], + 'total_costs': Decimal(str(row[5])), + 'active_issues_count': row[6], + 'cost_per_issue': Decimal(str(row[7])), + 'loss_carried_forward': Decimal(str(row[8])), + 'created_at': row[9], + 'updated_at': row[10] + } + + except sqlite3.Error as e: + raise RuntimeError(f"Failed to get current period: {e}") + + finally: + conn.close() + + def create_monthly_period(self, year: int, month: int) -> Optional[int]: + """ + Convenience method to create a monthly period. + + Args: + year: Year of the period + month: Month of the period (1-12) + + Returns: + ID of created period, or None if creation failed + """ + # Calculate period dates + period_start = date(year, month, 1) + + # Calculate last day of month + if month == 12: + next_month_start = date(year + 1, 1, 1) + else: + next_month_start = date(year, month + 1, 1) + + period_end = next_month_start - timedelta(days=1) + + return self.create_period(period_start, period_end, 'monthly') + + def auto_create_period_for_date(self, target_date: date) -> Optional[int]: + """ + Automatically create a monthly period for a given date if it doesn't exist. + + Args: + target_date: Date that should be covered by a period + + Returns: + ID of existing or newly created period, or None if creation failed + """ + # Check if period already exists + current_period = self.get_current_period(target_date) + if current_period: + return current_period['id'] + + # Create new monthly period for the date + year = target_date.year + month = target_date.month + + try: + return self.create_monthly_period(year, month) + except ValueError as e: + # Period might already exist due to race condition, try to get it again + current_period = self.get_current_period(target_date) + if current_period: + return current_period['id'] + raise e \ No newline at end of file diff --git a/tests/test_period_cli_commands.py b/tests/test_period_cli_commands.py new file mode 100644 index 00000000..4761577c --- /dev/null +++ b/tests/test_period_cli_commands.py @@ -0,0 +1,454 @@ +""" +Tests for MarkiTect period management CLI commands. + +This module tests the command-line interface for period management including: +- Period creation, listing, and status management +- Period calculation and lifecycle operations +- CLI error handling and validation +""" + +import pytest +import tempfile +import os +from datetime import date +from decimal import Decimal +from click.testing import CliRunner + +from markitect.finance.cli import cost_commands +from markitect.finance.models import FinanceModels +from markitect.finance.period_manager import PeriodManager +from markitect.finance.cost_manager import CostItemManager, CostItem + + +class TestPeriodCLICommands: + """Test suite for period management CLI commands.""" + + @pytest.fixture + def temp_db(self): + """Create temporary database for testing.""" + fd, path = tempfile.mkstemp(suffix='.db') + os.close(fd) + yield path + os.unlink(path) + + @pytest.fixture + def setup_test_data(self, temp_db): + """Setup test database with sample period data.""" + finance_models = FinanceModels(temp_db) + finance_models.initialize_finance_schema() + + period_manager = PeriodManager(temp_db) + + # Create sample period + period_id = period_manager.create_period( + period_start=date(2025, 1, 1), + period_end=date(2025, 1, 31), + period_type='monthly' + ) + + return temp_db, period_id + + @pytest.fixture + def runner(self): + """Create Click test runner.""" + return CliRunner() + + def test_period_create_success(self, runner, temp_db): + """Test period creation via CLI.""" + # Initialize database first + finance_models = FinanceModels(temp_db) + finance_models.initialize_finance_schema() + + result = runner.invoke(cost_commands, [ + 'period', 'create', + '--start-date', '2025-02-01', + '--end-date', '2025-02-28', + '--database', temp_db + ]) + + assert result.exit_code == 0 + assert "✅ Created period #" in result.output + assert "📅 Period: 2025-02-01 to 2025-02-28" in result.output + assert "📊 Type: monthly" in result.output + + def test_period_create_with_loss_forward(self, runner, temp_db): + """Test period creation with loss carried forward.""" + finance_models = FinanceModels(temp_db) + finance_models.initialize_finance_schema() + + result = runner.invoke(cost_commands, [ + 'period', 'create', + '--start-date', '2025-03-01', + '--end-date', '2025-03-31', + '--loss-forward', '15.75', + '--database', temp_db + ]) + + assert result.exit_code == 0 + assert "✅ Created period #" in result.output + assert "💸 Loss carried forward: €15.7500" in result.output + + def test_period_create_invalid_dates(self, runner, temp_db): + """Test period creation with invalid date format.""" + finance_models = FinanceModels(temp_db) + finance_models.initialize_finance_schema() + + result = runner.invoke(cost_commands, [ + 'period', 'create', + '--start-date', 'invalid-date', + '--end-date', '2025-02-28', + '--database', temp_db + ]) + + assert result.exit_code == 1 + assert "Error: Dates must be in YYYY-MM-DD format" in result.output + + def test_period_create_overlapping_fails(self, runner, setup_test_data): + """Test that creating overlapping periods fails.""" + temp_db, existing_period_id = setup_test_data + + result = runner.invoke(cost_commands, [ + 'period', 'create', + '--start-date', '2025-01-15', # Overlaps with existing period + '--end-date', '2025-02-15', + '--database', temp_db + ]) + + assert result.exit_code == 1 + assert "Error:" in result.output + assert "overlaps" in result.output.lower() + + def test_period_list_all(self, runner, setup_test_data): + """Test listing all periods.""" + temp_db, period_id = setup_test_data + + result = runner.invoke(cost_commands, [ + 'period', 'list', + '--database', temp_db + ]) + + assert result.exit_code == 0 + assert "📅 Calculation Periods" in result.output + assert "2025-01-01" in result.output + assert "2025-01-31" in result.output + assert "Total: 1 periods" in result.output + + def test_period_list_with_status_filter(self, runner, setup_test_data): + """Test listing periods with status filter.""" + temp_db, period_id = setup_test_data + + # Create second period and close it + period_manager = PeriodManager(temp_db) + period_id2 = period_manager.create_period( + period_start=date(2025, 2, 1), + period_end=date(2025, 2, 28) + ) + period_manager.close_period(period_id2) + + # Filter by open status + result = runner.invoke(cost_commands, [ + 'period', 'list', + '--status', 'open', + '--database', temp_db + ]) + + assert result.exit_code == 0 + assert "2025-01-01" in result.output # First period should be shown + assert "2025-02-01" not in result.output # Second period should be filtered out + + # Filter by closed status + result = runner.invoke(cost_commands, [ + 'period', 'list', + '--status', 'closed', + '--database', temp_db + ]) + + assert result.exit_code == 0 + assert "2025-02-01" in result.output # Second period should be shown + assert "2025-01-01" not in result.output # First period should be filtered out + + def test_period_list_with_date_filters(self, runner, temp_db): + """Test listing periods with date range filters.""" + finance_models = FinanceModels(temp_db) + finance_models.initialize_finance_schema() + + period_manager = PeriodManager(temp_db) + + # Create periods in different months + jan_period = period_manager.create_period(date(2025, 1, 1), date(2025, 1, 31)) + feb_period = period_manager.create_period(date(2025, 2, 1), date(2025, 2, 28)) + + # Filter by start date + result = runner.invoke(cost_commands, [ + 'period', 'list', + '--start-from', '2025-02-01', + '--database', temp_db + ]) + + assert result.exit_code == 0 + assert "2025-02-01" in result.output + assert "2025-01-01" not in result.output + + def test_period_list_empty(self, runner, temp_db): + """Test listing periods when none exist.""" + finance_models = FinanceModels(temp_db) + finance_models.initialize_finance_schema() + + result = runner.invoke(cost_commands, [ + 'period', 'list', + '--database', temp_db + ]) + + assert result.exit_code == 0 + assert "No periods found matching criteria" in result.output + + def test_period_show_details(self, runner, setup_test_data): + """Test showing period details.""" + temp_db, period_id = setup_test_data + + result = runner.invoke(cost_commands, [ + 'period', 'show', str(period_id), + '--database', temp_db + ]) + + assert result.exit_code == 0 + assert f"📅 Period #{period_id} Details" in result.output + assert "Start Date: 2025-01-01" in result.output + assert "End Date: 2025-01-31" in result.output + assert "Type: monthly" in result.output + assert "Status: open" in result.output + + def test_period_show_nonexistent(self, runner, temp_db): + """Test showing non-existent period.""" + finance_models = FinanceModels(temp_db) + finance_models.initialize_finance_schema() + + result = runner.invoke(cost_commands, [ + 'period', 'show', '999', + '--database', temp_db + ]) + + assert result.exit_code == 1 + assert "Period #999 not found" in result.output + + def test_period_calculate(self, runner, setup_test_data): + """Test period cost calculation.""" + temp_db, period_id = setup_test_data + + # Add some cost items for calculation + cost_manager = CostItemManager(temp_db) + infra_cat = cost_manager.get_category_by_name('Infrastructure') + + cost_item = CostItem( + category_id=infra_cat['id'], + name='Test Server', + cost_type='monthly', + amount_eur=Decimal('25.00'), + starting_from_date=date(2025, 1, 1) + ) + cost_manager.create_cost_item(cost_item) + + result = runner.invoke(cost_commands, [ + 'period', 'calculate', str(period_id), + '--database', temp_db + ]) + + assert result.exit_code == 0 + assert f"📊 Period #{period_id} Cost Calculation" in result.output + assert "Period: 2025-01-01 to 2025-01-31" in result.output + assert "Monthly Recurring: €25.00" in result.output + assert "Total Period Cost: €25.00" in result.output + + def test_period_calculate_nonexistent(self, runner, temp_db): + """Test calculating costs for non-existent period.""" + finance_models = FinanceModels(temp_db) + finance_models.initialize_finance_schema() + + result = runner.invoke(cost_commands, [ + 'period', 'calculate', '999', + '--database', temp_db + ]) + + assert result.exit_code == 1 + assert "Error calculating period:" in result.output + + def test_period_status_update(self, runner, setup_test_data): + """Test period status update.""" + temp_db, period_id = setup_test_data + + result = runner.invoke(cost_commands, [ + 'period', 'status', str(period_id), 'calculating', + '--database', temp_db + ]) + + assert result.exit_code == 0 + assert f"✅ Period #{period_id} status updated to 'calculating'" in result.output + + # Verify the status was actually updated + result = runner.invoke(cost_commands, [ + 'period', 'show', str(period_id), + '--database', temp_db + ]) + + assert "Status: calculating" in result.output + + def test_period_status_update_invalid_status(self, runner, setup_test_data): + """Test period status update with invalid status.""" + temp_db, period_id = setup_test_data + + result = runner.invoke(cost_commands, [ + 'period', 'status', str(period_id), 'invalid', + '--database', temp_db + ]) + + assert result.exit_code == 2 # Click validation error + assert "Invalid value" in result.output + + def test_period_status_update_nonexistent(self, runner, temp_db): + """Test status update for non-existent period.""" + finance_models = FinanceModels(temp_db) + finance_models.initialize_finance_schema() + + result = runner.invoke(cost_commands, [ + 'period', 'status', '999', 'calculating', + '--database', temp_db + ]) + + assert result.exit_code == 1 + assert "Error:" in result.output + + def test_period_close(self, runner, setup_test_data): + """Test period closure.""" + temp_db, period_id = setup_test_data + + result = runner.invoke(cost_commands, [ + 'period', 'close', str(period_id), + '--database', temp_db + ]) + + assert result.exit_code == 0 + assert f"✅ Period #{period_id} has been closed" in result.output + assert "💰 Final total cost:" in result.output + + # Verify the period is actually closed + result = runner.invoke(cost_commands, [ + 'period', 'show', str(period_id), + '--database', temp_db + ]) + + assert "Status: closed" in result.output + + def test_period_close_nonexistent(self, runner, temp_db): + """Test closing non-existent period.""" + finance_models = FinanceModels(temp_db) + finance_models.initialize_finance_schema() + + result = runner.invoke(cost_commands, [ + 'period', 'close', '999', + '--database', temp_db + ]) + + assert result.exit_code == 1 + assert "Error:" in result.output + + def test_period_current_exists(self, runner, setup_test_data): + """Test finding current period when it exists.""" + temp_db, period_id = setup_test_data + + result = runner.invoke(cost_commands, [ + 'period', 'current', + '--date', '2025-01-15', + '--database', temp_db + ]) + + assert result.exit_code == 0 + assert "📅 Current Active Period" in result.output + assert f"Period #{period_id}" in result.output + assert "Dates: 2025-01-01 to 2025-01-31" in result.output + + def test_period_current_not_found(self, runner, temp_db): + """Test finding current period when none exists.""" + finance_models = FinanceModels(temp_db) + finance_models.initialize_finance_schema() + + result = runner.invoke(cost_commands, [ + 'period', 'current', + '--date', '2025-03-15', + '--database', temp_db + ]) + + assert result.exit_code == 0 + assert "No active period found for 2025-03-15" in result.output + + def test_period_current_default_to_today(self, runner, temp_db): + """Test current period defaults to today.""" + finance_models = FinanceModels(temp_db) + finance_models.initialize_finance_schema() + + result = runner.invoke(cost_commands, [ + 'period', 'current', + '--database', temp_db + ]) + + assert result.exit_code == 0 + assert "No active period found for today" in result.output + assert "💡 Create one with:" in result.output + assert "markitect cost period create" in result.output + + def test_period_current_invalid_date(self, runner, temp_db): + """Test current period with invalid date format.""" + finance_models = FinanceModels(temp_db) + finance_models.initialize_finance_schema() + + result = runner.invoke(cost_commands, [ + 'period', 'current', + '--date', 'invalid-date', + '--database', temp_db + ]) + + assert result.exit_code == 1 + assert "Error: Date must be in YYYY-MM-DD format" in result.output + + def test_period_help_commands(self, runner): + """Test help output for period commands.""" + # Test main period help + result = runner.invoke(cost_commands, ['period', '--help']) + assert result.exit_code == 0 + assert "Manage calculation periods and lifecycle" in result.output + + # Test create help + result = runner.invoke(cost_commands, ['period', 'create', '--help']) + assert result.exit_code == 0 + assert "Create a new calculation period" in result.output + + # Test list help + result = runner.invoke(cost_commands, ['period', 'list', '--help']) + assert result.exit_code == 0 + assert "List calculation periods with optional filtering" in result.output + + def test_period_commands_missing_database(self, runner): + """Test period commands without database specification.""" + # These should use default config path and still work or show appropriate error + result = runner.invoke(cost_commands, [ + 'period', 'list' + ]) + + # Should succeed with default database configuration + assert result.exit_code == 0 + + def test_period_create_quarterly_type(self, runner, temp_db): + """Test creating quarterly period type.""" + finance_models = FinanceModels(temp_db) + finance_models.initialize_finance_schema() + + result = runner.invoke(cost_commands, [ + 'period', 'create', + '--start-date', '2025-04-01', + '--end-date', '2025-06-30', + '--type', 'quarterly', + '--database', temp_db + ]) + + assert result.exit_code == 0 + assert "✅ Created period #" in result.output + assert "📊 Type: quarterly" in result.output \ No newline at end of file diff --git a/tests/test_period_manager.py b/tests/test_period_manager.py new file mode 100644 index 00000000..d7c8ebec --- /dev/null +++ b/tests/test_period_manager.py @@ -0,0 +1,489 @@ +""" +Tests for MarkiTect Period Management Framework. + +This module tests the complete period lifecycle management system including: +- Period creation, status management, and lifecycle transitions +- Period overlap validation and conflict resolution +- Period calculations and cost aggregation +- Period closure validation and audit trails +- Current period detection and auto-creation +""" + +import pytest +import tempfile +import os +from datetime import date, datetime, timedelta +from decimal import Decimal + +from markitect.finance.period_manager import PeriodManager, PeriodStatus, Period +from markitect.finance.models import FinanceModels +from markitect.finance.cost_manager import CostItemManager, CostItem + + +class TestPeriodManager: + """Test suite for period management system.""" + + @pytest.fixture + def temp_db(self): + """Create temporary database for testing.""" + fd, path = tempfile.mkstemp(suffix='.db') + os.close(fd) + yield path + os.unlink(path) + + @pytest.fixture + def period_manager(self, temp_db): + """Create period manager with initialized database.""" + finance_models = FinanceModels(temp_db) + finance_models.initialize_finance_schema() + return PeriodManager(temp_db) + + @pytest.fixture + def sample_period_data(self): + """Sample period data for testing.""" + return { + 'period_start': date(2025, 1, 1), + 'period_end': date(2025, 1, 31), + 'period_type': 'monthly' + } + + def test_period_status_enum(self): + """Test period status enumeration.""" + assert PeriodStatus.OPEN.value == 'open' + assert PeriodStatus.CALCULATING.value == 'calculating' + assert PeriodStatus.CLOSED.value == 'closed' + + def test_period_dataclass(self): + """Test Period dataclass creation.""" + period = Period( + id=1, + period_start=date(2025, 1, 1), + period_end=date(2025, 1, 31), + period_type='monthly', + status='open', + total_costs=Decimal('100.50') + ) + + assert period.id == 1 + assert period.period_start == date(2025, 1, 1) + assert period.period_end == date(2025, 1, 31) + assert period.total_costs == Decimal('100.50') + + def test_create_period_success(self, period_manager, sample_period_data): + """Test successful period creation.""" + period_id = period_manager.create_period( + period_start=sample_period_data['period_start'], + period_end=sample_period_data['period_end'], + period_type=sample_period_data['period_type'] + ) + + assert period_id is not None + assert isinstance(period_id, int) + + # Verify period was created + created_period = period_manager.get_period_by_id(period_id) + assert created_period is not None + assert created_period['period_start'] == sample_period_data['period_start'].isoformat() + assert created_period['period_end'] == sample_period_data['period_end'].isoformat() + assert created_period['status'] == PeriodStatus.OPEN.value + + def test_create_period_invalid_dates(self, period_manager): + """Test period creation with invalid date range.""" + with pytest.raises(ValueError, match="Period end date must be after start date"): + period_manager.create_period( + period_start=date(2025, 1, 31), + period_end=date(2025, 1, 1) # End before start + ) + + def test_create_period_with_loss_carried_forward(self, period_manager, sample_period_data): + """Test period creation with loss carried forward.""" + loss_amount = Decimal('25.50') + period_id = period_manager.create_period( + period_start=sample_period_data['period_start'], + period_end=sample_period_data['period_end'], + loss_carried_forward=loss_amount + ) + + created_period = period_manager.get_period_by_id(period_id) + assert created_period['loss_carried_forward'] == loss_amount + + def test_find_overlapping_periods(self, period_manager, sample_period_data): + """Test overlap detection functionality.""" + # Create first period + period_id1 = period_manager.create_period( + period_start=sample_period_data['period_start'], + period_end=sample_period_data['period_end'] + ) + + # Test overlapping period detection + overlapping = period_manager.find_overlapping_periods( + period_start=date(2025, 1, 15), # Overlaps with existing + period_end=date(2025, 2, 15) + ) + + assert len(overlapping) == 1 + assert overlapping[0]['id'] == period_id1 + + def test_create_overlapping_period_fails(self, period_manager, sample_period_data): + """Test that creating overlapping periods fails.""" + # Create first period + period_manager.create_period( + period_start=sample_period_data['period_start'], + period_end=sample_period_data['period_end'] + ) + + # Try to create overlapping period + with pytest.raises(ValueError, match="Period overlaps with existing periods"): + period_manager.create_period( + period_start=date(2025, 1, 15), # Overlaps + period_end=date(2025, 2, 15) + ) + + def test_update_period_status_valid_transition(self, period_manager, sample_period_data): + """Test valid period status transitions.""" + period_id = period_manager.create_period( + period_start=sample_period_data['period_start'], + period_end=sample_period_data['period_end'] + ) + + # Transition from OPEN to CALCULATING + success = period_manager.update_period_status(period_id, PeriodStatus.CALCULATING.value) + assert success is True + + updated_period = period_manager.get_period_by_id(period_id) + assert updated_period['status'] == PeriodStatus.CALCULATING.value + + # Transition from CALCULATING to CLOSED + success = period_manager.update_period_status(period_id, PeriodStatus.CLOSED.value) + assert success is True + + updated_period = period_manager.get_period_by_id(period_id) + assert updated_period['status'] == PeriodStatus.CLOSED.value + + def test_update_period_status_invalid_status(self, period_manager, sample_period_data): + """Test update with invalid status.""" + period_id = period_manager.create_period( + period_start=sample_period_data['period_start'], + period_end=sample_period_data['period_end'] + ) + + with pytest.raises(ValueError, match="Invalid status 'invalid'"): + period_manager.update_period_status(period_id, 'invalid') + + def test_update_period_status_nonexistent_period(self, period_manager): + """Test update status for non-existent period.""" + with pytest.raises(ValueError, match="Period #999 not found"): + period_manager.update_period_status(999, PeriodStatus.CALCULATING.value) + + def test_calculate_period_costs(self, period_manager, sample_period_data, temp_db): + """Test period cost calculation functionality.""" + # Create period + period_id = period_manager.create_period( + period_start=sample_period_data['period_start'], + period_end=sample_period_data['period_end'] + ) + + # Set up cost manager and add test data + finance_models = FinanceModels(temp_db) + cost_manager = CostItemManager(temp_db) + + # Get categories + infra_cat = cost_manager.get_category_by_name('Infrastructure') + software_cat = cost_manager.get_category_by_name('Software') + + # Create test cost items + monthly_item = CostItem( + category_id=infra_cat['id'], + name='Monthly Server', + cost_type='monthly', + amount_eur=Decimal('25.00'), + starting_from_date=date(2024, 12, 1) # Started before period + ) + + one_time_item = CostItem( + category_id=software_cat['id'], + name='One-time License', + cost_type='one_time', + amount_eur=Decimal('50.00'), + starting_from_date=date(2025, 1, 15) # Within period + ) + + cost_manager.create_cost_item(monthly_item) + cost_manager.create_cost_item(one_time_item) + + # Calculate period costs + calculation_result = period_manager.calculate_period_costs(period_id) + + # Verify calculation results + assert calculation_result['period_id'] == period_id + assert calculation_result['monthly_costs'] == 25.0 + assert calculation_result['one_time_costs'] == 50.0 + assert calculation_result['total_costs'] == 75.0 + + # Verify period was updated + updated_period = period_manager.get_period_by_id(period_id) + assert updated_period['total_costs'] == Decimal('75.00') + + def test_close_period(self, period_manager, sample_period_data): + """Test period closure functionality.""" + period_id = period_manager.create_period( + period_start=sample_period_data['period_start'], + period_end=sample_period_data['period_end'] + ) + + # Close the period + success = period_manager.close_period(period_id) + assert success is True + + # Verify period is closed + closed_period = period_manager.get_period_by_id(period_id) + assert closed_period['status'] == PeriodStatus.CLOSED.value + + def test_close_period_already_closed(self, period_manager, sample_period_data): + """Test closing an already closed period.""" + period_id = period_manager.create_period( + period_start=sample_period_data['period_start'], + period_end=sample_period_data['period_end'] + ) + + # Close period first time + period_manager.close_period(period_id) + + # Close again (should succeed without error) + success = period_manager.close_period(period_id) + assert success is True + + def test_close_nonexistent_period(self, period_manager): + """Test closing non-existent period.""" + with pytest.raises(ValueError, match="Period #999 not found"): + period_manager.close_period(999) + + def test_list_periods_no_filter(self, period_manager, sample_period_data): + """Test listing all periods without filters.""" + # Create multiple periods + period_id1 = period_manager.create_period( + period_start=date(2025, 1, 1), + period_end=date(2025, 1, 31) + ) + + period_id2 = period_manager.create_period( + period_start=date(2025, 2, 1), + period_end=date(2025, 2, 28) + ) + + # List all periods + periods = period_manager.list_periods() + + assert len(periods) == 2 + period_ids = [p['id'] for p in periods] + assert period_id1 in period_ids + assert period_id2 in period_ids + + def test_list_periods_with_status_filter(self, period_manager): + """Test listing periods with status filter.""" + # Create periods with different statuses + period_id1 = period_manager.create_period( + period_start=date(2025, 1, 1), + period_end=date(2025, 1, 31) + ) + + period_id2 = period_manager.create_period( + period_start=date(2025, 2, 1), + period_end=date(2025, 2, 28) + ) + + # Close one period + period_manager.close_period(period_id2) + + # Filter by open status + open_periods = period_manager.list_periods(status_filter=PeriodStatus.OPEN.value) + assert len(open_periods) == 1 + assert open_periods[0]['id'] == period_id1 + + # Filter by closed status + closed_periods = period_manager.list_periods(status_filter=PeriodStatus.CLOSED.value) + assert len(closed_periods) == 1 + assert closed_periods[0]['id'] == period_id2 + + def test_list_periods_with_date_filters(self, period_manager): + """Test listing periods with date range filters.""" + # Create periods in different months + jan_period = period_manager.create_period( + period_start=date(2025, 1, 1), + period_end=date(2025, 1, 31) + ) + + feb_period = period_manager.create_period( + period_start=date(2025, 2, 1), + period_end=date(2025, 2, 28) + ) + + # Filter by start date + periods_from_feb = period_manager.list_periods(start_date=date(2025, 2, 1)) + assert len(periods_from_feb) == 1 + assert periods_from_feb[0]['id'] == feb_period + + # Filter by end date + periods_until_jan = period_manager.list_periods(end_date=date(2025, 1, 31)) + assert len(periods_until_jan) == 1 + assert periods_until_jan[0]['id'] == jan_period + + def test_get_current_period(self, period_manager): + """Test getting current period for a specific date.""" + # Create period covering January 2025 + period_id = period_manager.create_period( + period_start=date(2025, 1, 1), + period_end=date(2025, 1, 31) + ) + + # Test date within period + current = period_manager.get_current_period(date(2025, 1, 15)) + assert current is not None + assert current['id'] == period_id + + # Test date outside period + current = period_manager.get_current_period(date(2025, 2, 15)) + assert current is None + + def test_get_current_period_defaults_to_today(self, period_manager): + """Test that get_current_period defaults to today's date.""" + today = date.today() + + # Create period covering today + period_id = period_manager.create_period( + period_start=date(today.year, today.month, 1), + period_end=date(today.year, today.month, 31) if today.month != 12 + else date(today.year, 12, 31) + ) + + # Get current period without specifying date + current = period_manager.get_current_period() + assert current is not None + assert current['id'] == period_id + + def test_create_monthly_period(self, period_manager): + """Test convenience method for creating monthly periods.""" + period_id = period_manager.create_monthly_period(2025, 3) + assert period_id is not None + + # Verify correct dates were set + period = period_manager.get_period_by_id(period_id) + assert period['period_start'] == '2025-03-01' + assert period['period_end'] == '2025-03-31' + assert period['period_type'] == 'monthly' + + def test_create_monthly_period_december(self, period_manager): + """Test creating monthly period for December (year boundary).""" + period_id = period_manager.create_monthly_period(2025, 12) + + period = period_manager.get_period_by_id(period_id) + assert period['period_start'] == '2025-12-01' + assert period['period_end'] == '2025-12-31' + + def test_auto_create_period_for_date(self, period_manager): + """Test automatic period creation for a given date.""" + test_date = date(2025, 5, 15) + + # First call should create new period + period_id = period_manager.auto_create_period_for_date(test_date) + assert period_id is not None + + # Second call should return existing period + period_id2 = period_manager.auto_create_period_for_date(test_date) + assert period_id2 == period_id + + # Verify period covers the test date + period = period_manager.get_period_by_id(period_id) + assert period['period_start'] == '2025-05-01' + assert period['period_end'] == '2025-05-31' + + def test_period_calculation_with_loss_carried_forward(self, period_manager, temp_db): + """Test period calculation including loss carried forward.""" + # Create period with loss carried forward + period_id = period_manager.create_period( + period_start=date(2025, 1, 1), + period_end=date(2025, 1, 31), + loss_carried_forward=Decimal('15.75') + ) + + # Add a cost item + cost_manager = CostItemManager(temp_db) + infra_cat = cost_manager.get_category_by_name('Infrastructure') + + cost_item = CostItem( + category_id=infra_cat['id'], + name='Test Server', + cost_type='monthly', + amount_eur=Decimal('10.00'), + starting_from_date=date(2025, 1, 1) + ) + cost_manager.create_cost_item(cost_item) + + # Calculate costs + calculation = period_manager.calculate_period_costs(period_id) + + # Should include loss carried forward + assert calculation['loss_carried_forward'] == 15.75 + assert calculation['monthly_costs'] == 10.0 + assert calculation['total_costs'] == 25.75 # 10.0 + 15.75 + + def test_period_cost_calculation_edge_cases(self, period_manager, temp_db): + """Test period cost calculation with various edge cases.""" + # Create period + period_id = period_manager.create_period( + period_start=date(2025, 3, 1), + period_end=date(2025, 3, 31) + ) + + cost_manager = CostItemManager(temp_db) + infra_cat = cost_manager.get_category_by_name('Infrastructure') + + # Item that starts before period and ends during period + item1 = CostItem( + category_id=infra_cat['id'], + name='Ending Item', + cost_type='monthly', + amount_eur=Decimal('20.00'), + starting_from_date=date(2025, 1, 1), + ending_date=date(2025, 3, 15) + ) + + # Item that starts after period + item2 = CostItem( + category_id=infra_cat['id'], + name='Future Item', + cost_type='monthly', + amount_eur=Decimal('30.00'), + starting_from_date=date(2025, 4, 1) + ) + + # One-time item outside period + item3 = CostItem( + category_id=infra_cat['id'], + name='Past One-time', + cost_type='one_time', + amount_eur=Decimal('100.00'), + starting_from_date=date(2025, 2, 15) + ) + + cost_manager.create_cost_item(item1) + cost_manager.create_cost_item(item2) + cost_manager.create_cost_item(item3) + + # Calculate costs + calculation = period_manager.calculate_period_costs(period_id) + + # Only item1 should be included (ends during period) + assert calculation['monthly_costs'] == 20.0 + assert calculation['one_time_costs'] == 0.0 + assert calculation['total_costs'] == 20.0 + + def test_error_handling_database_errors(self, period_manager): + """Test error handling for database-related issues.""" + # Test with invalid period ID + with pytest.raises(ValueError, match="Period #-1 not found"): + period_manager.calculate_period_costs(-1) + + # Test getting non-existent period + result = period_manager.get_period_by_id(99999) + assert result is None \ No newline at end of file