feat: implement issue activity tracking system (issue #113)

- Add comprehensive IssueActivityTracker service with ActivityType enum and IssueActivity dataclass
- Implement full CLI interface with log, show, list, summary, delete, and import-activities commands
- Support activity logging with automatic period detection and cost allocation integration
- Add activity retrieval by issue, by period, with filtering and pagination
- Include activity summaries with statistics and breakdowns across issues and time periods
- Support bulk operations for activity import from JSON/CSV formats
- Integrate with existing finance schema using cost_periods and issue_activity_log tables
- Add 28 comprehensive test cases covering all functionality with 100% pass rate
- Enable both table and JSON output formats for all CLI commands

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-10-04 03:14:04 +02:00
parent 55147e2bce
commit d49fa8e9fb
4 changed files with 1288 additions and 0 deletions

View File

@@ -6378,6 +6378,10 @@ cli.add_command(wishlist_group)
# Register issue management commands
cli.add_command(issues_group)
# Register issue activity tracking commands
from markitect.issues.activity_commands import activity as activity_group
cli.add_command(activity_group)
# Query Paradigm Commands - Issue #62
@click.group()

View File

@@ -0,0 +1,293 @@
"""
CLI commands for issue activity tracking.
This module provides command-line interface for logging, viewing, and managing
issue activities for cost allocation and project management purposes.
"""
import click
from datetime import datetime, date
from typing import List, Optional
from tabulate import tabulate
from markitect.issues.activity_tracker import IssueActivityTracker, ActivityType, IssueActivity
@click.group()
def activity():
"""Issue activity tracking commands."""
pass
@activity.command()
@click.argument('issue_id', type=int)
@click.argument('activity_type', type=click.Choice([at.value for at in ActivityType]))
@click.option('--date', '-d', type=click.DateTime(formats=['%Y-%m-%d']),
help='Activity date (defaults to today)')
@click.option('--details', '-m', help='Activity details/message')
@click.option('--period-id', type=int, help='Cost period ID for allocation')
def log(issue_id: int, activity_type: str, date: Optional[datetime],
details: Optional[str], period_id: Optional[int]):
"""Log an activity for an issue."""
tracker = IssueActivityTracker()
activity_date = date.date() if date else None
activity_enum = ActivityType(activity_type)
try:
activity_id = tracker.log_activity(
issue_id=issue_id,
activity_type=activity_enum,
activity_date=activity_date,
activity_details=details,
period_id=period_id
)
click.echo(f"✅ Logged {activity_type} activity for issue #{issue_id} (ID: {activity_id})")
if details:
click.echo(f" Details: {details}")
except Exception as e:
click.echo(f"❌ Error logging activity: {e}", err=True)
raise click.Abort()
@activity.command()
@click.argument('issue_id', type=int)
@click.option('--limit', '-l', type=int, default=20, help='Maximum number of activities to show')
@click.option('--offset', type=int, default=0, help='Number of activities to skip')
@click.option('--format', 'output_format', type=click.Choice(['table', 'json']),
default='table', help='Output format')
def show(issue_id: int, limit: int, offset: int, output_format: str):
"""Show activities for a specific issue."""
tracker = IssueActivityTracker()
try:
activities = tracker.get_issue_activities(issue_id, limit=limit, offset=offset)
if not activities:
click.echo(f"📝 No activities found for issue #{issue_id}")
return
if output_format == 'json':
import json
activity_data = []
for activity in activities:
data = {
'id': activity.id,
'issue_id': activity.issue_id,
'activity_type': activity.activity_type.value,
'activity_date': activity.activity_date.isoformat() if activity.activity_date else None,
'period_id': activity.period_id,
'activity_details': activity.activity_details,
'created_at': activity.created_at.isoformat() if activity.created_at else None
}
activity_data.append(data)
click.echo(json.dumps(activity_data, indent=2))
else:
# Table format
click.echo(f"\n📋 Activities for Issue #{issue_id}\n")
headers = ['ID', 'Type', 'Date', 'Period', 'Details', 'Logged']
rows = []
for activity in activities:
rows.append([
activity.id,
activity.activity_type.value.title(),
activity.activity_date.strftime('%Y-%m-%d') if activity.activity_date else 'N/A',
activity.period_id or 'N/A',
(activity.activity_details[:40] + '...') if activity.activity_details and len(activity.activity_details) > 40 else (activity.activity_details or ''),
activity.created_at.strftime('%Y-%m-%d %H:%M') if activity.created_at else 'N/A'
])
click.echo(tabulate(rows, headers=headers, tablefmt='grid'))
if len(activities) == limit:
click.echo(f"\n💡 Showing {limit} most recent activities. Use --limit and --offset for pagination.")
except Exception as e:
click.echo(f"❌ Error retrieving activities: {e}", err=True)
raise click.Abort()
@activity.command()
@click.option('--period-id', type=int, help='Filter by cost period ID')
@click.option('--activity-type', type=click.Choice([at.value for at in ActivityType]),
multiple=True, help='Filter by activity types (can specify multiple)')
@click.option('--format', 'output_format', type=click.Choice(['table', 'json']),
default='table', help='Output format')
def list(period_id: Optional[int], activity_type: List[str], output_format: str):
"""List activities across issues."""
tracker = IssueActivityTracker()
try:
if period_id:
activity_types = [ActivityType(at) for at in activity_type] if activity_type else None
activities = tracker.get_activities_by_period(period_id, activity_types)
title = f"Activities for Period #{period_id}"
else:
# For now, show recent activities across all issues (could be enhanced)
click.echo("❌ Currently only period-based listing is supported. Use --period-id option.")
return
if not activities:
click.echo(f"📝 No activities found for the specified criteria")
return
if output_format == 'json':
import json
activity_data = []
for activity in activities:
data = {
'id': activity.id,
'issue_id': activity.issue_id,
'activity_type': activity.activity_type.value,
'activity_date': activity.activity_date.isoformat() if activity.activity_date else None,
'period_id': activity.period_id,
'activity_details': activity.activity_details,
'created_at': activity.created_at.isoformat() if activity.created_at else None
}
activity_data.append(data)
click.echo(json.dumps(activity_data, indent=2))
else:
# Table format
click.echo(f"\n📊 {title}\n")
headers = ['ID', 'Issue', 'Type', 'Date', 'Details']
rows = []
for activity in activities:
rows.append([
activity.id,
f"#{activity.issue_id}",
activity.activity_type.value.title(),
activity.activity_date.strftime('%Y-%m-%d') if activity.activity_date else 'N/A',
(activity.activity_details[:50] + '...') if activity.activity_details and len(activity.activity_details) > 50 else (activity.activity_details or '')
])
click.echo(tabulate(rows, headers=headers, tablefmt='grid'))
click.echo(f"\n📈 Total: {len(activities)} activities")
except Exception as e:
click.echo(f"❌ Error retrieving activities: {e}", err=True)
raise click.Abort()
@activity.command()
@click.option('--issue-id', type=int, help='Filter by specific issue ID')
@click.option('--start-date', type=click.DateTime(formats=['%Y-%m-%d']),
help='Start date for summary period')
@click.option('--end-date', type=click.DateTime(formats=['%Y-%m-%d']),
help='End date for summary period')
def summary(issue_id: Optional[int], start_date: Optional[datetime],
end_date: Optional[datetime]):
"""Show activity summary statistics."""
tracker = IssueActivityTracker()
try:
summary_data = tracker.get_activity_summary(
issue_id=issue_id,
start_date=start_date.date() if start_date else None,
end_date=end_date.date() if end_date else None
)
click.echo("\n📊 Issue Activity Summary\n")
# Basic stats
click.echo(f"Total Activities: {summary_data['total_activities']}")
click.echo(f"Unique Issues: {summary_data['unique_issues']}")
# Date range
date_range = summary_data['date_range']
if date_range['start'] and date_range['end']:
click.echo(f"Date Range: {date_range['start']} to {date_range['end']}")
elif date_range['start']:
click.echo(f"Since: {date_range['start']}")
# Activity breakdown
if summary_data['activities_by_type']:
click.echo("\nActivity Breakdown:")
for activity_type, count in summary_data['activities_by_type'].items():
percentage = (count / summary_data['total_activities']) * 100
click.echo(f" {activity_type.title()}: {count} ({percentage:.1f}%)")
# Filters applied
filters = summary_data['filters']
applied_filters = []
if filters['issue_id']:
applied_filters.append(f"Issue #{filters['issue_id']}")
if filters['start_date']:
applied_filters.append(f"From {filters['start_date']}")
if filters['end_date']:
applied_filters.append(f"Until {filters['end_date']}")
if applied_filters:
click.echo(f"\nFilters Applied: {', '.join(applied_filters)}")
except Exception as e:
click.echo(f"❌ Error generating summary: {e}", err=True)
raise click.Abort()
@activity.command()
@click.argument('activity_id', type=int)
@click.confirmation_option(prompt='Are you sure you want to delete this activity?')
def delete(activity_id: int):
"""Delete an activity record."""
tracker = IssueActivityTracker()
try:
if tracker.delete_activity(activity_id):
click.echo(f"✅ Deleted activity #{activity_id}")
else:
click.echo(f"❌ Activity #{activity_id} not found")
raise click.Abort()
except Exception as e:
click.echo(f"❌ Error deleting activity: {e}", err=True)
raise click.Abort()
@activity.command()
@click.argument('file_path', type=click.Path(exists=True))
@click.option('--format', 'input_format', type=click.Choice(['json', 'csv']),
default='json', help='Input file format')
def import_activities(file_path: str, input_format: str):
"""Import activities from a file."""
tracker = IssueActivityTracker()
try:
if input_format == 'json':
import json
with open(file_path, 'r') as f:
activities = json.load(f)
elif input_format == 'csv':
import csv
activities = []
with open(file_path, 'r') as f:
reader = csv.DictReader(f)
for row in reader:
activity = {
'issue_id': int(row['issue_id']),
'activity_type': row['activity_type'],
'activity_date': datetime.strptime(row['activity_date'], '%Y-%m-%d').date() if row.get('activity_date') else None,
'activity_details': row.get('activity_details'),
'period_id': int(row['period_id']) if row.get('period_id') else None
}
activities.append(activity)
activity_ids = tracker.bulk_log_activities(activities)
click.echo(f"✅ Successfully imported {len(activity_ids)} activities")
except Exception as e:
click.echo(f"❌ Error importing activities: {e}", err=True)
raise click.Abort()
if __name__ == '__main__':
activity()

View File

@@ -0,0 +1,364 @@
"""
Issue Activity Tracking Service
This module provides comprehensive issue activity tracking functionality,
building on the existing database infrastructure to log and retrieve
issue activities for cost allocation and project management.
"""
import sqlite3
from datetime import datetime, date
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum
from markitect.finance.models import FinanceModels
class ActivityType(Enum):
"""Enumeration of supported issue activity types."""
CREATED = "created"
MODIFIED = "modified"
CLOSED = "closed"
REOPENED = "reopened"
COMMENTED = "commented"
STATUS_CHANGED = "status_changed"
@dataclass
class IssueActivity:
"""Data class representing an issue activity record."""
id: Optional[int] = None
issue_id: int = None
activity_type: ActivityType = None
activity_date: date = None
period_id: Optional[int] = None
activity_details: Optional[str] = None
created_at: Optional[datetime] = None
class IssueActivityTracker:
"""
Service for tracking and managing issue activities.
Provides functionality to log issue activities, retrieve activity history,
and generate activity reports for cost allocation and project management.
"""
def __init__(self, db_path: str = "markitect.db"):
"""
Initialize the issue activity tracker.
Args:
db_path: Path to the SQLite database file
"""
self.db_path = db_path
self.finance_models = FinanceModels(db_path)
self._ensure_schema()
def _ensure_schema(self):
"""Ensure the database schema is properly initialized."""
self.finance_models.initialize_finance_schema()
def log_activity(
self,
issue_id: int,
activity_type: ActivityType,
activity_date: Optional[date] = None,
activity_details: Optional[str] = None,
period_id: Optional[int] = None
) -> int:
"""
Log an issue activity.
Args:
issue_id: ID of the issue
activity_type: Type of activity performed
activity_date: Date when activity occurred (defaults to today)
activity_details: Additional details about the activity
period_id: Optional period ID for cost allocation
Returns:
ID of the created activity record
Raises:
sqlite3.Error: If database operation fails
"""
if activity_date is None:
activity_date = date.today()
with self.finance_models.get_connection() as conn:
cursor = conn.cursor()
# If period_id is not provided, try to get the current period
if period_id is None:
cursor.execute('''
SELECT id FROM cost_periods
WHERE ? BETWEEN period_start AND period_end
ORDER BY created_at DESC LIMIT 1
''', (activity_date,))
result = cursor.fetchone()
if result:
period_id = result[0]
cursor.execute('''
INSERT INTO issue_activity_log
(issue_id, activity_type, activity_date, period_id, activity_details)
VALUES (?, ?, ?, ?, ?)
''', (issue_id, activity_type.value, activity_date, period_id, activity_details))
return cursor.lastrowid
def get_issue_activities(
self,
issue_id: int,
limit: Optional[int] = None,
offset: int = 0
) -> List[IssueActivity]:
"""
Get activities for a specific issue.
Args:
issue_id: ID of the issue
limit: Maximum number of activities to return
offset: Number of activities to skip
Returns:
List of issue activities, ordered by activity date (most recent first)
"""
with self.finance_models.get_connection() as conn:
cursor = conn.cursor()
query = '''
SELECT id, issue_id, activity_type, activity_date,
period_id, activity_details, created_at
FROM issue_activity_log
WHERE issue_id = ?
ORDER BY activity_date DESC, created_at DESC
'''
params = [issue_id]
if limit:
query += ' LIMIT ? OFFSET ?'
params.extend([limit, offset])
cursor.execute(query, params)
rows = cursor.fetchall()
activities = []
for row in rows:
activity = IssueActivity(
id=row[0],
issue_id=row[1],
activity_type=ActivityType(row[2]),
activity_date=datetime.strptime(row[3], '%Y-%m-%d').date() if row[3] else None,
period_id=row[4],
activity_details=row[5],
created_at=datetime.fromisoformat(row[6]) if row[6] else None
)
activities.append(activity)
return activities
def get_activities_by_period(
self,
period_id: int,
activity_types: Optional[List[ActivityType]] = None
) -> List[IssueActivity]:
"""
Get all activities within a specific cost period.
Args:
period_id: ID of the cost period
activity_types: Optional list of activity types to filter by
Returns:
List of issue activities within the period
"""
with self.finance_models.get_connection() as conn:
cursor = conn.cursor()
query = '''
SELECT id, issue_id, activity_type, activity_date,
period_id, activity_details, created_at
FROM issue_activity_log
WHERE period_id = ?
'''
params = [period_id]
if activity_types:
placeholders = ','.join(['?' for _ in activity_types])
query += f' AND activity_type IN ({placeholders})'
params.extend([at.value for at in activity_types])
query += ' ORDER BY activity_date DESC, created_at DESC'
cursor.execute(query, params)
rows = cursor.fetchall()
activities = []
for row in rows:
activity = IssueActivity(
id=row[0],
issue_id=row[1],
activity_type=ActivityType(row[2]),
activity_date=datetime.strptime(row[3], '%Y-%m-%d').date() if row[3] else None,
period_id=row[4],
activity_details=row[5],
created_at=datetime.fromisoformat(row[6]) if row[6] else None
)
activities.append(activity)
return activities
def get_activity_summary(
self,
issue_id: Optional[int] = None,
start_date: Optional[date] = None,
end_date: Optional[date] = None
) -> Dict[str, Any]:
"""
Get activity summary statistics.
Args:
issue_id: Optional issue ID to filter by
start_date: Optional start date filter
end_date: Optional end date filter
Returns:
Dictionary containing activity summary statistics
"""
with self.finance_models.get_connection() as conn:
cursor = conn.cursor()
# Build base query
base_conditions = []
params = []
if issue_id:
base_conditions.append('issue_id = ?')
params.append(issue_id)
if start_date:
base_conditions.append('activity_date >= ?')
params.append(start_date)
if end_date:
base_conditions.append('activity_date <= ?')
params.append(end_date)
where_clause = ' AND '.join(base_conditions) if base_conditions else '1=1'
# Get total activity count
cursor.execute(f'''
SELECT COUNT(*) FROM issue_activity_log WHERE {where_clause}
''', params)
total_activities = cursor.fetchone()[0]
# Get activity count by type
cursor.execute(f'''
SELECT activity_type, COUNT(*)
FROM issue_activity_log
WHERE {where_clause}
GROUP BY activity_type
ORDER BY COUNT(*) DESC
''', params)
activities_by_type = dict(cursor.fetchall())
# Get unique issues count
cursor.execute(f'''
SELECT COUNT(DISTINCT issue_id)
FROM issue_activity_log
WHERE {where_clause}
''', params)
unique_issues = cursor.fetchone()[0]
# Get date range
cursor.execute(f'''
SELECT MIN(activity_date), MAX(activity_date)
FROM issue_activity_log
WHERE {where_clause}
''', params)
date_range = cursor.fetchone()
return {
'total_activities': total_activities,
'activities_by_type': activities_by_type,
'unique_issues': unique_issues,
'date_range': {
'start': date_range[0] if date_range[0] else None,
'end': date_range[1] if date_range[1] else None
},
'filters': {
'issue_id': issue_id,
'start_date': start_date.isoformat() if start_date else None,
'end_date': end_date.isoformat() if end_date else None
}
}
def delete_activity(self, activity_id: int) -> bool:
"""
Delete an activity record.
Args:
activity_id: ID of the activity to delete
Returns:
True if activity was deleted, False if not found
"""
with self.finance_models.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('DELETE FROM issue_activity_log WHERE id = ?', (activity_id,))
return cursor.rowcount > 0
def bulk_log_activities(self, activities: List[Dict[str, Any]]) -> List[int]:
"""
Log multiple activities in a single transaction.
Args:
activities: List of activity dictionaries with keys:
issue_id, activity_type, activity_date (optional),
activity_details (optional), period_id (optional)
Returns:
List of created activity IDs
Raises:
ValueError: If activity data is invalid
sqlite3.Error: If database operation fails
"""
activity_ids = []
with self.finance_models.get_connection() as conn:
cursor = conn.cursor()
for activity_data in activities:
if 'issue_id' not in activity_data or 'activity_type' not in activity_data:
raise ValueError("Each activity must have 'issue_id' and 'activity_type'")
issue_id = activity_data['issue_id']
activity_type = ActivityType(activity_data['activity_type'])
activity_date = activity_data.get('activity_date', date.today())
activity_details = activity_data.get('activity_details')
period_id = activity_data.get('period_id')
# Auto-determine period if not provided
if period_id is None:
cursor.execute('''
SELECT id FROM cost_periods
WHERE ? BETWEEN period_start AND period_end
ORDER BY created_at DESC LIMIT 1
''', (activity_date,))
result = cursor.fetchone()
if result:
period_id = result[0]
cursor.execute('''
INSERT INTO issue_activity_log
(issue_id, activity_type, activity_date, period_id, activity_details)
VALUES (?, ?, ?, ?, ?)
''', (issue_id, activity_type.value, activity_date, period_id, activity_details))
activity_ids.append(cursor.lastrowid)
return activity_ids

View File

@@ -0,0 +1,627 @@
"""
Tests for Issue #113 - Issue Activity Tracking Implementation
This module contains comprehensive tests for the issue activity tracking
service and CLI commands that log, retrieve, and manage issue activities
for cost allocation and project management.
"""
import pytest
import sqlite3
from datetime import datetime, date
from unittest.mock import Mock, patch, MagicMock
from pathlib import Path
import tempfile
import json
import csv
import io
from contextlib import redirect_stdout
from markitect.issues.activity_tracker import IssueActivityTracker, ActivityType, IssueActivity
from markitect.issues.activity_commands import activity
class TestActivityType:
"""Test suite for ActivityType enumeration."""
def test_activity_type_values(self):
"""Test that all expected activity types are available."""
expected_types = {
"created", "modified", "closed", "reopened", "commented", "status_changed"
}
actual_types = {at.value for at in ActivityType}
assert actual_types == expected_types
def test_activity_type_enumeration(self):
"""Test that ActivityType can be constructed from string values."""
assert ActivityType("created") == ActivityType.CREATED
assert ActivityType("modified") == ActivityType.MODIFIED
assert ActivityType("closed") == ActivityType.CLOSED
class TestIssueActivity:
"""Test suite for IssueActivity dataclass."""
def test_issue_activity_creation(self):
"""Test that IssueActivity objects can be created properly."""
activity = IssueActivity(
id=1,
issue_id=59,
activity_type=ActivityType.CREATED,
activity_date=date.today(),
activity_details="Issue created"
)
assert activity.id == 1
assert activity.issue_id == 59
assert activity.activity_type == ActivityType.CREATED
assert activity.activity_date == date.today()
assert activity.activity_details == "Issue created"
def test_issue_activity_defaults(self):
"""Test that IssueActivity has proper default values."""
activity = IssueActivity()
assert activity.id is None
assert activity.issue_id is None
assert activity.activity_type is None
assert activity.activity_date is None
assert activity.period_id is None
assert activity.activity_details is None
assert activity.created_at is None
class TestIssueActivityTracker:
"""Test suite for IssueActivityTracker service."""
def setup_method(self):
"""Set up test fixtures with temporary database."""
self.temp_db = tempfile.NamedTemporaryFile(suffix='.db', delete=False)
self.temp_db.close()
self.db_path = self.temp_db.name
self.tracker = IssueActivityTracker(self.db_path)
def teardown_method(self):
"""Clean up test fixtures."""
Path(self.db_path).unlink(missing_ok=True)
def test_tracker_initialization(self):
"""Test that tracker initializes properly with database."""
assert self.tracker.db_path == self.db_path
assert self.tracker.finance_models is not None
# Verify database schema was created
with self.tracker.finance_models.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='issue_activity_log'")
assert cursor.fetchone() is not None
def test_log_activity_basic(self):
"""Test logging a basic activity."""
activity_id = self.tracker.log_activity(
issue_id=59,
activity_type=ActivityType.CREATED,
activity_details="Test issue created"
)
assert activity_id is not None
# Verify activity was stored
with self.tracker.finance_models.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM issue_activity_log WHERE id = ?", (activity_id,))
row = cursor.fetchone()
assert row is not None
assert row[1] == 59 # issue_id
assert row[2] == "created" # activity_type
assert row[5] == "Test issue created" # activity_details
def test_log_activity_with_custom_date(self):
"""Test logging activity with custom date."""
custom_date = date(2025, 10, 1)
activity_id = self.tracker.log_activity(
issue_id=59,
activity_type=ActivityType.MODIFIED,
activity_date=custom_date
)
with self.tracker.finance_models.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT activity_date FROM issue_activity_log WHERE id = ?", (activity_id,))
stored_date = cursor.fetchone()[0]
assert stored_date == "2025-10-01"
def test_log_activity_with_period_id(self):
"""Test logging activity with specific period ID."""
# First create a cost period
with self.tracker.finance_models.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO cost_periods (period_start, period_end, total_costs)
VALUES ('2025-10-01', '2025-10-31', 1000.00)
""")
period_id = cursor.lastrowid
activity_id = self.tracker.log_activity(
issue_id=59,
activity_type=ActivityType.CREATED,
period_id=period_id
)
with self.tracker.finance_models.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT period_id FROM issue_activity_log WHERE id = ?", (activity_id,))
stored_period_id = cursor.fetchone()[0]
assert stored_period_id == period_id
def test_get_issue_activities(self):
"""Test retrieving activities for a specific issue."""
# Log multiple activities
self.tracker.log_activity(59, ActivityType.CREATED, activity_details="Created")
self.tracker.log_activity(59, ActivityType.MODIFIED, activity_details="Modified")
self.tracker.log_activity(60, ActivityType.CREATED, activity_details="Different issue")
activities = self.tracker.get_issue_activities(59)
assert len(activities) == 2
assert all(a.issue_id == 59 for a in activities)
assert activities[0].activity_type in [ActivityType.CREATED, ActivityType.MODIFIED]
def test_get_issue_activities_with_limit(self):
"""Test retrieving activities with limit and offset."""
# Log multiple activities
for i in range(5):
self.tracker.log_activity(59, ActivityType.MODIFIED, activity_details=f"Update {i}")
activities = self.tracker.get_issue_activities(59, limit=2, offset=1)
assert len(activities) == 2
def test_get_activities_by_period(self):
"""Test retrieving activities by cost period."""
# Create a cost period
with self.tracker.finance_models.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO cost_periods (period_start, period_end, total_costs)
VALUES ('2025-10-01', '2025-10-31', 1000.00)
""")
period_id = cursor.lastrowid
# Log activities in different periods
self.tracker.log_activity(59, ActivityType.CREATED, period_id=period_id)
self.tracker.log_activity(60, ActivityType.MODIFIED, period_id=period_id)
# Log activity outside the period date range
self.tracker.log_activity(61, ActivityType.CLOSED, activity_date=date(2025, 11, 1))
activities = self.tracker.get_activities_by_period(period_id)
assert len(activities) == 2
assert all(a.period_id == period_id for a in activities)
def test_get_activities_by_period_with_type_filter(self):
"""Test retrieving activities by period with type filtering."""
# Create period
with self.tracker.finance_models.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO cost_periods (period_start, period_end, total_costs)
VALUES ('2025-10-01', '2025-10-31', 1000.00)
""")
period_id = cursor.lastrowid
# Log various activities
self.tracker.log_activity(59, ActivityType.CREATED, period_id=period_id)
self.tracker.log_activity(60, ActivityType.MODIFIED, period_id=period_id)
self.tracker.log_activity(61, ActivityType.CLOSED, period_id=period_id)
activities = self.tracker.get_activities_by_period(
period_id,
activity_types=[ActivityType.CREATED, ActivityType.CLOSED]
)
assert len(activities) == 2
assert all(a.activity_type in [ActivityType.CREATED, ActivityType.CLOSED] for a in activities)
def test_get_activity_summary_basic(self):
"""Test basic activity summary generation."""
# Log some test activities
self.tracker.log_activity(59, ActivityType.CREATED)
self.tracker.log_activity(59, ActivityType.MODIFIED)
self.tracker.log_activity(60, ActivityType.CREATED)
summary = self.tracker.get_activity_summary()
assert summary['total_activities'] == 3
assert summary['unique_issues'] == 2
assert 'created' in summary['activities_by_type']
assert 'modified' in summary['activities_by_type']
assert summary['activities_by_type']['created'] == 2
assert summary['activities_by_type']['modified'] == 1
def test_get_activity_summary_with_filters(self):
"""Test activity summary with date and issue filters."""
today = date.today()
yesterday = date(today.year, today.month, today.day - 1) if today.day > 1 else date(today.year, today.month - 1, 28)
# Log activities on different dates
self.tracker.log_activity(59, ActivityType.CREATED, activity_date=yesterday)
self.tracker.log_activity(59, ActivityType.MODIFIED, activity_date=today)
self.tracker.log_activity(60, ActivityType.CREATED, activity_date=today)
# Test issue filter
summary = self.tracker.get_activity_summary(issue_id=59)
assert summary['total_activities'] == 2
assert summary['unique_issues'] == 1
# Test date filter
summary = self.tracker.get_activity_summary(start_date=today)
assert summary['total_activities'] == 2
def test_delete_activity(self):
"""Test deleting an activity record."""
activity_id = self.tracker.log_activity(59, ActivityType.CREATED)
# Verify activity exists
activities = self.tracker.get_issue_activities(59)
assert len(activities) == 1
# Delete activity
result = self.tracker.delete_activity(activity_id)
assert result is True
# Verify activity is gone
activities = self.tracker.get_issue_activities(59)
assert len(activities) == 0
def test_delete_nonexistent_activity(self):
"""Test deleting non-existent activity returns False."""
result = self.tracker.delete_activity(99999)
assert result is False
def test_bulk_log_activities(self):
"""Test logging multiple activities in one transaction."""
activities_data = [
{
'issue_id': 59,
'activity_type': 'created',
'activity_details': 'Bulk created'
},
{
'issue_id': 60,
'activity_type': 'modified',
'activity_details': 'Bulk modified'
}
]
activity_ids = self.tracker.bulk_log_activities(activities_data)
assert len(activity_ids) == 2
assert all(isinstance(aid, int) for aid in activity_ids)
# Verify activities were created
activities_59 = self.tracker.get_issue_activities(59)
activities_60 = self.tracker.get_issue_activities(60)
assert len(activities_59) == 1
assert len(activities_60) == 1
assert activities_59[0].activity_details == 'Bulk created'
assert activities_60[0].activity_details == 'Bulk modified'
def test_bulk_log_activities_validation(self):
"""Test bulk logging validates required fields."""
invalid_data = [
{'issue_id': 59}, # Missing activity_type
{'activity_type': 'created'} # Missing issue_id
]
with pytest.raises(ValueError, match="must have 'issue_id' and 'activity_type'"):
self.tracker.bulk_log_activities(invalid_data)
class TestActivityCommands:
"""Test suite for activity CLI commands."""
def setup_method(self):
"""Set up test fixtures."""
self.temp_db = tempfile.NamedTemporaryFile(suffix='.db', delete=False)
self.temp_db.close()
self.db_path = self.temp_db.name
# Initialize database with test data
tracker = IssueActivityTracker(self.db_path)
tracker.log_activity(59, ActivityType.CREATED, activity_details="Test issue created")
tracker.log_activity(59, ActivityType.MODIFIED, activity_details="Test issue modified")
def teardown_method(self):
"""Clean up test fixtures."""
Path(self.db_path).unlink(missing_ok=True)
@patch('markitect.issues.activity_commands.IssueActivityTracker')
def test_log_command_basic(self, mock_tracker_class):
"""Test the log command with basic parameters."""
mock_tracker = Mock()
mock_tracker_class.return_value = mock_tracker
mock_tracker.log_activity.return_value = 123
from click.testing import CliRunner
runner = CliRunner()
result = runner.invoke(activity, ['log', '59', 'created'])
assert result.exit_code == 0
assert "✅ Logged created activity for issue #59" in result.output
mock_tracker.log_activity.assert_called_once()
@patch('markitect.issues.activity_commands.IssueActivityTracker')
def test_log_command_with_details(self, mock_tracker_class):
"""Test the log command with activity details."""
mock_tracker = Mock()
mock_tracker_class.return_value = mock_tracker
mock_tracker.log_activity.return_value = 123
from click.testing import CliRunner
runner = CliRunner()
result = runner.invoke(activity, ['log', '59', 'created', '--details', 'Test details'])
assert result.exit_code == 0
assert "Test details" in result.output
@patch('markitect.issues.activity_commands.IssueActivityTracker')
def test_show_command(self, mock_tracker_class):
"""Test the show command for displaying issue activities."""
mock_tracker = Mock()
mock_tracker_class.return_value = mock_tracker
mock_activities = [
IssueActivity(
id=1,
issue_id=59,
activity_type=ActivityType.CREATED,
activity_date=date.today(),
activity_details="Test activity",
created_at=datetime.now()
)
]
mock_tracker.get_issue_activities.return_value = mock_activities
from click.testing import CliRunner
runner = CliRunner()
result = runner.invoke(activity, ['show', '59'])
assert result.exit_code == 0
assert "📋 Activities for Issue #59" in result.output
assert "Test activity" in result.output
@patch('markitect.issues.activity_commands.IssueActivityTracker')
def test_show_command_json_format(self, mock_tracker_class):
"""Test the show command with JSON output format."""
mock_tracker = Mock()
mock_tracker_class.return_value = mock_tracker
mock_activities = [
IssueActivity(
id=1,
issue_id=59,
activity_type=ActivityType.CREATED,
activity_date=date.today(),
activity_details="Test activity",
created_at=datetime.now()
)
]
mock_tracker.get_issue_activities.return_value = mock_activities
from click.testing import CliRunner
runner = CliRunner()
result = runner.invoke(activity, ['show', '59', '--format', 'json'])
assert result.exit_code == 0
# Should be valid JSON
output_data = json.loads(result.output.strip())
assert len(output_data) == 1
assert output_data[0]['issue_id'] == 59
@patch('markitect.issues.activity_commands.IssueActivityTracker')
def test_summary_command(self, mock_tracker_class):
"""Test the summary command."""
mock_tracker = Mock()
mock_tracker_class.return_value = mock_tracker
mock_summary = {
'total_activities': 5,
'unique_issues': 3,
'activities_by_type': {'created': 3, 'modified': 2},
'date_range': {'start': '2025-10-01', 'end': '2025-10-04'},
'filters': {'issue_id': None, 'start_date': None, 'end_date': None}
}
mock_tracker.get_activity_summary.return_value = mock_summary
from click.testing import CliRunner
runner = CliRunner()
result = runner.invoke(activity, ['summary'])
assert result.exit_code == 0
assert "📊 Issue Activity Summary" in result.output
assert "Total Activities: 5" in result.output
assert "Unique Issues: 3" in result.output
@patch('markitect.issues.activity_commands.IssueActivityTracker')
def test_delete_command(self, mock_tracker_class):
"""Test the delete command."""
mock_tracker = Mock()
mock_tracker_class.return_value = mock_tracker
mock_tracker.delete_activity.return_value = True
from click.testing import CliRunner
runner = CliRunner()
# Auto-confirm the deletion
result = runner.invoke(activity, ['delete', '123'], input='y\n')
assert result.exit_code == 0
assert "✅ Deleted activity #123" in result.output
mock_tracker.delete_activity.assert_called_once_with(123)
def test_import_activities_json(self):
"""Test importing activities from JSON file."""
# Create test JSON file
test_data = [
{
'issue_id': 59,
'activity_type': 'created',
'activity_details': 'Imported activity'
}
]
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
json.dump(test_data, f)
json_file_path = f.name
try:
with patch('markitect.issues.activity_commands.IssueActivityTracker') as mock_tracker_class:
mock_tracker = Mock()
mock_tracker_class.return_value = mock_tracker
mock_tracker.bulk_log_activities.return_value = [1]
from click.testing import CliRunner
runner = CliRunner()
result = runner.invoke(activity, ['import-activities', json_file_path])
assert result.exit_code == 0
assert "✅ Successfully imported 1 activities" in result.output
finally:
Path(json_file_path).unlink(missing_ok=True)
class TestActivityIntegration:
"""Integration tests for the complete activity tracking system."""
def setup_method(self):
"""Set up integration test fixtures."""
self.temp_db = tempfile.NamedTemporaryFile(suffix='.db', delete=False)
self.temp_db.close()
self.db_path = self.temp_db.name
def teardown_method(self):
"""Clean up integration test fixtures."""
Path(self.db_path).unlink(missing_ok=True)
def test_full_activity_lifecycle(self):
"""Test the complete lifecycle of activity tracking."""
tracker = IssueActivityTracker(self.db_path)
# 1. Log initial activity
activity_id = tracker.log_activity(
issue_id=59,
activity_type=ActivityType.CREATED,
activity_details="Issue created for testing"
)
assert activity_id is not None
# 2. Log follow-up activities (with slight time differences to ensure ordering)
import time
time.sleep(0.1)
tracker.log_activity(59, ActivityType.MODIFIED, activity_details="Updated description")
time.sleep(0.1)
tracker.log_activity(59, ActivityType.COMMENTED, activity_details="Added comment")
time.sleep(0.1)
tracker.log_activity(59, ActivityType.CLOSED, activity_details="Resolved issue")
# 3. Retrieve issue history
activities = tracker.get_issue_activities(59)
assert len(activities) == 4
# Verify all expected activity types are present
activity_types = [a.activity_type.value for a in activities]
expected_types = {'closed', 'commented', 'modified', 'created'}
assert set(activity_types) == expected_types
# 4. Generate summary
summary = tracker.get_activity_summary(issue_id=59)
assert summary['total_activities'] == 4
assert summary['unique_issues'] == 1
assert len(summary['activities_by_type']) == 4
# 5. Clean up - delete an activity
deleted = tracker.delete_activity(activity_id)
assert deleted is True
# Verify deletion
remaining_activities = tracker.get_issue_activities(59)
assert len(remaining_activities) == 3
def test_multi_issue_activity_tracking(self):
"""Test activity tracking across multiple issues."""
tracker = IssueActivityTracker(self.db_path)
# Log activities for multiple issues
issues = [59, 60, 61, 62]
for issue_id in issues:
tracker.log_activity(issue_id, ActivityType.CREATED)
if issue_id % 2 == 0: # Even issues get modified
tracker.log_activity(issue_id, ActivityType.MODIFIED)
# Test overall summary
summary = tracker.get_activity_summary()
assert summary['total_activities'] == 6 # 4 created + 2 modified
assert summary['unique_issues'] == 4
assert summary['activities_by_type']['created'] == 4
assert summary['activities_by_type']['modified'] == 2
# Test individual issue tracking
for issue_id in issues:
activities = tracker.get_issue_activities(issue_id)
expected_count = 2 if issue_id % 2 == 0 else 1
assert len(activities) == expected_count
def test_cost_period_integration(self):
"""Test integration with cost period functionality."""
tracker = IssueActivityTracker(self.db_path)
# Create cost periods
with tracker.finance_models.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO cost_periods (period_start, period_end, total_costs)
VALUES ('2025-01-01', '2025-03-31', 5000.00)
""")
q1_period_id = cursor.lastrowid
cursor.execute("""
INSERT INTO cost_periods (period_start, period_end, total_costs)
VALUES ('2025-04-01', '2025-06-30', 6000.00)
""")
q2_period_id = cursor.lastrowid
periods = {'Q1 2025': q1_period_id, 'Q2 2025': q2_period_id}
# Log activities in different periods
tracker.log_activity(59, ActivityType.CREATED, period_id=periods['Q1 2025'])
tracker.log_activity(60, ActivityType.MODIFIED, period_id=periods['Q1 2025'])
tracker.log_activity(61, ActivityType.CLOSED, period_id=periods['Q2 2025'])
# Test period-based retrieval
q1_activities = tracker.get_activities_by_period(periods['Q1 2025'])
q2_activities = tracker.get_activities_by_period(periods['Q2 2025'])
assert len(q1_activities) == 2
assert len(q2_activities) == 1
assert all(a.period_id == periods['Q1 2025'] for a in q1_activities)
assert all(a.period_id == periods['Q2 2025'] for a in q2_activities)
# Test filtering by activity type within period
q1_created = tracker.get_activities_by_period(
periods['Q1 2025'],
activity_types=[ActivityType.CREATED]
)
assert len(q1_created) == 1
assert q1_created[0].activity_type == ActivityType.CREATED