Files
markitect-main/markitect/ast_service.py
tegwick 162a2ae93c feat: Add Kaizen Optimizer and Optimized Refactoring Assistant agents
Added two new Claude Code subagents following proper specification format:

**Kaizen Optimizer Agent:**
- Meta-agent for analyzing and optimizing other subagents
- Performance analysis and specification improvement recommendations
- Agent ecosystem health assessment and continuous improvement
- Proper YAML frontmatter with proactive usage guidelines

**Refactoring Assistant Agent (Optimized):**
- Streamlined from 19-section complex specification to focused Claude Code format
- Code quality assessment and refactoring guidance within Claude Code environment
- Security analysis and performance optimization recommendations
- Integration with existing agent ecosystem (tddai-assistant, general-purpose, project-assistant)

**Also includes Issue #15 AST Query CLI implementation:**
- AST Service with display, query, and statistics capabilities
- JSONPath integration for flexible AST navigation
- CLI commands: ast-show, ast-query, ast-stats (22/22 tests passing)
- Leverages existing cache system for optimal performance

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-26 02:02:00 +02:00

270 lines
8.9 KiB
Python

"""
AST Service for Issue #15 - AST Query and Analysis functionality.
This service provides high-level AST operations for the CLI commands:
- AST display and visualization
- JSONPath querying of AST structures
- Statistical analysis of document content
Leverages the existing AST cache system for optimal performance.
"""
import json
import sys
from collections import Counter
from pathlib import Path
from typing import Dict, List, Any, Optional
from jsonpath_ng import parse as jsonpath_parse
from .ast_cache import ASTCache
from .cache_service import CacheDirectoryService
class ASTService:
"""
Service for AST introspection and analysis operations.
Provides high-level operations for CLI commands while leveraging
the existing AST cache system for performance optimization.
"""
def __init__(self):
"""Initialize AST service with cache integration."""
self.cache_service = CacheDirectoryService()
cache_dir = self.cache_service.get_cache_directory()
self.ast_cache = ASTCache(cache_dir)
def display_ast(self, file_path: Path, format_type: str = "tree") -> Dict[str, Any]:
"""
Display AST structure for a markdown file.
Args:
file_path: Path to markdown file
format_type: Display format (tree, json, compact)
Returns:
Dictionary with display results and metadata
"""
try:
if not file_path.exists():
return {
'success': False,
'message': f'File not found: {file_path}',
'output': ''
}
# Load AST using cache system
ast = self.ast_cache.load_cached_ast(file_path)
if format_type == "json":
output = json.dumps(ast, indent=2, ensure_ascii=False)
elif format_type == "compact":
output = self._format_ast_compact(ast)
else: # tree format (default)
output = self._format_ast_tree(ast)
return {
'success': True,
'message': f'AST structure for {file_path.name}',
'output': output,
'token_count': len(ast)
}
except Exception as e:
return {
'success': False,
'message': f'Error displaying AST: {e}',
'output': ''
}
def query_ast(self, file_path: Path, jsonpath_expr: str) -> Dict[str, Any]:
"""
Query AST using JSONPath expressions.
Args:
file_path: Path to markdown file
jsonpath_expr: JSONPath query expression
Returns:
Dictionary with query results and metadata
"""
try:
if not file_path.exists():
return {
'success': False,
'message': f'File not found: {file_path}',
'matches': [],
'count': 0
}
# Load AST using cache system
ast = self.ast_cache.load_cached_ast(file_path)
# Parse JSONPath expression
try:
jsonpath_expr_parsed = jsonpath_parse(jsonpath_expr)
except Exception as e:
return {
'success': False,
'message': f'Invalid JSONPath syntax: {e}',
'matches': [],
'count': 0
}
# Execute query
matches = jsonpath_expr_parsed.find(ast)
results = [match.value for match in matches]
return {
'success': True,
'message': f'JSONPath query results for {file_path.name}',
'matches': results,
'count': len(results),
'query': jsonpath_expr
}
except Exception as e:
return {
'success': False,
'message': f'Error executing query: {e}',
'matches': [],
'count': 0
}
def analyze_ast_statistics(self, file_path: Path) -> Dict[str, Any]:
"""
Generate comprehensive statistics about AST structure.
Args:
file_path: Path to markdown file
Returns:
Dictionary with detailed statistics
"""
try:
if not file_path.exists():
return {
'success': False,
'message': f'File not found: {file_path}',
'statistics': {}
}
# Load AST using cache system
ast = self.ast_cache.load_cached_ast(file_path)
stats = self._calculate_ast_statistics(ast)
return {
'success': True,
'message': f'AST statistics for {file_path.name}',
'statistics': stats
}
except Exception as e:
return {
'success': False,
'message': f'Error analyzing statistics: {e}',
'statistics': {}
}
def _format_ast_tree(self, ast: List[Dict[str, Any]]) -> str:
"""Format AST as a tree structure."""
lines = []
for i, token in enumerate(ast):
level = token.get('level', 0)
indent = ' ' * level
token_type = token.get('type', 'unknown')
# Add some content info for readability
content_info = ""
if token.get('content'):
content_preview = token['content'][:30]
if len(token['content']) > 30:
content_preview += "..."
content_info = f' "{content_preview}"'
elif token.get('tag'):
content_info = f' <{token["tag"]}>'
lines.append(f'{indent}[{i:2d}] {token_type}{content_info}')
return '\n'.join(lines)
def _format_ast_compact(self, ast: List[Dict[str, Any]]) -> str:
"""Format AST in compact form."""
lines = []
for token in ast:
token_type = token.get('type', 'unknown')
if token.get('content'):
content = token['content'][:20]
if len(token['content']) > 20:
content += "..."
lines.append(f'{token_type}: "{content}"')
else:
lines.append(f'{token_type}')
return '\n'.join(lines)
def _calculate_ast_statistics(self, ast: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Calculate comprehensive AST statistics."""
if not ast:
return {
'total_tokens': 0,
'headings': {'total': 0, 'by_level': {}},
'paragraphs': 0,
'links': 0,
'lists': {'ordered': 0, 'unordered': 0},
'code_blocks': 0,
'inline_code': 0,
'blockquotes': 0,
'emphasis': {'strong': 0, 'italic': 0},
'document_structure': 'empty'
}
# Count token types
token_types = Counter(token.get('type', 'unknown') for token in ast)
# Analyze headings by level
headings_by_level = {}
for token in ast:
if token.get('type') == 'heading_open':
tag = token.get('tag', 'h1')
level = int(tag[1:]) if tag.startswith('h') else 1
headings_by_level[f'h{level}'] = headings_by_level.get(f'h{level}', 0) + 1
# Count various elements
stats = {
'total_tokens': len(ast),
'headings': {
'total': token_types.get('heading_open', 0),
'by_level': headings_by_level
},
'paragraphs': token_types.get('paragraph_open', 0),
'links': token_types.get('link_open', 0),
'lists': {
'ordered': token_types.get('ordered_list_open', 0),
'unordered': token_types.get('bullet_list_open', 0)
},
'code_blocks': token_types.get('fence', 0) + token_types.get('code_block', 0),
'inline_code': token_types.get('code_inline', 0),
'blockquotes': token_types.get('blockquote_open', 0),
'emphasis': {
'strong': token_types.get('strong_open', 0),
'italic': token_types.get('em_open', 0)
}
}
# Determine document structure
if stats['headings']['total'] > 0:
if stats['paragraphs'] > stats['headings']['total']:
stats['document_structure'] = 'article'
else:
stats['document_structure'] = 'outline'
elif stats['lists']['ordered'] + stats['lists']['unordered'] > 0:
stats['document_structure'] = 'list-based'
elif stats['paragraphs'] > 0:
stats['document_structure'] = 'simple'
else:
stats['document_structure'] = 'minimal'
return stats