feat: reorganize tests by capability with separate test targets
Some checks failed
Test Suite / unit-tests (3.11) (push) Has been cancelled
Test Suite / unit-tests (3.12) (push) Has been cancelled
Test Suite / integration-tests (push) Has been cancelled
Test Suite / e2e-tests (push) Has been cancelled
Test Suite / performance-tests (push) Has been cancelled
Test Suite / code-quality (push) Has been cancelled
Test Suite / security-scan (push) Has been cancelled
Test Suite / test-summary (push) Has been cancelled

Separate capability-specific tests from core system tests to establish clear
test organization and separation of concerns.

## Test Reorganization:
- **markitect-content tests**: Moved 6 tests to capabilities/markitect-content/tests/
- **markitect-finance tests**: Moved 7 tests to markitect/finance/tests/
- **markitect-query tests**: Moved 1 test to markitect/query_paradigms/tests/
- **markitect-graphql tests**: Moved 2 tests to markitect/graphql/tests/
- **markitect-plugins tests**: Moved 2 tests to markitect/plugins/tests/

## Makefile Updates:
- **make test**: Excludes capability tests, runs only core system tests
- **make test-capabilities**: Runs all capability tests
- **make test-capability-***: Individual capability test targets
- Updated all test targets (test-red, test-green, test-ultra-fast, test-perf)
- Added capability test targets to help documentation

## Benefits:
- Clear separation between core system tests and capability-specific tests
- Faster core test execution (capability tests not run by default)
- Individual capability testing for focused development
- Supports future capability extraction workflow
- Maintains capability test independence

Test verification:
- Core tests: 1291 tests (capability tests excluded)
- Finance capability: 143 tests working independently
- Content capability: 79 tests working independently

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-10-25 02:37:45 +02:00
parent f0dfd04d45
commit 096017b93f
23 changed files with 74 additions and 8 deletions

View File

View File

@@ -0,0 +1,393 @@
"""
Tests for MarkiTect cost tracking CLI commands.
This module tests the command-line interface for cost management including:
- Cost report generation commands
- Cost item management commands
- Category management commands
- Period cost calculations
"""
import pytest
import tempfile
import os
import json
from datetime import date
from decimal import Decimal
from click.testing import CliRunner
from markitect.finance.cli import cost_commands
from markitect.finance.cost_manager import CostItemManager, CostItem
from markitect.finance.models import FinanceModels
class TestCostCLICommands:
"""Test suite for cost tracking CLI commands."""
@pytest.fixture
def temp_db(self):
"""Create temporary database for testing."""
fd, path = tempfile.mkstemp(suffix='.db')
os.close(fd)
yield path
os.unlink(path)
@pytest.fixture
def setup_test_data(self, temp_db):
"""Setup test database with sample cost data."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
cost_manager = CostItemManager(temp_db)
# Get categories
infra_cat = cost_manager.get_category_by_name('Infrastructure')
software_cat = cost_manager.get_category_by_name('Software')
# Create sample cost items
cost_items = [
CostItem(
category_id=infra_cat['id'],
name='Test Server',
cost_type='monthly',
amount_eur=Decimal('25.00'),
starting_from_date=date(2025, 1, 1)
),
CostItem(
category_id=software_cat['id'],
name='Test Software',
cost_type='one_time',
amount_eur=Decimal('50.00'),
starting_from_date=date(2025, 1, 15)
)
]
for item in cost_items:
cost_manager.create_cost_item(item)
return temp_db
@pytest.fixture
def runner(self):
"""Create Click test runner."""
return CliRunner()
def test_cost_report_generate_summary(self, runner, setup_test_data):
"""Test cost report generate command with summary format."""
result = runner.invoke(cost_commands, [
'report', 'generate',
'--period', '2025-01',
'--format', 'summary',
'--database', setup_test_data
])
assert result.exit_code == 0
assert "Cost Summary Report - January 2025" in result.output
assert "€75.00" in result.output # 25 + 50
assert "frontmatter" not in result.output.lower() # Should be properly formatted
def test_cost_report_generate_detailed(self, runner, setup_test_data):
"""Test cost report generate command with detailed format."""
result = runner.invoke(cost_commands, [
'report', 'generate',
'--period', '2025-01',
'--format', 'detailed',
'--database', setup_test_data
])
assert result.exit_code == 0
assert "Detailed Cost Report - January 2025" in result.output
assert "Infrastructure" in result.output
assert "Software" in result.output
assert "Test Server" in result.output
assert "Test Software" in result.output
def test_cost_report_generate_audit(self, runner, setup_test_data):
"""Test cost report generate command with audit format."""
result = runner.invoke(cost_commands, [
'report', 'generate',
'--period', '2025-01',
'--format', 'audit',
'--database', setup_test_data
])
assert result.exit_code == 0
assert "Cost Audit Report - January 2025" in result.output
assert "Audit Summary" in result.output
assert "Transaction History" in result.output
def test_cost_report_generate_with_output_file(self, runner, setup_test_data):
"""Test saving report to output file."""
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.md') as f:
output_path = f.name
try:
result = runner.invoke(cost_commands, [
'report', 'generate',
'--period', '2025-01',
'--output', output_path,
'--database', setup_test_data
])
assert result.exit_code == 0
assert f"Report saved to: {output_path}" in result.output
# Verify file was created
assert os.path.exists(output_path)
with open(output_path, 'r') as f:
content = f.read()
assert "Cost Summary Report" in content
finally:
if os.path.exists(output_path):
os.unlink(output_path)
def test_cost_report_generate_invalid_period(self, runner, setup_test_data):
"""Test report generation with invalid period format."""
result = runner.invoke(cost_commands, [
'report', 'generate',
'--period', 'invalid-period',
'--database', setup_test_data
])
assert result.exit_code == 1
assert "Period must be in YYYY-MM format" in result.output
def test_cost_report_generate_default_database(self, runner):
"""Test report generation with default database path from config."""
result = runner.invoke(cost_commands, [
'report', 'generate',
'--period', '2025-01'
])
# Should succeed with default config and empty database
assert result.exit_code == 0
assert "Cost Summary Report - January 2025" in result.output
assert "€0.00" in result.output # Empty database shows zero costs
def test_cost_report_template_show(self, runner):
"""Test cost report template show command."""
result = runner.invoke(cost_commands, [
'report', 'template', '--show'
])
assert result.exit_code == 0
assert "Summary Report Template" in result.output
assert "Description" in result.output
assert "Frontmatter Fields" in result.output
def test_cost_report_template_different_formats(self, runner):
"""Test template show for different formats."""
formats = ['summary', 'detailed', 'audit']
for format_type in formats:
result = runner.invoke(cost_commands, [
'report', 'template', '--show', '--format', format_type
])
assert result.exit_code == 0
assert f"{format_type.title()} Report Template" in result.output
def test_cost_item_add(self, runner, temp_db):
"""Test adding new cost item via CLI."""
# Initialize database
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
result = runner.invoke(cost_commands, [
'item', 'add', 'Test Item',
'--category', 'Infrastructure',
'--amount', '15.50',
'--type', 'monthly',
'--database', temp_db
])
assert result.exit_code == 0
assert "✅ Created cost item 'Test Item'" in result.output
# Verify item was created
cost_manager = CostItemManager(temp_db)
items = cost_manager.list_cost_items()
assert len(items) == 1
assert items[0]['name'] == 'Test Item'
assert float(items[0]['amount_eur']) == 15.50
def test_cost_item_add_with_description_and_date(self, runner, temp_db):
"""Test adding cost item with description and start date."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
result = runner.invoke(cost_commands, [
'item', 'add', 'Test Item',
'--category', 'Software',
'--amount', '99.99',
'--type', 'one_time',
'--description', 'Test description',
'--start-date', '2025-01-15',
'--database', temp_db
])
assert result.exit_code == 0
assert "✅ Created cost item 'Test Item'" in result.output
def test_cost_item_add_invalid_category(self, runner, temp_db):
"""Test adding item with non-existent category."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
result = runner.invoke(cost_commands, [
'item', 'add', 'Test Item',
'--category', 'NonExistent',
'--amount', '10.00',
'--type', 'monthly',
'--database', temp_db
])
assert result.exit_code == 1
assert "Category 'NonExistent' not found" in result.output
assert "Available categories:" in result.output
def test_cost_item_list(self, runner, setup_test_data):
"""Test listing cost items."""
result = runner.invoke(cost_commands, [
'item', 'list',
'--database', setup_test_data
])
assert result.exit_code == 0
assert "Test Server" in result.output
assert "Test Software" in result.output
assert "€25.00" in result.output
assert "€50.00" in result.output
def test_cost_item_list_with_filters(self, runner, setup_test_data):
"""Test listing cost items with filters."""
# Filter by category
result = runner.invoke(cost_commands, [
'item', 'list',
'--category', 'Infrastructure',
'--database', setup_test_data
])
assert result.exit_code == 0
assert "Test Server" in result.output
assert "Test Software" not in result.output
# Filter by type
result = runner.invoke(cost_commands, [
'item', 'list',
'--type', 'monthly',
'--database', setup_test_data
])
assert result.exit_code == 0
assert "Test Server" in result.output
assert "Test Software" not in result.output
def test_cost_category_list(self, runner, setup_test_data):
"""Test listing cost categories."""
result = runner.invoke(cost_commands, [
'category', 'list',
'--database', setup_test_data
])
assert result.exit_code == 0
assert "Infrastructure" in result.output
assert "Software" in result.output
assert "Total: 8 categories" in result.output # Default categories
def test_cost_category_add(self, runner, temp_db):
"""Test adding new cost category."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
result = runner.invoke(cost_commands, [
'category', 'add', 'Custom Category',
'--description', 'Custom test category',
'--database', temp_db
])
assert result.exit_code == 0
assert "✅ Created category 'Custom Category'" in result.output
# Verify category was created
cost_manager = CostItemManager(temp_db)
categories = cost_manager.list_categories()
category_names = [cat['name'] for cat in categories]
assert 'Custom Category' in category_names
def test_cost_calculate(self, runner, setup_test_data):
"""Test cost calculation command."""
result = runner.invoke(cost_commands, [
'calculate',
'--period', '2025-01',
'--database', setup_test_data
])
assert result.exit_code == 0
assert "Cost Calculation - January 2025" in result.output
assert "Monthly Recurring: €25.00" in result.output
assert "One-time Expenses: €50.00" in result.output
assert "Total Period Cost: €75.00" in result.output
assert "Active Cost Items: 2" in result.output
def test_cost_calculate_current_month(self, runner, setup_test_data):
"""Test cost calculation for current month (default)."""
result = runner.invoke(cost_commands, [
'calculate',
'--database', setup_test_data
])
assert result.exit_code == 0
assert "Cost Calculation" in result.output
# Should default to current month
def test_cost_calculate_invalid_period(self, runner, setup_test_data):
"""Test cost calculation with invalid period."""
result = runner.invoke(cost_commands, [
'calculate',
'--period', 'invalid',
'--database', setup_test_data
])
assert result.exit_code == 1
assert "Period must be in YYYY-MM format" in result.output
def test_cost_item_add_invalid_date_format(self, runner, temp_db):
"""Test adding item with invalid date format."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
result = runner.invoke(cost_commands, [
'item', 'add', 'Test Item',
'--category', 'Infrastructure',
'--amount', '10.00',
'--type', 'monthly',
'--start-date', 'invalid-date',
'--database', temp_db
])
assert result.exit_code == 1
assert "Start date must be in YYYY-MM-DD format" in result.output
def test_help_commands(self, runner):
"""Test help output for cost commands."""
# Test main cost help
result = runner.invoke(cost_commands, ['--help'])
assert result.exit_code == 0
assert "Cost tracking and financial reporting commands" in result.output
# Test report help
result = runner.invoke(cost_commands, ['report', '--help'])
assert result.exit_code == 0
assert "Generate cost reports" in result.output
# Test item help
result = runner.invoke(cost_commands, ['item', '--help'])
assert result.exit_code == 0
assert "Manage cost items" in result.output
# Test category help
result = runner.invoke(cost_commands, ['category', '--help'])
assert result.exit_code == 0
assert "Manage cost categories" in result.output

View File

@@ -0,0 +1,398 @@
"""
Tests for MarkiTect cost item management system.
This module tests the complete cost item management functionality including:
- Cost item lifecycle (create, update, deactivate)
- Category management
- Business rule validation
- Period-based cost calculations
- Integration with database models
"""
import pytest
import tempfile
import os
from datetime import date, datetime
from decimal import Decimal
from markitect.finance.cost_manager import CostItemManager, CostItem, CostCategory
from markitect.finance.models import FinanceModels
class TestCostItemManager:
"""Test suite for cost item management system."""
@pytest.fixture
def temp_db(self):
"""Create temporary database for testing."""
fd, path = tempfile.mkstemp(suffix='.db')
os.close(fd)
yield path
os.unlink(path)
@pytest.fixture
def cost_manager(self, temp_db):
"""Create CostItemManager instance with initialized database."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
return CostItemManager(temp_db)
@pytest.fixture
def sample_category_id(self, cost_manager):
"""Create a sample category for testing."""
return cost_manager.create_category("Test Category", "For testing purposes")
def test_create_cost_item_valid(self, cost_manager, sample_category_id):
"""Test creating a valid cost item."""
cost_item = CostItem(
category_id=sample_category_id,
name="Test Server",
description="Monthly hosting",
cost_type="monthly",
amount_eur=Decimal('25.50'),
starting_from_date=date(2025, 1, 1)
)
cost_item_id = cost_manager.create_cost_item(cost_item)
assert cost_item_id is not None
# Verify item was created
retrieved = cost_manager.get_cost_item(cost_item_id)
assert retrieved['name'] == "Test Server"
assert float(retrieved['amount_eur']) == 25.50
assert retrieved['cost_type'] == "monthly"
assert retrieved['is_active'] == 1 # SQLite stores booleans as integers
def test_create_cost_item_validation_errors(self, cost_manager, sample_category_id):
"""Test cost item validation errors."""
# Missing name
with pytest.raises(ValueError, match="name is required"):
cost_item = CostItem(
category_id=sample_category_id,
name="",
cost_type="monthly",
amount_eur=Decimal('10.00'),
starting_from_date=date(2025, 1, 1)
)
cost_manager.create_cost_item(cost_item)
# Invalid cost type
with pytest.raises(ValueError, match="must be 'monthly' or 'one_time'"):
cost_item = CostItem(
category_id=sample_category_id,
name="Test Item",
cost_type="invalid",
amount_eur=Decimal('10.00'),
starting_from_date=date(2025, 1, 1)
)
cost_manager.create_cost_item(cost_item)
# Negative amount
with pytest.raises(ValueError, match="must be non-negative"):
cost_item = CostItem(
category_id=sample_category_id,
name="Test Item",
cost_type="monthly",
amount_eur=Decimal('-10.00'),
starting_from_date=date(2025, 1, 1)
)
cost_manager.create_cost_item(cost_item)
# Invalid date range
with pytest.raises(ValueError, match="must be after starting date"):
cost_item = CostItem(
category_id=sample_category_id,
name="Test Item",
cost_type="monthly",
amount_eur=Decimal('10.00'),
starting_from_date=date(2025, 1, 15),
ending_date=date(2025, 1, 10)
)
cost_manager.create_cost_item(cost_item)
# Inactive without ending date
with pytest.raises(ValueError, match="must have an ending date"):
cost_item = CostItem(
category_id=sample_category_id,
name="Test Item",
cost_type="monthly",
amount_eur=Decimal('10.00'),
starting_from_date=date(2025, 1, 1),
is_active=False
)
cost_manager.create_cost_item(cost_item)
def test_update_cost_item(self, cost_manager, sample_category_id):
"""Test updating cost item."""
# Create initial cost item
cost_item = CostItem(
category_id=sample_category_id,
name="Original Name",
cost_type="monthly",
amount_eur=Decimal('10.00'),
starting_from_date=date(2025, 1, 1)
)
cost_item_id = cost_manager.create_cost_item(cost_item)
# Update the cost item
updates = {
'name': 'Updated Name',
'amount_eur': Decimal('15.50'),
'description': 'Updated description'
}
success = cost_manager.update_cost_item(cost_item_id, updates)
assert success is True
# Verify updates
updated = cost_manager.get_cost_item(cost_item_id)
assert updated['name'] == 'Updated Name'
assert float(updated['amount_eur']) == 15.50
assert updated['description'] == 'Updated description'
def test_update_nonexistent_cost_item(self, cost_manager):
"""Test updating non-existent cost item."""
with pytest.raises(ValueError, match="not found"):
cost_manager.update_cost_item(99999, {'name': 'New Name'})
def test_deactivate_cost_item(self, cost_manager, sample_category_id):
"""Test deactivating cost item."""
# Create cost item
cost_item = CostItem(
category_id=sample_category_id,
name="Test Item",
cost_type="monthly",
amount_eur=Decimal('10.00'),
starting_from_date=date(2025, 1, 1)
)
cost_item_id = cost_manager.create_cost_item(cost_item)
# Deactivate with specific ending date
ending_date = date(2025, 6, 30)
success = cost_manager.deactivate_cost_item(cost_item_id, ending_date)
assert success is True
# Verify deactivation
updated = cost_manager.get_cost_item(cost_item_id)
assert updated['is_active'] == 0 # SQLite stores False as 0
assert updated['ending_date'] == ending_date.isoformat()
def test_list_cost_items_filtering(self, cost_manager, sample_category_id):
"""Test listing cost items with filtering."""
# Create multiple cost items
items = [
CostItem(
category_id=sample_category_id,
name="Monthly Item 1",
cost_type="monthly",
amount_eur=Decimal('10.00'),
starting_from_date=date(2025, 1, 1)
),
CostItem(
category_id=sample_category_id,
name="One-time Item",
cost_type="one_time",
amount_eur=Decimal('50.00'),
starting_from_date=date(2025, 1, 1)
),
CostItem(
category_id=sample_category_id,
name="Inactive Item",
cost_type="monthly",
amount_eur=Decimal('5.00'),
starting_from_date=date(2025, 1, 1),
ending_date=date(2025, 1, 31),
is_active=False
)
]
for item in items:
cost_manager.create_cost_item(item)
# Test filtering by active only
active_items = cost_manager.list_cost_items(active_only=True)
assert len(active_items) == 2
assert all(item['is_active'] == 1 for item in active_items)
# Test filtering by cost type
monthly_items = cost_manager.list_cost_items(cost_type="monthly")
assert len(monthly_items) == 1 # Only active monthly items
assert monthly_items[0]['cost_type'] == "monthly"
# Test including inactive items
all_items = cost_manager.list_cost_items(active_only=False)
assert len(all_items) == 3
def test_get_active_costs_for_period(self, cost_manager, sample_category_id):
"""Test retrieving active costs for specific period."""
# Create cost items with different date ranges
items = [
CostItem(
category_id=sample_category_id,
name="Active Throughout",
cost_type="monthly",
amount_eur=Decimal('10.00'),
starting_from_date=date(2024, 12, 1)
),
CostItem(
category_id=sample_category_id,
name="Starts Mid-Period",
cost_type="monthly",
amount_eur=Decimal('15.00'),
starting_from_date=date(2025, 1, 15)
),
CostItem(
category_id=sample_category_id,
name="Ends Mid-Period",
cost_type="monthly",
amount_eur=Decimal('20.00'),
starting_from_date=date(2024, 12, 1),
ending_date=date(2025, 1, 15)
),
CostItem(
category_id=sample_category_id,
name="Outside Period",
cost_type="monthly",
amount_eur=Decimal('25.00'),
starting_from_date=date(2025, 2, 1)
)
]
for item in items:
cost_manager.create_cost_item(item)
# Get active costs for January 2025
period_start = date(2025, 1, 1)
period_end = date(2025, 1, 31)
active_costs = cost_manager.get_active_costs_for_period(period_start, period_end)
# Should include first 3 items but not the fourth
assert len(active_costs) == 3
names = [item['name'] for item in active_costs]
assert "Active Throughout" in names
assert "Starts Mid-Period" in names
assert "Ends Mid-Period" in names
assert "Outside Period" not in names
def test_calculate_period_costs(self, cost_manager, sample_category_id):
"""Test period cost calculations."""
# Create another category
other_category_id = cost_manager.create_category("Other Category")
# Create cost items in different categories
items = [
CostItem(
category_id=sample_category_id,
name="Monthly Cost 1",
cost_type="monthly",
amount_eur=Decimal('10.00'),
starting_from_date=date(2025, 1, 1)
),
CostItem(
category_id=sample_category_id,
name="Monthly Cost 2",
cost_type="monthly",
amount_eur=Decimal('15.00'),
starting_from_date=date(2025, 1, 1)
),
CostItem(
category_id=other_category_id,
name="One-time Cost",
cost_type="one_time",
amount_eur=Decimal('100.00'),
starting_from_date=date(2025, 1, 1)
)
]
for item in items:
cost_manager.create_cost_item(item)
# Calculate costs for January 2025
period_start = date(2025, 1, 1)
period_end = date(2025, 1, 31)
calculations = cost_manager.calculate_period_costs(period_start, period_end)
assert calculations['total_monthly'] == 25.00
assert calculations['total_one_time'] == 100.00
assert calculations['total_period'] == 125.00
assert calculations['active_cost_items'] == 3
# Check category breakdown
assert 'Test Category' in calculations['category_breakdown']
assert 'Other Category' in calculations['category_breakdown']
assert calculations['category_breakdown']['Test Category']['monthly'] == 25.00
assert calculations['category_breakdown']['Other Category']['one_time'] == 100.00
def test_category_management(self, cost_manager):
"""Test category creation and management."""
# Create category with unique name
category_id = cost_manager.create_category("Custom Infrastructure", "Custom server costs")
assert category_id is not None
# Retrieve category
category = cost_manager.get_category(category_id)
assert category['name'] == "Custom Infrastructure"
assert category['description'] == "Custom server costs"
# Test duplicate category
with pytest.raises(ValueError, match="already exists"):
cost_manager.create_category("Custom Infrastructure")
# List categories
categories = cost_manager.list_categories()
category_names = [cat['name'] for cat in categories]
assert "Custom Infrastructure" in category_names
# Should also include default categories from schema initialization
assert len(categories) >= 9 # 8 default + 1 created
# Get category by name
found_category = cost_manager.get_category_by_name("Custom Infrastructure")
assert found_category['id'] == category_id
def test_cost_item_with_category_validation(self, cost_manager):
"""Test cost item creation with category validation."""
# Try to create cost item with non-existent category
with pytest.raises(ValueError, match="does not exist"):
cost_item = CostItem(
category_id=99999,
name="Test Item",
cost_type="monthly",
amount_eur=Decimal('10.00'),
starting_from_date=date(2025, 1, 1)
)
cost_manager.create_cost_item(cost_item)
def test_precision_handling(self, cost_manager, sample_category_id):
"""Test decimal precision in cost calculations."""
# Create cost item with precise decimal
cost_item = CostItem(
category_id=sample_category_id,
name="Precise Cost",
cost_type="monthly",
amount_eur=Decimal('10.99'),
starting_from_date=date(2025, 1, 1)
)
cost_item_id = cost_manager.create_cost_item(cost_item)
# Verify precision is maintained
retrieved = cost_manager.get_cost_item(cost_item_id)
assert float(retrieved['amount_eur']) == 10.99
# Test in period calculations
calculations = cost_manager.calculate_period_costs(date(2025, 1, 1), date(2025, 1, 31))
assert calculations['total_monthly'] == 10.99
def test_empty_database_operations(self, cost_manager):
"""Test operations on empty database."""
# List items in empty database
items = cost_manager.list_cost_items()
assert len(items) == 0
# Get non-existent item
item = cost_manager.get_cost_item(99999)
assert item is None
# Calculate costs for empty period
calculations = cost_manager.calculate_period_costs(date(2025, 1, 1), date(2025, 1, 31))
assert calculations['total_monthly'] == 0.00
assert calculations['total_one_time'] == 0.00
assert calculations['active_cost_items'] == 0

View File

@@ -0,0 +1,357 @@
"""
Tests for MarkiTect cost report template generator.
This module tests the complete cost report generation functionality including:
- Report generation in different formats (summary, detailed, audit)
- Markdown output with frontmatter and contentmatter
- CLI integration and command functionality
- Template structure validation
"""
import pytest
import tempfile
import os
import json
from datetime import date, datetime
from decimal import Decimal
from markitect.finance.cost_manager import CostItemManager, CostItem
from markitect.finance.report_generator import CostReportGenerator, ReportConfig
from markitect.finance.models import FinanceModels
class TestCostReportGenerator:
"""Test suite for cost report generation system."""
@pytest.fixture
def temp_db(self):
"""Create temporary database for testing."""
fd, path = tempfile.mkstemp(suffix='.db')
os.close(fd)
yield path
os.unlink(path)
@pytest.fixture
def setup_test_data(self, temp_db):
"""Setup test database with sample cost data."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
cost_manager = CostItemManager(temp_db)
# Get categories
infra_cat = cost_manager.get_category_by_name('Infrastructure')
software_cat = cost_manager.get_category_by_name('Software')
# Create sample cost items
cost_items = [
CostItem(
category_id=infra_cat['id'],
name='Hosteurope Server',
description='Monthly server hosting',
cost_type='monthly',
amount_eur=Decimal('10.00'),
starting_from_date=date(2025, 1, 1)
),
CostItem(
category_id=software_cat['id'],
name='Bubble.io Plan',
description='No-code platform subscription',
cost_type='monthly',
amount_eur=Decimal('32.00'),
starting_from_date=date(2025, 1, 1)
),
CostItem(
category_id=infra_cat['id'],
name='SSL Certificate',
description='Annual SSL certificate',
cost_type='one_time',
amount_eur=Decimal('45.00'),
starting_from_date=date(2025, 1, 15)
)
]
for item in cost_items:
cost_manager.create_cost_item(item)
return temp_db
@pytest.fixture
def report_generator(self, setup_test_data):
"""Create report generator with test data."""
return CostReportGenerator(setup_test_data)
def test_report_config_creation(self):
"""Test ReportConfig dataclass creation."""
config = ReportConfig(
format="summary",
period_start=date(2025, 1, 1),
period_end=date(2025, 1, 31),
currency="EUR"
)
assert config.format == "summary"
assert config.period_start == date(2025, 1, 1)
assert config.period_end == date(2025, 1, 31)
assert config.currency == "EUR"
assert config.include_inactive is False
assert config.output_path is None
def test_generate_summary_report(self, report_generator):
"""Test generation of summary cost report."""
config = ReportConfig(
format="summary",
period_start=date(2025, 1, 1),
period_end=date(2025, 1, 31)
)
report = report_generator.generate_report(config)
# Check that it's valid markdown with frontmatter
assert report.startswith("---")
assert "Cost Summary Report - January 2025" in report
assert "total_costs: 87.0" in report
assert "report_type: \"cost_summary\"" in report
# Check contentmatter is present
assert "contentmatter:" in report
assert "cost_data" in report
# Verify total costs are correct (10 + 32 + 45 = 87)
assert "€87.00" in report
def test_generate_detailed_report(self, report_generator):
"""Test generation of detailed cost report."""
config = ReportConfig(
format="detailed",
period_start=date(2025, 1, 1),
period_end=date(2025, 1, 31)
)
report = report_generator.generate_report(config)
# Check report structure
assert "Detailed Cost Report - January 2025" in report
assert "Executive Summary" in report
assert "report_type: \"cost_detailed\"" in report
# Check category sections are present
assert "Infrastructure" in report
assert "Software" in report
# Check individual items are listed
assert "Hosteurope Server" in report
assert "Bubble.io Plan" in report
assert "SSL Certificate" in report
# Check table format
assert "| Name | Type | Amount | Status | Start Date |" in report
def test_generate_audit_report(self, report_generator):
"""Test generation of audit trail report."""
config = ReportConfig(
format="audit",
period_start=date(2025, 1, 1),
period_end=date(2025, 1, 31)
)
report = report_generator.generate_report(config)
# Check report structure
assert "Cost Audit Report - January 2025" in report
assert "Audit Summary" in report
assert "report_type: \"cost_audit\"" in report
assert "audit_trail: True" in report
# Check audit sections
assert "Cost Verification" in report
assert "Active Cost Items" in report
assert "Transaction History" in report
assert "Audit Trail" in report
# Check contentmatter includes audit data
assert "audit_data" in report
def test_generate_period_report_convenience_method(self, report_generator):
"""Test convenience method for generating monthly reports."""
report = report_generator.generate_period_report(2025, 1, "summary")
assert "Cost Summary Report - January 2025" in report
assert "2025-01-01" in report
assert "2025-01-31" in report
def test_invalid_report_format_raises_error(self, report_generator):
"""Test that invalid report format raises ValueError."""
config = ReportConfig(
format="invalid",
period_start=date(2025, 1, 1),
period_end=date(2025, 1, 31)
)
with pytest.raises(ValueError, match="Unknown report format"):
report_generator.generate_report(config)
def test_frontmatter_structure(self, report_generator):
"""Test frontmatter structure in generated reports."""
config = ReportConfig(
format="summary",
period_start=date(2025, 1, 1),
period_end=date(2025, 1, 31)
)
report = report_generator.generate_report(config)
# Extract frontmatter (between first two ---)
lines = report.split('\n')
frontmatter_lines = []
in_frontmatter = False
for line in lines:
if line.strip() == "---":
if not in_frontmatter:
in_frontmatter = True
continue
else:
break
if in_frontmatter:
frontmatter_lines.append(line)
frontmatter_text = '\n'.join(frontmatter_lines)
# Check required frontmatter fields
assert 'report_type:' in frontmatter_text
assert 'period_start:' in frontmatter_text
assert 'period_end:' in frontmatter_text
assert 'total_costs:' in frontmatter_text
assert 'currency:' in frontmatter_text
assert 'generated_at:' in frontmatter_text
def test_contentmatter_structure(self, report_generator):
"""Test contentmatter structure in generated reports."""
config = ReportConfig(
format="summary",
period_start=date(2025, 1, 1),
period_end=date(2025, 1, 31)
)
report = report_generator.generate_report(config)
# Extract contentmatter (JSON in HTML comment)
assert "<!--" in report
assert "contentmatter:" in report
assert "-->" in report
# Find and extract JSON
start = report.find("contentmatter:\n") + len("contentmatter:\n")
end = report.find("\n-->")
json_text = report[start:end].strip()
# Parse JSON to verify structure
contentmatter = json.loads(json_text)
assert "cost_data" in contentmatter
assert "total_monthly" in contentmatter["cost_data"]
assert "total_one_time" in contentmatter["cost_data"]
assert "categories" in contentmatter["cost_data"]
assert "active_items" in contentmatter["cost_data"]
# Verify totals
assert contentmatter["cost_data"]["total_monthly"] == 42.0
assert contentmatter["cost_data"]["total_one_time"] == 45.0
def test_save_report_to_file(self, report_generator, temp_db):
"""Test saving report to file."""
config = ReportConfig(
format="summary",
period_start=date(2025, 1, 1),
period_end=date(2025, 1, 31)
)
report = report_generator.generate_report(config)
# Save to temporary file
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.md') as f:
output_path = f.name
try:
report_generator.save_report(report, output_path)
# Verify file was created and contains expected content
with open(output_path, 'r', encoding='utf-8') as f:
saved_content = f.read()
assert saved_content == report
assert "Cost Summary Report" in saved_content
finally:
os.unlink(output_path)
def test_empty_database_report(self, temp_db):
"""Test report generation with empty database."""
# Initialize empty database
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
report_generator = CostReportGenerator(temp_db)
config = ReportConfig(
format="summary",
period_start=date(2025, 1, 1),
period_end=date(2025, 1, 31)
)
report = report_generator.generate_report(config)
# Should still generate valid report with zero costs
assert "total_costs: 0.0" in report
assert "€0.00" in report
def test_different_currency(self, report_generator):
"""Test report generation with different currency."""
config = ReportConfig(
format="summary",
period_start=date(2025, 1, 1),
period_end=date(2025, 1, 31),
currency="USD"
)
report = report_generator.generate_report(config)
assert 'currency: "USD"' in report
# Note: amounts are still in EUR from database, currency is just metadata
def test_report_with_inactive_items(self, setup_test_data):
"""Test report behavior with inactive cost items."""
cost_manager = CostItemManager(setup_test_data)
# Deactivate one item
items = cost_manager.list_cost_items()
if items:
cost_manager.deactivate_cost_item(items[0]['id'], date(2025, 1, 15))
report_generator = CostReportGenerator(setup_test_data)
config = ReportConfig(
format="detailed",
period_start=date(2025, 1, 1),
period_end=date(2025, 1, 31),
include_inactive=False
)
report = report_generator.generate_report(config)
# Should still generate valid report, potentially with fewer active items
assert "Detailed Cost Report" in report
assert "contentmatter:" in report
def test_cross_month_period(self, report_generator):
"""Test report generation across multiple months."""
config = ReportConfig(
format="summary",
period_start=date(2025, 1, 15),
period_end=date(2025, 2, 15)
)
report = report_generator.generate_report(config)
assert "2025-01-15" in report
assert "2025-02-15" in report
# Should include items active during this period

View File

@@ -0,0 +1,430 @@
"""
Tests for MarkiTect finance models and database schema.
This module tests the complete finance schema including:
- Database table creation and relationships
- Data integrity constraints
- Index performance
- Schema validation
- Migration functionality
"""
import pytest
import tempfile
import os
from datetime import date, datetime
from decimal import Decimal
from markitect.finance.models import FinanceModels
class TestFinanceModels:
"""Test suite for finance database models."""
@pytest.fixture
def temp_db(self):
"""Create temporary database for testing."""
fd, path = tempfile.mkstemp(suffix='.db')
os.close(fd)
yield path
os.unlink(path)
@pytest.fixture
def finance_models(self, temp_db):
"""Create FinanceModels instance with temporary database."""
return FinanceModels(temp_db)
def test_initialize_finance_schema(self, finance_models):
"""Test complete finance schema initialization."""
# Initialize schema
finance_models.initialize_finance_schema()
# Validate schema was created
assert finance_models.validate_schema()
# Check all required tables exist
schema_info = finance_models.get_schema_info()
expected_tables = [
'cost_categories',
'cost_items',
'cost_periods',
'cost_transactions',
'issue_cost_allocations',
'issue_activity_log'
]
for table in expected_tables:
assert table in schema_info['tables']
assert len(schema_info['tables'][table]['columns']) > 0
def test_cost_categories_table(self, finance_models):
"""Test cost categories table structure and data."""
finance_models.initialize_finance_schema()
conn = finance_models.get_connection()
cursor = conn.cursor()
# Test default categories were inserted
cursor.execute('SELECT COUNT(*) FROM cost_categories')
count = cursor.fetchone()[0]
assert count >= 8 # At least 8 default categories
# Test unique constraint
with pytest.raises(Exception): # Should violate unique constraint
cursor.execute('''
INSERT INTO cost_categories (name, description)
VALUES ('Infrastructure', 'Duplicate category')
''')
conn.close()
def test_cost_items_table(self, finance_models):
"""Test cost items table constraints and relationships."""
finance_models.initialize_finance_schema()
conn = finance_models.get_connection()
cursor = conn.cursor()
# Insert test category
cursor.execute('''
INSERT INTO cost_categories (name, description)
VALUES ('Test Category', 'For testing')
''')
category_id = cursor.lastrowid
# Test valid cost item insertion
cursor.execute('''
INSERT INTO cost_items
(category_id, name, cost_type, amount_eur, starting_from_date)
VALUES (?, 'Test Server', 'monthly', 10.50, '2025-01-01')
''', (category_id,))
# Test cost_type constraint
with pytest.raises(Exception):
cursor.execute('''
INSERT INTO cost_items
(category_id, name, cost_type, amount_eur, starting_from_date)
VALUES (?, 'Invalid Type', 'invalid', 10.00, '2025-01-01')
''', (category_id,))
# Test negative amount constraint
with pytest.raises(Exception):
cursor.execute('''
INSERT INTO cost_items
(category_id, name, cost_type, amount_eur, starting_from_date)
VALUES (?, 'Negative Cost', 'monthly', -10.00, '2025-01-01')
''', (category_id,))
# Test date range constraint
with pytest.raises(Exception):
cursor.execute('''
INSERT INTO cost_items
(category_id, name, cost_type, amount_eur, starting_from_date, ending_date)
VALUES (?, 'Invalid Dates', 'monthly', 10.00, '2025-01-01', '2024-12-31')
''', (category_id,))
conn.close()
def test_cost_periods_table(self, finance_models):
"""Test cost periods table constraints."""
finance_models.initialize_finance_schema()
conn = finance_models.get_connection()
cursor = conn.cursor()
# Test valid period insertion
cursor.execute('''
INSERT INTO cost_periods (period_start, period_end)
VALUES ('2025-01-01', '2025-01-31')
''')
# Test period date constraint
with pytest.raises(Exception):
cursor.execute('''
INSERT INTO cost_periods (period_start, period_end)
VALUES ('2025-01-31', '2025-01-01')
''')
# Test status constraint
with pytest.raises(Exception):
cursor.execute('''
INSERT INTO cost_periods (period_start, period_end, status)
VALUES ('2025-02-01', '2025-02-28', 'invalid_status')
''')
# Test unique period constraint
with pytest.raises(Exception):
cursor.execute('''
INSERT INTO cost_periods (period_start, period_end)
VALUES ('2025-01-01', '2025-01-31')
''')
conn.close()
def test_cost_transactions_table(self, finance_models):
"""Test cost transactions table and audit trail."""
finance_models.initialize_finance_schema()
conn = finance_models.get_connection()
cursor = conn.cursor()
# Create test data
cursor.execute('''
INSERT INTO cost_categories (name) VALUES ('Test Category')
''')
category_id = cursor.lastrowid
cursor.execute('''
INSERT INTO cost_items
(category_id, name, cost_type, amount_eur, starting_from_date)
VALUES (?, 'Test Item', 'monthly', 10.00, '2025-01-01')
''', (category_id,))
cost_item_id = cursor.lastrowid
cursor.execute('''
INSERT INTO cost_periods (period_start, period_end)
VALUES ('2025-01-01', '2025-01-31')
''')
period_id = cursor.lastrowid
# Test valid transaction
cursor.execute('''
INSERT INTO cost_transactions
(period_id, cost_item_id, transaction_type, amount_eur, transaction_date)
VALUES (?, ?, 'cost_incurred', 10.00, '2025-01-15')
''', (period_id, cost_item_id))
# Test transaction type constraint
with pytest.raises(Exception):
cursor.execute('''
INSERT INTO cost_transactions
(period_id, cost_item_id, transaction_type, amount_eur, transaction_date)
VALUES (?, ?, 'invalid_type', 10.00, '2025-01-15')
''', (period_id, cost_item_id))
conn.close()
def test_issue_cost_allocations_table(self, finance_models):
"""Test issue cost allocations table."""
finance_models.initialize_finance_schema()
conn = finance_models.get_connection()
cursor = conn.cursor()
# Create test period
cursor.execute('''
INSERT INTO cost_periods (period_start, period_end)
VALUES ('2025-01-01', '2025-01-31')
''')
period_id = cursor.lastrowid
# Test valid allocation
cursor.execute('''
INSERT INTO issue_cost_allocations
(issue_id, period_id, allocated_amount, allocation_date)
VALUES (123, ?, 5.50, '2025-01-31')
''', (period_id,))
# Test positive amount constraint
with pytest.raises(Exception):
cursor.execute('''
INSERT INTO issue_cost_allocations
(issue_id, period_id, allocated_amount, allocation_date)
VALUES (124, ?, -1.00, '2025-01-31')
''', (period_id,))
# Test unique issue-period constraint
with pytest.raises(Exception):
cursor.execute('''
INSERT INTO issue_cost_allocations
(issue_id, period_id, allocated_amount, allocation_date)
VALUES (123, ?, 3.00, '2025-01-31')
''', (period_id,))
conn.close()
def test_issue_activity_log_table(self, finance_models):
"""Test issue activity log table."""
finance_models.initialize_finance_schema()
conn = finance_models.get_connection()
cursor = conn.cursor()
# Test valid activity log entry
cursor.execute('''
INSERT INTO issue_activity_log
(issue_id, activity_type, activity_date)
VALUES (123, 'created', '2025-01-15')
''')
# Test activity type constraint
with pytest.raises(Exception):
cursor.execute('''
INSERT INTO issue_activity_log
(issue_id, activity_type, activity_date)
VALUES (124, 'invalid_activity', '2025-01-15')
''')
conn.close()
def test_foreign_key_constraints(self, finance_models):
"""Test foreign key relationships are enforced."""
finance_models.initialize_finance_schema()
conn = finance_models.get_connection()
cursor = conn.cursor()
# Test cost_items references cost_categories
with pytest.raises(Exception):
cursor.execute('''
INSERT INTO cost_items
(category_id, name, cost_type, amount_eur, starting_from_date)
VALUES (999, 'Invalid Category', 'monthly', 10.00, '2025-01-01')
''')
# Test cost_transactions references cost_periods
with pytest.raises(Exception):
cursor.execute('''
INSERT INTO cost_transactions
(period_id, transaction_type, amount_eur, transaction_date)
VALUES (999, 'cost_incurred', 10.00, '2025-01-15')
''')
conn.close()
def test_indexes_created(self, finance_models):
"""Test that performance indexes are created."""
finance_models.initialize_finance_schema()
schema_info = finance_models.get_schema_info()
index_names = [idx['name'] for idx in schema_info['indexes']]
# Check critical indexes exist
expected_indexes = [
'idx_cost_items_active',
'idx_cost_items_type',
'idx_cost_periods_status',
'idx_cost_transactions_period',
'idx_issue_allocations_issue'
]
for index in expected_indexes:
assert index in index_names
def test_schema_validation(self, finance_models):
"""Test schema validation functionality."""
# Before initialization
assert not finance_models.validate_schema()
# After initialization
finance_models.initialize_finance_schema()
assert finance_models.validate_schema()
def test_drop_finance_schema(self, finance_models):
"""Test schema cleanup functionality."""
# Initialize schema
finance_models.initialize_finance_schema()
assert finance_models.validate_schema()
# Drop schema
finance_models.drop_finance_schema()
assert not finance_models.validate_schema()
def test_database_integration(self, temp_db):
"""Test integration with existing DatabaseManager."""
from markitect.database import DatabaseManager
# Initialize standard database
db_manager = DatabaseManager(temp_db)
db_manager.initialize_database()
# Verify finance tables were also created
finance_models = FinanceModels(temp_db)
assert finance_models.validate_schema()
# Verify existing tables still exist
conn = finance_models.get_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT name FROM sqlite_master
WHERE type='table' AND name IN ('markdown_files', 'schemas')
''')
existing_tables = [row[0] for row in cursor.fetchall()]
assert 'markdown_files' in existing_tables
assert 'schemas' in existing_tables
conn.close()
def test_decimal_precision(self, finance_models):
"""Test decimal precision for financial calculations."""
finance_models.initialize_finance_schema()
conn = finance_models.get_connection()
cursor = conn.cursor()
# Insert test category
cursor.execute('''
INSERT INTO cost_categories (name) VALUES ('Test Category')
''')
category_id = cursor.lastrowid
# Test precise decimal amounts
test_amounts = [10.50, 99.99, 0.01, 1234.56]
for amount in test_amounts:
cursor.execute('''
INSERT INTO cost_items
(category_id, name, cost_type, amount_eur, starting_from_date)
VALUES (?, ?, 'monthly', ?, '2025-01-01')
''', (category_id, f'Test Item {amount}', amount))
# Verify precision is maintained
cursor.execute('SELECT amount_eur FROM cost_items ORDER BY id')
stored_amounts = [float(row[0]) for row in cursor.fetchall()]
assert stored_amounts == test_amounts
conn.close()
def test_example_cost_data(self, finance_models):
"""Test insertion of example cost data from issue description."""
finance_models.initialize_finance_schema()
conn = finance_models.get_connection()
cursor = conn.cursor()
# Get category IDs
cursor.execute('SELECT id, name FROM cost_categories')
categories = {name: id for id, name in cursor.fetchall()}
# Insert example costs from issue #88
example_costs = [
('Infrastructure', 'Hosteurope Server', 'Monthly server hosting', 10.00),
('Software', 'Bubble.io Plan', 'No-code platform subscription', 32.00),
('Domain & DNS', 'Coulomb.social Domain', 'Domain registration', 5.00),
('Development Tools', 'Claude Code Plan', 'AI coding assistant', 20.00),
('AI & ML Services', 'Gemini Plan', 'LLM API for specifications', 20.00)
]
for category_name, name, description, amount in example_costs:
category_id = categories.get(category_name)
assert category_id is not None
cursor.execute('''
INSERT INTO cost_items
(category_id, name, description, cost_type, amount_eur, starting_from_date)
VALUES (?, ?, ?, 'monthly', ?, '2025-01-01')
''', (category_id, name, description, amount))
# Verify total monthly costs
cursor.execute('''
SELECT SUM(amount_eur) FROM cost_items
WHERE cost_type = 'monthly' AND is_active = TRUE
''')
total_monthly = float(cursor.fetchone()[0])
assert total_monthly == 87.00 # €87/month as described in issue
conn.close()

View File

@@ -0,0 +1,794 @@
"""
Tests for Issue #122 - Daily worktime estimation and distribution of associated cost
This module contains comprehensive tests for the worktime tracking system
that estimates daily work time and distributes costs proportionally based
on time allocation across issues.
"""
import pytest
import sqlite3
from datetime import datetime, date, timedelta
from unittest.mock import Mock, patch, MagicMock
from pathlib import Path
import tempfile
import json
from decimal import Decimal
from markitect.finance.worktime_tracker import WorktimeTracker, WorktimeEntry, DailySummary
from markitect.finance.worktime_commands import worktime, _parse_duration, _format_duration
class TestWorktimeEntry:
"""Test suite for WorktimeEntry dataclass."""
def test_worktime_entry_creation(self):
"""Test that WorktimeEntry objects can be created properly."""
entry = WorktimeEntry(
id=1,
issue_id=122,
work_date=date.today(),
duration_minutes=90,
description="Working on worktime tracking"
)
assert entry.id == 1
assert entry.issue_id == 122
assert entry.work_date == date.today()
assert entry.duration_minutes == 90
assert entry.description == "Working on worktime tracking"
def test_worktime_entry_defaults(self):
"""Test that WorktimeEntry has proper default values."""
entry = WorktimeEntry()
assert entry.id is None
assert entry.issue_id is None
assert entry.work_date is None
assert entry.start_time is None
assert entry.end_time is None
assert entry.duration_minutes is None
assert entry.description is None
assert entry.entry_type == "manual"
assert entry.created_at is None
assert entry.updated_at is None
class TestDailySummary:
"""Test suite for DailySummary dataclass."""
def test_daily_summary_creation(self):
"""Test that DailySummary objects can be created properly."""
entries = [
WorktimeEntry(id=1, issue_id=122, duration_minutes=90),
WorktimeEntry(id=2, issue_id=123, duration_minutes=60)
]
summary = DailySummary(
work_date=date.today(),
total_minutes=150,
issue_count=2,
entries=entries,
cost_per_minute=Decimal('0.1'),
total_cost_allocated=Decimal('15.0')
)
assert summary.work_date == date.today()
assert summary.total_minutes == 150
assert summary.issue_count == 2
assert len(summary.entries) == 2
assert summary.cost_per_minute == Decimal('0.1')
assert summary.total_cost_allocated == Decimal('15.0')
class TestWorktimeTracker:
"""Test suite for WorktimeTracker service."""
def setup_method(self):
"""Set up test fixtures with temporary database."""
self.temp_db = tempfile.NamedTemporaryFile(suffix='.db', delete=False)
self.temp_db.close()
self.db_path = self.temp_db.name
self.tracker = WorktimeTracker(self.db_path)
def teardown_method(self):
"""Clean up test fixtures."""
Path(self.db_path).unlink(missing_ok=True)
def test_tracker_initialization(self):
"""Test that tracker initializes properly with database."""
assert self.tracker.db_path == self.db_path
assert self.tracker.finance_models is not None
# Verify worktime tables were created
with self.tracker.finance_models.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
tables = [row[0] for row in cursor.fetchall()]
expected_tables = ['worktime_entries', 'daily_worktime_summaries', 'worktime_cost_distributions']
for table in expected_tables:
assert table in tables
def test_log_worktime_basic(self):
"""Test logging basic worktime entry."""
entry_id = self.tracker.log_worktime(
issue_id=122,
duration_minutes=90,
description="Implementing worktime tracking"
)
assert entry_id is not None
# Verify entry was stored
entries = self.tracker.get_worktime_entries(issue_id=122)
assert len(entries) == 1
assert entries[0].issue_id == 122
assert entries[0].duration_minutes == 90
assert entries[0].description == "Implementing worktime tracking"
def test_log_worktime_with_timestamps(self):
"""Test logging worktime with start and end times."""
now = datetime.now()
start_time = now.replace(hour=9, minute=0, second=0, microsecond=0)
end_time = now.replace(hour=10, minute=30, second=0, microsecond=0)
entry_id = self.tracker.log_worktime(
issue_id=122,
duration_minutes=90,
start_time=start_time,
end_time=end_time,
description="Morning work session"
)
entries = self.tracker.get_worktime_entries(issue_id=122)
assert len(entries) == 1
assert entries[0].start_time.hour == 9
assert entries[0].end_time.hour == 10
assert entries[0].end_time.minute == 30
def test_log_worktime_validation(self):
"""Test worktime logging validation."""
# Test negative duration
with pytest.raises(ValueError, match="Duration must be positive"):
self.tracker.log_worktime(issue_id=122, duration_minutes=-30)
# Test zero duration
with pytest.raises(ValueError, match="Duration must be positive"):
self.tracker.log_worktime(issue_id=122, duration_minutes=0)
def test_get_worktime_entries_filtering(self):
"""Test worktime entry retrieval with various filters."""
today = date.today()
yesterday = today - timedelta(days=1)
# Create test entries
self.tracker.log_worktime(122, 60, work_date=today, description="Today's work")
self.tracker.log_worktime(123, 90, work_date=today, description="Today's other work")
self.tracker.log_worktime(122, 45, work_date=yesterday, description="Yesterday's work")
# Test filtering by issue
issue_122_entries = self.tracker.get_worktime_entries(issue_id=122)
assert len(issue_122_entries) == 2
assert all(e.issue_id == 122 for e in issue_122_entries)
# Test filtering by date
today_entries = self.tracker.get_worktime_entries(work_date=today)
assert len(today_entries) == 2
assert all(e.work_date == today for e in today_entries)
# Test date range filtering
range_entries = self.tracker.get_worktime_entries(start_date=yesterday, end_date=today)
assert len(range_entries) == 3
def test_get_daily_summary(self):
"""Test daily worktime summary generation."""
today = date.today()
# Log multiple entries for today
self.tracker.log_worktime(122, 90, work_date=today)
self.tracker.log_worktime(123, 60, work_date=today)
self.tracker.log_worktime(122, 30, work_date=today) # Second entry for same issue
summary = self.tracker.get_daily_summary(today)
assert summary is not None
assert summary.work_date == today
assert summary.total_minutes == 180 # 90 + 60 + 30
assert summary.issue_count == 2 # Issues 122 and 123
assert len(summary.entries) == 3
def test_estimate_daily_worktime_equal_distribution(self):
"""Test daily worktime estimation with equal distribution."""
today = date.today()
issues = [122, 123, 124]
result = self.tracker.estimate_daily_worktime(
work_date=today,
total_hours=6.0,
issues=issues,
distribution_method="equal"
)
assert result['work_date'] == today
assert result['total_minutes'] == 360 # 6 hours
assert result['distribution_method'] == "equal"
assert result['issues_count'] == 3
# Each issue should get 120 minutes (2 hours)
for issue_id in issues:
assert result['issue_estimates'][issue_id] == 120
# Verify entries were created
entries = self.tracker.get_worktime_entries(work_date=today)
assert len(entries) == 3
assert all(e.entry_type == "estimated" for e in entries)
def test_estimate_daily_worktime_activity_based(self):
"""Test daily worktime estimation with activity-based distribution."""
today = date.today()
# Mock activity data - issue 122 has more activities
with patch.object(self.tracker, '_get_activity_weights_for_date') as mock_weights:
mock_weights.return_value = {122: 5, 123: 2, 124: 1} # Different activity levels
result = self.tracker.estimate_daily_worktime(
work_date=today,
total_hours=8.0,
issues=[122, 123, 124],
distribution_method="activity_based"
)
# Verify distribution is proportional to activities
total_weight = 5 + 2 + 1 # 8
expected_122 = int((5/8) * 480) # 300 minutes
expected_123 = int((2/8) * 480) # 120 minutes
expected_124 = int((1/8) * 480) # 60 minutes
assert result['issue_estimates'][122] == expected_122
assert result['issue_estimates'][123] == expected_123
assert result['issue_estimates'][124] == expected_124
def test_distribute_daily_costs(self):
"""Test daily cost distribution based on time allocation."""
today = date.today()
# Log different amounts of time for different issues
self.tracker.log_worktime(122, 120, work_date=today) # 2 hours
self.tracker.log_worktime(123, 60, work_date=today) # 1 hour
self.tracker.log_worktime(124, 120, work_date=today) # 2 hours
# Total: 5 hours (300 minutes)
total_cost = Decimal('150.00') # €150 for the day
result = self.tracker.distribute_daily_costs(
work_date=today,
total_daily_cost=total_cost
)
assert result['work_date'] == today
assert result['total_cost'] == 150.0
assert result['total_minutes'] == 300
assert result['cost_per_minute'] == 0.5 # €150 / 300 minutes
# Check cost distribution
assert result['distributions'][122]['cost_allocated'] == 60.0 # 120 min * €0.5
assert result['distributions'][123]['cost_allocated'] == 30.0 # 60 min * €0.5
assert result['distributions'][124]['cost_allocated'] == 60.0 # 120 min * €0.5
# Check percentages
assert result['distributions'][122]['percentage'] == 40.0 # 120/300 * 100
assert result['distributions'][123]['percentage'] == 20.0 # 60/300 * 100
assert result['distributions'][124]['percentage'] == 40.0 # 120/300 * 100
def test_distribute_daily_costs_no_worktime(self):
"""Test cost distribution when no worktime is logged."""
today = date.today()
total_cost = Decimal('100.00')
result = self.tracker.distribute_daily_costs(
work_date=today,
total_daily_cost=total_cost
)
assert 'message' in result
assert "No worktime entries found" in result['message']
def test_get_worktime_report(self):
"""Test comprehensive worktime reporting."""
today = date.today()
yesterday = today - timedelta(days=1)
# Create test data across multiple days and issues
self.tracker.log_worktime(122, 90, work_date=yesterday)
self.tracker.log_worktime(123, 60, work_date=yesterday)
self.tracker.log_worktime(122, 120, work_date=today)
self.tracker.log_worktime(124, 45, work_date=today)
report = self.tracker.get_worktime_report(
start_date=yesterday,
end_date=today
)
assert report['total_entries'] == 4
assert report['total_time']['total_minutes'] == 315 # 90+60+120+45
assert report['total_time']['hours'] == 5
assert report['total_time']['minutes'] == 15
assert report['unique_issues'] == 3 # Issues 122, 123, 124
assert report['unique_dates'] == 2
# Check issue breakdown
assert 122 in report['issue_breakdown']
assert report['issue_breakdown'][122]['total_minutes'] == 210 # 90+120
assert report['issue_breakdown'][122]['entry_count'] == 2
assert report['issue_breakdown'][122]['unique_dates'] == 2
def test_delete_worktime_entry(self):
"""Test deleting worktime entries."""
entry_id = self.tracker.log_worktime(122, 90, description="Test entry")
# Verify entry exists
entries = self.tracker.get_worktime_entries(issue_id=122)
assert len(entries) == 1
# Delete entry
success = self.tracker.delete_worktime_entry(entry_id)
assert success is True
# Verify entry is gone
entries = self.tracker.get_worktime_entries(issue_id=122)
assert len(entries) == 0
# Try to delete non-existent entry
success = self.tracker.delete_worktime_entry(99999)
assert success is False
def test_update_worktime_entry(self):
"""Test updating worktime entries."""
entry_id = self.tracker.log_worktime(122, 90, description="Original description")
# Update duration and description
success = self.tracker.update_worktime_entry(
entry_id=entry_id,
duration_minutes=120,
description="Updated description"
)
assert success is True
# Verify updates
entries = self.tracker.get_worktime_entries(issue_id=122)
assert len(entries) == 1
assert entries[0].duration_minutes == 120
assert entries[0].description == "Updated description"
# Try to update non-existent entry
success = self.tracker.update_worktime_entry(
entry_id=99999,
duration_minutes=60
)
assert success is False
class TestWorktimeCommands:
"""Test suite for worktime CLI commands."""
def test_parse_duration_minutes(self):
"""Test parsing duration strings - minutes format."""
assert _parse_duration("90") == 90
assert _parse_duration("120") == 120
assert _parse_duration("45m") == 45
def test_parse_duration_hours(self):
"""Test parsing duration strings - hours format."""
assert _parse_duration("1h") == 60
assert _parse_duration("2h") == 120
assert _parse_duration("1.5h") == 90
assert _parse_duration("2.25h") == 135
def test_parse_duration_hours_minutes(self):
"""Test parsing duration strings - hours and minutes format."""
assert _parse_duration("1h30m") == 90
assert _parse_duration("2h15m") == 135
assert _parse_duration("0h45m") == 45
assert _parse_duration("3h0m") == 180
def test_parse_duration_invalid(self):
"""Test parsing invalid duration strings."""
with pytest.raises(ValueError):
_parse_duration("invalid")
with pytest.raises(ValueError):
_parse_duration("1x30m")
with pytest.raises(ValueError):
_parse_duration("")
def test_format_duration_minutes_only(self):
"""Test formatting duration - minutes only."""
assert _format_duration(30) == "30m"
assert _format_duration(45) == "45m"
assert _format_duration(59) == "59m"
def test_format_duration_hours_only(self):
"""Test formatting duration - hours only."""
assert _format_duration(60) == "1h"
assert _format_duration(120) == "2h"
assert _format_duration(180) == "3h"
def test_format_duration_hours_and_minutes(self):
"""Test formatting duration - hours and minutes."""
assert _format_duration(90) == "1h30m"
assert _format_duration(135) == "2h15m"
assert _format_duration(195) == "3h15m"
@patch('markitect.finance.worktime_commands.WorktimeTracker')
def test_log_command_basic(self, mock_tracker_class):
"""Test the log command with basic parameters."""
mock_tracker = Mock()
mock_tracker_class.return_value = mock_tracker
mock_tracker.log_worktime.return_value = 1
mock_tracker.get_daily_summary.return_value = DailySummary(
work_date=date.today(),
total_minutes=90,
issue_count=1,
entries=[]
)
from click.testing import CliRunner
runner = CliRunner()
result = runner.invoke(worktime, ['log', '122', '1h30m'])
assert result.exit_code == 0
assert "✅ Logged 90min worktime for issue #122" in result.output
mock_tracker.log_worktime.assert_called_once()
@patch('markitect.finance.worktime_commands.WorktimeTracker')
def test_log_command_with_description(self, mock_tracker_class):
"""Test the log command with description."""
mock_tracker = Mock()
mock_tracker_class.return_value = mock_tracker
mock_tracker.log_worktime.return_value = 1
mock_tracker.get_daily_summary.return_value = None
from click.testing import CliRunner
runner = CliRunner()
result = runner.invoke(worktime, ['log', '122', '90', '--description', 'Testing worktime'])
assert result.exit_code == 0
assert "Testing worktime" in result.output
@patch('markitect.finance.worktime_commands.WorktimeTracker')
def test_list_command_table_format(self, mock_tracker_class):
"""Test the list command with table output format."""
mock_tracker = Mock()
mock_tracker_class.return_value = mock_tracker
mock_entries = [
WorktimeEntry(
id=1,
issue_id=122,
work_date=date.today(),
duration_minutes=90,
description="Test worktime",
entry_type="manual"
)
]
mock_tracker.get_worktime_entries.return_value = mock_entries
from click.testing import CliRunner
runner = CliRunner()
result = runner.invoke(worktime, ['list'])
assert result.exit_code == 0
assert "⏰ Worktime Entries" in result.output
assert "#122" in result.output
assert "1h30m" in result.output
@patch('markitect.finance.worktime_commands.WorktimeTracker')
def test_list_command_json_format(self, mock_tracker_class):
"""Test the list command with JSON output format."""
mock_tracker = Mock()
mock_tracker_class.return_value = mock_tracker
mock_entries = [
WorktimeEntry(
id=1,
issue_id=122,
work_date=date.today(),
duration_minutes=90,
description="Test worktime",
entry_type="manual"
)
]
mock_tracker.get_worktime_entries.return_value = mock_entries
from click.testing import CliRunner
runner = CliRunner()
result = runner.invoke(worktime, ['list', '--format', 'json'])
assert result.exit_code == 0
# Should be valid JSON
output_data = json.loads(result.output.strip())
assert len(output_data) == 1
assert output_data[0]['issue_id'] == 122
assert output_data[0]['duration_minutes'] == 90
@patch('markitect.finance.worktime_commands.WorktimeTracker')
def test_daily_command(self, mock_tracker_class):
"""Test the daily summary command."""
mock_tracker = Mock()
mock_tracker_class.return_value = mock_tracker
mock_entries = [
WorktimeEntry(id=1, issue_id=122, duration_minutes=90, entry_type="manual"),
WorktimeEntry(id=2, issue_id=123, duration_minutes=60, entry_type="manual")
]
mock_summary = DailySummary(
work_date=date.today(),
total_minutes=150,
issue_count=2,
entries=mock_entries,
cost_per_minute=Decimal('0.5'),
total_cost_allocated=Decimal('75.0')
)
mock_tracker.get_daily_summary.return_value = mock_summary
from click.testing import CliRunner
runner = CliRunner()
today = date.today().strftime('%Y-%m-%d')
result = runner.invoke(worktime, ['daily', today])
assert result.exit_code == 0
assert f"📅 Daily Summary for {date.today()}" in result.output
assert "Total Time: 2h30m" in result.output
assert "Issues Worked: 2" in result.output
assert "Cost per Minute: €0.5000" in result.output
@patch('markitect.finance.worktime_commands.WorktimeTracker')
def test_estimate_command(self, mock_tracker_class):
"""Test the estimate worktime command."""
mock_tracker = Mock()
mock_tracker_class.return_value = mock_tracker
mock_result = {
'work_date': date.today(),
'total_minutes': 480, # 8 hours
'distribution_method': 'equal',
'issue_estimates': {122: 240, 123: 240},
'issues_count': 2
}
mock_tracker.estimate_daily_worktime.return_value = mock_result
from click.testing import CliRunner
runner = CliRunner()
today = date.today().strftime('%Y-%m-%d')
result = runner.invoke(worktime, ['estimate', today, '8', '-i', '122', '-i', '123'])
assert result.exit_code == 0
assert "📊 Worktime Estimation" in result.output
assert "Total Hours: 8.0h" in result.output
assert "✅ Created 2 estimated worktime entries" in result.output
@patch('markitect.finance.worktime_commands.WorktimeTracker')
def test_distribute_command(self, mock_tracker_class):
"""Test the cost distribution command."""
mock_tracker = Mock()
mock_tracker_class.return_value = mock_tracker
mock_result = {
'work_date': date.today(),
'total_cost': 100.0,
'total_minutes': 200,
'cost_per_minute': 0.5,
'distributions': {
122: {'minutes': 120, 'percentage': 60.0, 'cost_allocated': 60.0},
123: {'minutes': 80, 'percentage': 40.0, 'cost_allocated': 40.0}
},
'issues_count': 2
}
mock_tracker.distribute_daily_costs.return_value = mock_result
from click.testing import CliRunner
runner = CliRunner()
today = date.today().strftime('%Y-%m-%d')
result = runner.invoke(worktime, ['distribute', today, '100'])
assert result.exit_code == 0
assert "💰 Cost Distribution" in result.output
assert "Total Cost: €100.00" in result.output
assert "Cost per Minute: €0.5000" in result.output
@patch('markitect.finance.worktime_commands.WorktimeTracker')
def test_delete_command(self, mock_tracker_class):
"""Test the delete command."""
mock_tracker = Mock()
mock_tracker_class.return_value = mock_tracker
mock_tracker.delete_worktime_entry.return_value = True
from click.testing import CliRunner
runner = CliRunner()
# Auto-confirm the deletion
result = runner.invoke(worktime, ['delete', '1'], input='y\n')
assert result.exit_code == 0
assert "✅ Deleted worktime entry #1" in result.output
mock_tracker.delete_worktime_entry.assert_called_once_with(1)
@patch('markitect.finance.worktime_commands.WorktimeTracker')
def test_update_command(self, mock_tracker_class):
"""Test the update command."""
mock_tracker = Mock()
mock_tracker_class.return_value = mock_tracker
mock_tracker.update_worktime_entry.return_value = True
from click.testing import CliRunner
runner = CliRunner()
result = runner.invoke(worktime, ['update', '1', '--duration', '2h', '--description', 'Updated'])
assert result.exit_code == 0
assert "✅ Updated worktime entry #1" in result.output
class TestWorktimeIntegration:
"""Integration tests for the complete worktime tracking system."""
def setup_method(self):
"""Set up integration test fixtures."""
self.temp_db = tempfile.NamedTemporaryFile(suffix='.db', delete=False)
self.temp_db.close()
self.db_path = self.temp_db.name
def teardown_method(self):
"""Clean up integration test fixtures."""
Path(self.db_path).unlink(missing_ok=True)
def test_full_worktime_lifecycle(self):
"""Test the complete lifecycle of worktime tracking."""
tracker = WorktimeTracker(self.db_path)
# 1. Log worktime for multiple issues across multiple days
today = date.today()
yesterday = today - timedelta(days=1)
tracker.log_worktime(122, 120, work_date=yesterday, description="Initial development")
tracker.log_worktime(123, 90, work_date=yesterday, description="Code review")
tracker.log_worktime(122, 90, work_date=today, description="Bug fixes")
tracker.log_worktime(124, 60, work_date=today, description="Documentation")
# 2. Verify daily summaries
yesterday_summary = tracker.get_daily_summary(yesterday)
assert yesterday_summary.total_minutes == 210 # 120 + 90
assert yesterday_summary.issue_count == 2
today_summary = tracker.get_daily_summary(today)
assert today_summary.total_minutes == 150 # 90 + 60
assert today_summary.issue_count == 2
# 3. Distribute costs for a day
distribution = tracker.distribute_daily_costs(
work_date=today,
total_daily_cost=Decimal('75.00') # €75 for today's work
)
assert distribution['total_cost'] == 75.0
assert distribution['total_minutes'] == 150
assert distribution['cost_per_minute'] == 0.5
# Issue 122: 90 minutes = €45
# Issue 124: 60 minutes = €30
assert distribution['distributions'][122]['cost_allocated'] == 45.0
assert distribution['distributions'][124]['cost_allocated'] == 30.0
# 4. Generate comprehensive report
report = tracker.get_worktime_report(
start_date=yesterday,
end_date=today
)
assert report['total_entries'] == 4
assert report['total_time']['total_minutes'] == 360 # 210 + 150
assert report['unique_issues'] == 3 # Issues 122, 123, 124
assert report['unique_dates'] == 2
# 5. Test estimation functionality
tomorrow = today + timedelta(days=1)
estimation = tracker.estimate_daily_worktime(
work_date=tomorrow,
total_hours=6.0,
issues=[122, 125, 126],
distribution_method="equal"
)
assert estimation['total_minutes'] == 360
assert len(estimation['issue_estimates']) == 3
# Each issue should get 120 minutes (equal distribution)
for minutes in estimation['issue_estimates'].values():
assert minutes == 120
# 6. Verify estimated entries were created
tomorrow_entries = tracker.get_worktime_entries(work_date=tomorrow)
assert len(tomorrow_entries) == 3
assert all(e.entry_type == "estimated" for e in tomorrow_entries)
def test_cost_distribution_accuracy(self):
"""Test accurate cost distribution calculations."""
tracker = WorktimeTracker(self.db_path)
work_date = date.today()
# Log precise worktime amounts
tracker.log_worktime(122, 100, work_date=work_date) # 100 minutes
tracker.log_worktime(123, 50, work_date=work_date) # 50 minutes
tracker.log_worktime(124, 150, work_date=work_date) # 150 minutes
# Total: 300 minutes
# Distribute exactly €300
distribution = tracker.distribute_daily_costs(
work_date=work_date,
total_daily_cost=Decimal('300.00')
)
# Should be exactly €1 per minute
assert distribution['cost_per_minute'] == 1.0
# Verify exact cost allocation
assert distribution['distributions'][122]['cost_allocated'] == 100.0
assert distribution['distributions'][123]['cost_allocated'] == 50.0
assert distribution['distributions'][124]['cost_allocated'] == 150.0
# Verify percentages sum to 100%
total_percentage = sum(
dist['percentage'] for dist in distribution['distributions'].values()
)
assert abs(total_percentage - 100.0) < 0.01 # Allow for rounding
# Verify cost allocation was logged to database
with tracker.finance_models.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT issue_id, cost_allocated
FROM worktime_cost_distributions
WHERE work_date = ?
ORDER BY issue_id
''', (work_date.isoformat() if hasattr(work_date, 'isoformat') else work_date,))
results = cursor.fetchall()
assert len(results) == 3
assert results[0] == (122, 100.0)
assert results[1] == (123, 50.0)
assert results[2] == (124, 150.0)
def test_worktime_modification_and_summary_updates(self):
"""Test that modifying worktime entries correctly updates summaries."""
tracker = WorktimeTracker(self.db_path)
work_date = date.today()
# Log initial worktime
entry_id = tracker.log_worktime(122, 60, work_date=work_date)
# Check initial summary
summary = tracker.get_daily_summary(work_date)
assert summary.total_minutes == 60
# Update the entry
tracker.update_worktime_entry(entry_id, duration_minutes=120)
# Check updated summary
summary = tracker.get_daily_summary(work_date)
assert summary.total_minutes == 120
# Delete the entry
tracker.delete_worktime_entry(entry_id)
# Check final summary
summary = tracker.get_daily_summary(work_date)
assert summary is None or summary.total_minutes == 0

View File

@@ -0,0 +1,454 @@
"""
Tests for MarkiTect period management CLI commands.
This module tests the command-line interface for period management including:
- Period creation, listing, and status management
- Period calculation and lifecycle operations
- CLI error handling and validation
"""
import pytest
import tempfile
import os
from datetime import date
from decimal import Decimal
from click.testing import CliRunner
from markitect.finance.cli import cost_commands
from markitect.finance.models import FinanceModels
from markitect.finance.period_manager import PeriodManager
from markitect.finance.cost_manager import CostItemManager, CostItem
class TestPeriodCLICommands:
"""Test suite for period management CLI commands."""
@pytest.fixture
def temp_db(self):
"""Create temporary database for testing."""
fd, path = tempfile.mkstemp(suffix='.db')
os.close(fd)
yield path
os.unlink(path)
@pytest.fixture
def setup_test_data(self, temp_db):
"""Setup test database with sample period data."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
period_manager = PeriodManager(temp_db)
# Create sample period
period_id = period_manager.create_period(
period_start=date(2025, 1, 1),
period_end=date(2025, 1, 31),
period_type='monthly'
)
return temp_db, period_id
@pytest.fixture
def runner(self):
"""Create Click test runner."""
return CliRunner()
def test_period_create_success(self, runner, temp_db):
"""Test period creation via CLI."""
# Initialize database first
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
result = runner.invoke(cost_commands, [
'period', 'create',
'--start-date', '2025-02-01',
'--end-date', '2025-02-28',
'--database', temp_db
])
assert result.exit_code == 0
assert "✅ Created period #" in result.output
assert "📅 Period: 2025-02-01 to 2025-02-28" in result.output
assert "📊 Type: monthly" in result.output
def test_period_create_with_loss_forward(self, runner, temp_db):
"""Test period creation with loss carried forward."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
result = runner.invoke(cost_commands, [
'period', 'create',
'--start-date', '2025-03-01',
'--end-date', '2025-03-31',
'--loss-forward', '15.75',
'--database', temp_db
])
assert result.exit_code == 0
assert "✅ Created period #" in result.output
assert "💸 Loss carried forward: €15.7500" in result.output
def test_period_create_invalid_dates(self, runner, temp_db):
"""Test period creation with invalid date format."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
result = runner.invoke(cost_commands, [
'period', 'create',
'--start-date', 'invalid-date',
'--end-date', '2025-02-28',
'--database', temp_db
])
assert result.exit_code == 1
assert "Error: Dates must be in YYYY-MM-DD format" in result.output
def test_period_create_overlapping_fails(self, runner, setup_test_data):
"""Test that creating overlapping periods fails."""
temp_db, existing_period_id = setup_test_data
result = runner.invoke(cost_commands, [
'period', 'create',
'--start-date', '2025-01-15', # Overlaps with existing period
'--end-date', '2025-02-15',
'--database', temp_db
])
assert result.exit_code == 1
assert "Error:" in result.output
assert "overlaps" in result.output.lower()
def test_period_list_all(self, runner, setup_test_data):
"""Test listing all periods."""
temp_db, period_id = setup_test_data
result = runner.invoke(cost_commands, [
'period', 'list',
'--database', temp_db
])
assert result.exit_code == 0
assert "📅 Calculation Periods" in result.output
assert "2025-01-01" in result.output
assert "2025-01-31" in result.output
assert "Total: 1 periods" in result.output
def test_period_list_with_status_filter(self, runner, setup_test_data):
"""Test listing periods with status filter."""
temp_db, period_id = setup_test_data
# Create second period and close it
period_manager = PeriodManager(temp_db)
period_id2 = period_manager.create_period(
period_start=date(2025, 2, 1),
period_end=date(2025, 2, 28)
)
period_manager.close_period(period_id2)
# Filter by open status
result = runner.invoke(cost_commands, [
'period', 'list',
'--status', 'open',
'--database', temp_db
])
assert result.exit_code == 0
assert "2025-01-01" in result.output # First period should be shown
assert "2025-02-01" not in result.output # Second period should be filtered out
# Filter by closed status
result = runner.invoke(cost_commands, [
'period', 'list',
'--status', 'closed',
'--database', temp_db
])
assert result.exit_code == 0
assert "2025-02-01" in result.output # Second period should be shown
assert "2025-01-01" not in result.output # First period should be filtered out
def test_period_list_with_date_filters(self, runner, temp_db):
"""Test listing periods with date range filters."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
period_manager = PeriodManager(temp_db)
# Create periods in different months
jan_period = period_manager.create_period(date(2025, 1, 1), date(2025, 1, 31))
feb_period = period_manager.create_period(date(2025, 2, 1), date(2025, 2, 28))
# Filter by start date
result = runner.invoke(cost_commands, [
'period', 'list',
'--start-from', '2025-02-01',
'--database', temp_db
])
assert result.exit_code == 0
assert "2025-02-01" in result.output
assert "2025-01-01" not in result.output
def test_period_list_empty(self, runner, temp_db):
"""Test listing periods when none exist."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
result = runner.invoke(cost_commands, [
'period', 'list',
'--database', temp_db
])
assert result.exit_code == 0
assert "No periods found matching criteria" in result.output
def test_period_show_details(self, runner, setup_test_data):
"""Test showing period details."""
temp_db, period_id = setup_test_data
result = runner.invoke(cost_commands, [
'period', 'show', str(period_id),
'--database', temp_db
])
assert result.exit_code == 0
assert f"📅 Period #{period_id} Details" in result.output
assert "Start Date: 2025-01-01" in result.output
assert "End Date: 2025-01-31" in result.output
assert "Type: monthly" in result.output
assert "Status: open" in result.output
def test_period_show_nonexistent(self, runner, temp_db):
"""Test showing non-existent period."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
result = runner.invoke(cost_commands, [
'period', 'show', '999',
'--database', temp_db
])
assert result.exit_code == 1
assert "Period #999 not found" in result.output
def test_period_calculate(self, runner, setup_test_data):
"""Test period cost calculation."""
temp_db, period_id = setup_test_data
# Add some cost items for calculation
cost_manager = CostItemManager(temp_db)
infra_cat = cost_manager.get_category_by_name('Infrastructure')
cost_item = CostItem(
category_id=infra_cat['id'],
name='Test Server',
cost_type='monthly',
amount_eur=Decimal('25.00'),
starting_from_date=date(2025, 1, 1)
)
cost_manager.create_cost_item(cost_item)
result = runner.invoke(cost_commands, [
'period', 'calculate', str(period_id),
'--database', temp_db
])
assert result.exit_code == 0
assert f"📊 Period #{period_id} Cost Calculation" in result.output
assert "Period: 2025-01-01 to 2025-01-31" in result.output
assert "Monthly Recurring: €25.00" in result.output
assert "Total Period Cost: €25.00" in result.output
def test_period_calculate_nonexistent(self, runner, temp_db):
"""Test calculating costs for non-existent period."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
result = runner.invoke(cost_commands, [
'period', 'calculate', '999',
'--database', temp_db
])
assert result.exit_code == 1
assert "Error calculating period:" in result.output
def test_period_status_update(self, runner, setup_test_data):
"""Test period status update."""
temp_db, period_id = setup_test_data
result = runner.invoke(cost_commands, [
'period', 'status', str(period_id), 'calculating',
'--database', temp_db
])
assert result.exit_code == 0
assert f"✅ Period #{period_id} status updated to 'calculating'" in result.output
# Verify the status was actually updated
result = runner.invoke(cost_commands, [
'period', 'show', str(period_id),
'--database', temp_db
])
assert "Status: calculating" in result.output
def test_period_status_update_invalid_status(self, runner, setup_test_data):
"""Test period status update with invalid status."""
temp_db, period_id = setup_test_data
result = runner.invoke(cost_commands, [
'period', 'status', str(period_id), 'invalid',
'--database', temp_db
])
assert result.exit_code == 2 # Click validation error
assert "Invalid value" in result.output
def test_period_status_update_nonexistent(self, runner, temp_db):
"""Test status update for non-existent period."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
result = runner.invoke(cost_commands, [
'period', 'status', '999', 'calculating',
'--database', temp_db
])
assert result.exit_code == 1
assert "Error:" in result.output
def test_period_close(self, runner, setup_test_data):
"""Test period closure."""
temp_db, period_id = setup_test_data
result = runner.invoke(cost_commands, [
'period', 'close', str(period_id),
'--database', temp_db
])
assert result.exit_code == 0
assert f"✅ Period #{period_id} has been closed" in result.output
assert "💰 Final total cost:" in result.output
# Verify the period is actually closed
result = runner.invoke(cost_commands, [
'period', 'show', str(period_id),
'--database', temp_db
])
assert "Status: closed" in result.output
def test_period_close_nonexistent(self, runner, temp_db):
"""Test closing non-existent period."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
result = runner.invoke(cost_commands, [
'period', 'close', '999',
'--database', temp_db
])
assert result.exit_code == 1
assert "Error:" in result.output
def test_period_current_exists(self, runner, setup_test_data):
"""Test finding current period when it exists."""
temp_db, period_id = setup_test_data
result = runner.invoke(cost_commands, [
'period', 'current',
'--date', '2025-01-15',
'--database', temp_db
])
assert result.exit_code == 0
assert "📅 Current Active Period" in result.output
assert f"Period #{period_id}" in result.output
assert "Dates: 2025-01-01 to 2025-01-31" in result.output
def test_period_current_not_found(self, runner, temp_db):
"""Test finding current period when none exists."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
result = runner.invoke(cost_commands, [
'period', 'current',
'--date', '2025-03-15',
'--database', temp_db
])
assert result.exit_code == 0
assert "No active period found for 2025-03-15" in result.output
def test_period_current_default_to_today(self, runner, temp_db):
"""Test current period defaults to today."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
result = runner.invoke(cost_commands, [
'period', 'current',
'--database', temp_db
])
assert result.exit_code == 0
assert "No active period found for today" in result.output
assert "💡 Create one with:" in result.output
assert "markitect cost period create" in result.output
def test_period_current_invalid_date(self, runner, temp_db):
"""Test current period with invalid date format."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
result = runner.invoke(cost_commands, [
'period', 'current',
'--date', 'invalid-date',
'--database', temp_db
])
assert result.exit_code == 1
assert "Error: Date must be in YYYY-MM-DD format" in result.output
def test_period_help_commands(self, runner):
"""Test help output for period commands."""
# Test main period help
result = runner.invoke(cost_commands, ['period', '--help'])
assert result.exit_code == 0
assert "Manage calculation periods and lifecycle" in result.output
# Test create help
result = runner.invoke(cost_commands, ['period', 'create', '--help'])
assert result.exit_code == 0
assert "Create a new calculation period" in result.output
# Test list help
result = runner.invoke(cost_commands, ['period', 'list', '--help'])
assert result.exit_code == 0
assert "List calculation periods with optional filtering" in result.output
def test_period_commands_missing_database(self, runner):
"""Test period commands without database specification."""
# These should use default config path and still work or show appropriate error
result = runner.invoke(cost_commands, [
'period', 'list'
])
# Should succeed with default database configuration
assert result.exit_code == 0
def test_period_create_quarterly_type(self, runner, temp_db):
"""Test creating quarterly period type."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
result = runner.invoke(cost_commands, [
'period', 'create',
'--start-date', '2025-04-01',
'--end-date', '2025-06-30',
'--type', 'quarterly',
'--database', temp_db
])
assert result.exit_code == 0
assert "✅ Created period #" in result.output
assert "📊 Type: quarterly" in result.output

View File

@@ -0,0 +1,489 @@
"""
Tests for MarkiTect Period Management Framework.
This module tests the complete period lifecycle management system including:
- Period creation, status management, and lifecycle transitions
- Period overlap validation and conflict resolution
- Period calculations and cost aggregation
- Period closure validation and audit trails
- Current period detection and auto-creation
"""
import pytest
import tempfile
import os
from datetime import date, datetime, timedelta
from decimal import Decimal
from markitect.finance.period_manager import PeriodManager, PeriodStatus, Period
from markitect.finance.models import FinanceModels
from markitect.finance.cost_manager import CostItemManager, CostItem
class TestPeriodManager:
"""Test suite for period management system."""
@pytest.fixture
def temp_db(self):
"""Create temporary database for testing."""
fd, path = tempfile.mkstemp(suffix='.db')
os.close(fd)
yield path
os.unlink(path)
@pytest.fixture
def period_manager(self, temp_db):
"""Create period manager with initialized database."""
finance_models = FinanceModels(temp_db)
finance_models.initialize_finance_schema()
return PeriodManager(temp_db)
@pytest.fixture
def sample_period_data(self):
"""Sample period data for testing."""
return {
'period_start': date(2025, 1, 1),
'period_end': date(2025, 1, 31),
'period_type': 'monthly'
}
def test_period_status_enum(self):
"""Test period status enumeration."""
assert PeriodStatus.OPEN.value == 'open'
assert PeriodStatus.CALCULATING.value == 'calculating'
assert PeriodStatus.CLOSED.value == 'closed'
def test_period_dataclass(self):
"""Test Period dataclass creation."""
period = Period(
id=1,
period_start=date(2025, 1, 1),
period_end=date(2025, 1, 31),
period_type='monthly',
status='open',
total_costs=Decimal('100.50')
)
assert period.id == 1
assert period.period_start == date(2025, 1, 1)
assert period.period_end == date(2025, 1, 31)
assert period.total_costs == Decimal('100.50')
def test_create_period_success(self, period_manager, sample_period_data):
"""Test successful period creation."""
period_id = period_manager.create_period(
period_start=sample_period_data['period_start'],
period_end=sample_period_data['period_end'],
period_type=sample_period_data['period_type']
)
assert period_id is not None
assert isinstance(period_id, int)
# Verify period was created
created_period = period_manager.get_period_by_id(period_id)
assert created_period is not None
assert created_period['period_start'] == sample_period_data['period_start'].isoformat()
assert created_period['period_end'] == sample_period_data['period_end'].isoformat()
assert created_period['status'] == PeriodStatus.OPEN.value
def test_create_period_invalid_dates(self, period_manager):
"""Test period creation with invalid date range."""
with pytest.raises(ValueError, match="Period end date must be after start date"):
period_manager.create_period(
period_start=date(2025, 1, 31),
period_end=date(2025, 1, 1) # End before start
)
def test_create_period_with_loss_carried_forward(self, period_manager, sample_period_data):
"""Test period creation with loss carried forward."""
loss_amount = Decimal('25.50')
period_id = period_manager.create_period(
period_start=sample_period_data['period_start'],
period_end=sample_period_data['period_end'],
loss_carried_forward=loss_amount
)
created_period = period_manager.get_period_by_id(period_id)
assert created_period['loss_carried_forward'] == loss_amount
def test_find_overlapping_periods(self, period_manager, sample_period_data):
"""Test overlap detection functionality."""
# Create first period
period_id1 = period_manager.create_period(
period_start=sample_period_data['period_start'],
period_end=sample_period_data['period_end']
)
# Test overlapping period detection
overlapping = period_manager.find_overlapping_periods(
period_start=date(2025, 1, 15), # Overlaps with existing
period_end=date(2025, 2, 15)
)
assert len(overlapping) == 1
assert overlapping[0]['id'] == period_id1
def test_create_overlapping_period_fails(self, period_manager, sample_period_data):
"""Test that creating overlapping periods fails."""
# Create first period
period_manager.create_period(
period_start=sample_period_data['period_start'],
period_end=sample_period_data['period_end']
)
# Try to create overlapping period
with pytest.raises(ValueError, match="Period overlaps with existing periods"):
period_manager.create_period(
period_start=date(2025, 1, 15), # Overlaps
period_end=date(2025, 2, 15)
)
def test_update_period_status_valid_transition(self, period_manager, sample_period_data):
"""Test valid period status transitions."""
period_id = period_manager.create_period(
period_start=sample_period_data['period_start'],
period_end=sample_period_data['period_end']
)
# Transition from OPEN to CALCULATING
success = period_manager.update_period_status(period_id, PeriodStatus.CALCULATING.value)
assert success is True
updated_period = period_manager.get_period_by_id(period_id)
assert updated_period['status'] == PeriodStatus.CALCULATING.value
# Transition from CALCULATING to CLOSED
success = period_manager.update_period_status(period_id, PeriodStatus.CLOSED.value)
assert success is True
updated_period = period_manager.get_period_by_id(period_id)
assert updated_period['status'] == PeriodStatus.CLOSED.value
def test_update_period_status_invalid_status(self, period_manager, sample_period_data):
"""Test update with invalid status."""
period_id = period_manager.create_period(
period_start=sample_period_data['period_start'],
period_end=sample_period_data['period_end']
)
with pytest.raises(ValueError, match="Invalid status 'invalid'"):
period_manager.update_period_status(period_id, 'invalid')
def test_update_period_status_nonexistent_period(self, period_manager):
"""Test update status for non-existent period."""
with pytest.raises(ValueError, match="Period #999 not found"):
period_manager.update_period_status(999, PeriodStatus.CALCULATING.value)
def test_calculate_period_costs(self, period_manager, sample_period_data, temp_db):
"""Test period cost calculation functionality."""
# Create period
period_id = period_manager.create_period(
period_start=sample_period_data['period_start'],
period_end=sample_period_data['period_end']
)
# Set up cost manager and add test data
finance_models = FinanceModels(temp_db)
cost_manager = CostItemManager(temp_db)
# Get categories
infra_cat = cost_manager.get_category_by_name('Infrastructure')
software_cat = cost_manager.get_category_by_name('Software')
# Create test cost items
monthly_item = CostItem(
category_id=infra_cat['id'],
name='Monthly Server',
cost_type='monthly',
amount_eur=Decimal('25.00'),
starting_from_date=date(2024, 12, 1) # Started before period
)
one_time_item = CostItem(
category_id=software_cat['id'],
name='One-time License',
cost_type='one_time',
amount_eur=Decimal('50.00'),
starting_from_date=date(2025, 1, 15) # Within period
)
cost_manager.create_cost_item(monthly_item)
cost_manager.create_cost_item(one_time_item)
# Calculate period costs
calculation_result = period_manager.calculate_period_costs(period_id)
# Verify calculation results
assert calculation_result['period_id'] == period_id
assert calculation_result['monthly_costs'] == 25.0
assert calculation_result['one_time_costs'] == 50.0
assert calculation_result['total_costs'] == 75.0
# Verify period was updated
updated_period = period_manager.get_period_by_id(period_id)
assert updated_period['total_costs'] == Decimal('75.00')
def test_close_period(self, period_manager, sample_period_data):
"""Test period closure functionality."""
period_id = period_manager.create_period(
period_start=sample_period_data['period_start'],
period_end=sample_period_data['period_end']
)
# Close the period
success = period_manager.close_period(period_id)
assert success is True
# Verify period is closed
closed_period = period_manager.get_period_by_id(period_id)
assert closed_period['status'] == PeriodStatus.CLOSED.value
def test_close_period_already_closed(self, period_manager, sample_period_data):
"""Test closing an already closed period."""
period_id = period_manager.create_period(
period_start=sample_period_data['period_start'],
period_end=sample_period_data['period_end']
)
# Close period first time
period_manager.close_period(period_id)
# Close again (should succeed without error)
success = period_manager.close_period(period_id)
assert success is True
def test_close_nonexistent_period(self, period_manager):
"""Test closing non-existent period."""
with pytest.raises(ValueError, match="Period #999 not found"):
period_manager.close_period(999)
def test_list_periods_no_filter(self, period_manager, sample_period_data):
"""Test listing all periods without filters."""
# Create multiple periods
period_id1 = period_manager.create_period(
period_start=date(2025, 1, 1),
period_end=date(2025, 1, 31)
)
period_id2 = period_manager.create_period(
period_start=date(2025, 2, 1),
period_end=date(2025, 2, 28)
)
# List all periods
periods = period_manager.list_periods()
assert len(periods) == 2
period_ids = [p['id'] for p in periods]
assert period_id1 in period_ids
assert period_id2 in period_ids
def test_list_periods_with_status_filter(self, period_manager):
"""Test listing periods with status filter."""
# Create periods with different statuses
period_id1 = period_manager.create_period(
period_start=date(2025, 1, 1),
period_end=date(2025, 1, 31)
)
period_id2 = period_manager.create_period(
period_start=date(2025, 2, 1),
period_end=date(2025, 2, 28)
)
# Close one period
period_manager.close_period(period_id2)
# Filter by open status
open_periods = period_manager.list_periods(status_filter=PeriodStatus.OPEN.value)
assert len(open_periods) == 1
assert open_periods[0]['id'] == period_id1
# Filter by closed status
closed_periods = period_manager.list_periods(status_filter=PeriodStatus.CLOSED.value)
assert len(closed_periods) == 1
assert closed_periods[0]['id'] == period_id2
def test_list_periods_with_date_filters(self, period_manager):
"""Test listing periods with date range filters."""
# Create periods in different months
jan_period = period_manager.create_period(
period_start=date(2025, 1, 1),
period_end=date(2025, 1, 31)
)
feb_period = period_manager.create_period(
period_start=date(2025, 2, 1),
period_end=date(2025, 2, 28)
)
# Filter by start date
periods_from_feb = period_manager.list_periods(start_date=date(2025, 2, 1))
assert len(periods_from_feb) == 1
assert periods_from_feb[0]['id'] == feb_period
# Filter by end date
periods_until_jan = period_manager.list_periods(end_date=date(2025, 1, 31))
assert len(periods_until_jan) == 1
assert periods_until_jan[0]['id'] == jan_period
def test_get_current_period(self, period_manager):
"""Test getting current period for a specific date."""
# Create period covering January 2025
period_id = period_manager.create_period(
period_start=date(2025, 1, 1),
period_end=date(2025, 1, 31)
)
# Test date within period
current = period_manager.get_current_period(date(2025, 1, 15))
assert current is not None
assert current['id'] == period_id
# Test date outside period
current = period_manager.get_current_period(date(2025, 2, 15))
assert current is None
def test_get_current_period_defaults_to_today(self, period_manager):
"""Test that get_current_period defaults to today's date."""
today = date.today()
# Create period covering today
period_id = period_manager.create_period(
period_start=date(today.year, today.month, 1),
period_end=date(today.year, today.month, 31) if today.month != 12
else date(today.year, 12, 31)
)
# Get current period without specifying date
current = period_manager.get_current_period()
assert current is not None
assert current['id'] == period_id
def test_create_monthly_period(self, period_manager):
"""Test convenience method for creating monthly periods."""
period_id = period_manager.create_monthly_period(2025, 3)
assert period_id is not None
# Verify correct dates were set
period = period_manager.get_period_by_id(period_id)
assert period['period_start'] == '2025-03-01'
assert period['period_end'] == '2025-03-31'
assert period['period_type'] == 'monthly'
def test_create_monthly_period_december(self, period_manager):
"""Test creating monthly period for December (year boundary)."""
period_id = period_manager.create_monthly_period(2025, 12)
period = period_manager.get_period_by_id(period_id)
assert period['period_start'] == '2025-12-01'
assert period['period_end'] == '2025-12-31'
def test_auto_create_period_for_date(self, period_manager):
"""Test automatic period creation for a given date."""
test_date = date(2025, 5, 15)
# First call should create new period
period_id = period_manager.auto_create_period_for_date(test_date)
assert period_id is not None
# Second call should return existing period
period_id2 = period_manager.auto_create_period_for_date(test_date)
assert period_id2 == period_id
# Verify period covers the test date
period = period_manager.get_period_by_id(period_id)
assert period['period_start'] == '2025-05-01'
assert period['period_end'] == '2025-05-31'
def test_period_calculation_with_loss_carried_forward(self, period_manager, temp_db):
"""Test period calculation including loss carried forward."""
# Create period with loss carried forward
period_id = period_manager.create_period(
period_start=date(2025, 1, 1),
period_end=date(2025, 1, 31),
loss_carried_forward=Decimal('15.75')
)
# Add a cost item
cost_manager = CostItemManager(temp_db)
infra_cat = cost_manager.get_category_by_name('Infrastructure')
cost_item = CostItem(
category_id=infra_cat['id'],
name='Test Server',
cost_type='monthly',
amount_eur=Decimal('10.00'),
starting_from_date=date(2025, 1, 1)
)
cost_manager.create_cost_item(cost_item)
# Calculate costs
calculation = period_manager.calculate_period_costs(period_id)
# Should include loss carried forward
assert calculation['loss_carried_forward'] == 15.75
assert calculation['monthly_costs'] == 10.0
assert calculation['total_costs'] == 25.75 # 10.0 + 15.75
def test_period_cost_calculation_edge_cases(self, period_manager, temp_db):
"""Test period cost calculation with various edge cases."""
# Create period
period_id = period_manager.create_period(
period_start=date(2025, 3, 1),
period_end=date(2025, 3, 31)
)
cost_manager = CostItemManager(temp_db)
infra_cat = cost_manager.get_category_by_name('Infrastructure')
# Item that starts before period and ends during period
item1 = CostItem(
category_id=infra_cat['id'],
name='Ending Item',
cost_type='monthly',
amount_eur=Decimal('20.00'),
starting_from_date=date(2025, 1, 1),
ending_date=date(2025, 3, 15)
)
# Item that starts after period
item2 = CostItem(
category_id=infra_cat['id'],
name='Future Item',
cost_type='monthly',
amount_eur=Decimal('30.00'),
starting_from_date=date(2025, 4, 1)
)
# One-time item outside period
item3 = CostItem(
category_id=infra_cat['id'],
name='Past One-time',
cost_type='one_time',
amount_eur=Decimal('100.00'),
starting_from_date=date(2025, 2, 15)
)
cost_manager.create_cost_item(item1)
cost_manager.create_cost_item(item2)
cost_manager.create_cost_item(item3)
# Calculate costs
calculation = period_manager.calculate_period_costs(period_id)
# Only item1 should be included (ends during period)
assert calculation['monthly_costs'] == 20.0
assert calculation['one_time_costs'] == 0.0
assert calculation['total_costs'] == 20.0
def test_error_handling_database_errors(self, period_manager):
"""Test error handling for database-related issues."""
# Test with invalid period ID
with pytest.raises(ValueError, match="Period #-1 not found"):
period_manager.calculate_period_costs(-1)
# Test getting non-existent period
result = period_manager.get_period_by_id(99999)
assert result is None