feat: implement modular capability system with automatic discovery

- Move release management to capabilities/release-management/ with complete Makefile
- Create automatic capability discovery system in scripts/capability_discovery.mk
- Add capability-manager subagent for managing modular architecture
- Implement target delegation system enabling capability-name-target patterns
- Create Makefiles for markitect-content, markitect-utils, and issue-facade capabilities
- Remove legacy release management code and documentation from main project
- Update main Makefile to use capability discovery and delegation
- Add comprehensive capability status, help, and management targets

The capability system provides:
- Automatic discovery of capabilities with Makefiles
- Clean target delegation without conflicts
- Modular architecture following established patterns
- Comprehensive help and status reporting
- Zero-conflict capability integration

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-11-09 01:29:15 +01:00
parent d505c15d40
commit d0ffdc057c
38 changed files with 3978 additions and 1361 deletions

View File

@@ -0,0 +1,65 @@
"""
Release Management Capability
A comprehensive release management system for Python projects providing:
- Automatic version management with setuptools-scm
- Package building and distribution
- Multi-platform publishing (Gitea, PyPI, etc.)
- Git tag-based release workflows
- CLI tools for release automation
Main Components:
- ReleaseManager: Orchestrates complete release workflows
- PackageBuilder: Handles package generation and building
- PublishManager: Manages package publication to registries
- GitManager: Git operations for releases
- Registry Support: Gitea, PyPI, and extensible registry system
Quick Start:
from release_management import ReleaseManager
manager = ReleaseManager()
success = manager.publish_release("1.0.0")
CLI Usage:
release status
release publish --version 1.0.0
release upload --registry gitea
"""
from .core.manager import ReleaseManager
from .core.builder import PackageBuilder
from .core.publisher import PublishManager
from .git.manager import GitManager
from .registries.factory import RegistryFactory
from .registries.gitea.registry import GiteaRegistry
from .utils.version import VersionManager
from .utils.validation import ReleaseValidator
# Version is managed in pyproject.toml
__version__ = "0.1.0"
__all__ = [
# Core classes
"ReleaseManager",
"PackageBuilder",
"PublishManager",
"GitManager",
# Registry support
"RegistryFactory",
"GiteaRegistry",
# Utilities
"VersionManager",
"ReleaseValidator",
# Version
"__version__",
]
# Package metadata
__title__ = "release-management"
__description__ = "Comprehensive release management capability for Python projects"
__author__ = "MarkiTect Project"
__license__ = "MIT"

View File

@@ -0,0 +1,9 @@
"""
Command-line interface for release management.
This module provides CLI commands for release operations.
"""
from .main import main
__all__ = ["main"]

View File

@@ -0,0 +1,252 @@
"""
Main CLI entry point for release management.
This module provides the main CLI interface adapted from the original release.py script.
"""
import click
import sys
from pathlib import Path
from typing import Optional
from ..core.manager import ReleaseManager
from ..utils.version import VersionManager
@click.group(invoke_without_command=True)
@click.option('--dry-run', is_flag=True, help='Show what would be done without making changes')
@click.option('--force', is_flag=True, help='Force operation even with warnings')
@click.option('--project-root', type=click.Path(exists=True, path_type=Path),
help='Project root directory')
@click.pass_context
def main(ctx, dry_run: bool, force: bool, project_root: Optional[Path]):
"""Release management CLI for Python projects."""
ctx.ensure_object(dict)
ctx.obj['dry_run'] = dry_run
ctx.obj['force'] = force
ctx.obj['project_root'] = project_root
# If no command specified, show status
if ctx.invoked_subcommand is None:
ctx.invoke(status)
@main.command()
@click.pass_context
def status(ctx):
"""Show current release status and version information."""
manager = ReleaseManager(
project_root=ctx.obj['project_root'],
dry_run=ctx.obj['dry_run'],
force=ctx.obj['force']
)
print("🔍 Release Status")
print("=" * 60)
status_info = manager.get_release_status()
# Version information
print(f"Current Version: {status_info['version']}")
# Git information
if status_info.get('is_repo'):
print(f"Git Branch: {status_info['branch']}")
print(f"Latest Commit: {status_info['latest_commit']}")
print(f"Latest Tag: {status_info['latest_tag'] or 'None'}")
print(f"Uncommitted Changes: {'Yes' if status_info['has_changes'] else 'No'}")
else:
print("Git Repository: Not available")
# Package information
packages = status_info['packages']
print(f"\\nBuilt Packages: {packages['total_count']} files")
if packages['wheels']:
print(" Wheels:")
for wheel in packages['wheels']:
print(f" - {wheel}")
if packages['sdists']:
print(" Source Distributions:")
for sdist in packages['sdists']:
print(f" - {sdist}")
# Validation status
validation = status_info['validation']
if validation['is_valid']:
print("\\n✅ Repository is ready for release")
else:
print("\\n❌ Release validation issues:")
for issue in validation['issues']:
print(f" - {issue}")
@main.command()
@click.pass_context
def validate(ctx):
"""Validate repository state for release readiness."""
manager = ReleaseManager(
project_root=ctx.obj['project_root'],
dry_run=ctx.obj['dry_run'],
force=ctx.obj['force']
)
is_valid, issues = manager.validate_release_state()
if is_valid:
print("✅ Repository is ready for release")
else:
print("❌ Release validation failed:")
for issue in issues:
print(f" - {issue}")
sys.exit(1)
@main.command()
@click.option('--version', required=True, help='Version to tag (e.g., 0.8.0)')
@click.option('--message', help='Tag message')
@click.pass_context
def tag(ctx, version: str, message: Optional[str]):
"""Create git tag for version."""
manager = ReleaseManager(
project_root=ctx.obj['project_root'],
dry_run=ctx.obj['dry_run'],
force=ctx.obj['force']
)
if manager.create_tag(version, message):
print(f"✅ Successfully created tag for version {version}")
else:
print(f"❌ Failed to create tag for version {version}")
sys.exit(1)
@main.command()
@click.pass_context
def build(ctx):
"""Build release packages using setuptools-scm."""
manager = ReleaseManager(
project_root=ctx.obj['project_root'],
dry_run=ctx.obj['dry_run'],
force=ctx.obj['force']
)
if manager.build_packages():
print("✅ Packages built successfully")
else:
print("❌ Package build failed")
sys.exit(1)
@main.command()
@click.option('--version', required=True, help='Version to publish (e.g., 0.8.0)')
@click.option('--registry', default='gitea', help='Registry type (gitea, pypi, etc.)')
@click.option('--skip-build', is_flag=True, help='Skip building and use existing packages')
@click.pass_context
def publish(ctx, version: str, registry: str, skip_build: bool):
"""Complete release workflow: tag, build, and publish."""
manager = ReleaseManager(
project_root=ctx.obj['project_root'],
dry_run=ctx.obj['dry_run'],
force=ctx.obj['force']
)
if manager.publish_with_fallback(version, registry, skip_build):
print(f"🎉 Release {version} published successfully!")
else:
print(f"❌ Release {version} failed")
sys.exit(1)
@main.command()
@click.option('--registry', default='gitea', help='Registry type (gitea, pypi, etc.)')
@click.pass_context
def upload(ctx, registry: str):
"""Upload existing packages to registry."""
manager = ReleaseManager(
project_root=ctx.obj['project_root'],
dry_run=ctx.obj['dry_run'],
force=ctx.obj['force']
)
if manager.upload_existing_packages(registry):
print(f"✅ Packages uploaded to {registry}")
else:
print(f"❌ Upload to {registry} failed")
sys.exit(1)
@main.command('registry-info')
@click.option('--registry', default='gitea', help='Registry type to show info for')
@click.pass_context
def registry_info(ctx, registry: str):
"""Show package registry information and status."""
manager = ReleaseManager(
project_root=ctx.obj['project_root'],
dry_run=ctx.obj['dry_run'],
force=ctx.obj['force']
)
info = manager.show_registry_info(registry)
print(f"📦 {registry.title()} Registry Information")
print("=" * 50)
if 'error' in info:
print(f"❌ Error: {info['error']}")
return
for key, value in info.items():
if isinstance(value, bool):
indicator = "" if value else ""
print(f"{key.replace('_', ' ').title()}: {indicator}")
else:
print(f"{key.replace('_', ' ').title()}: {value}")
@main.command()
@click.pass_context
def clean(ctx):
"""Clean build artifacts and temporary files."""
manager = ReleaseManager(
project_root=ctx.obj['project_root'],
dry_run=ctx.obj['dry_run'],
force=ctx.obj['force']
)
manager.clean_build_artifacts()
print("✅ Build artifacts cleaned")
@main.command('version-info')
@click.option('--suggest', is_flag=True, help='Suggest next version options')
@click.pass_context
def version_info(ctx, suggest: bool):
"""Show version information and suggestions."""
version_manager = VersionManager(ctx.obj['project_root'])
current = version_manager.get_current_version()
print(f"Current Version: {current}")
if suggest:
suggestions = version_manager.suggest_version(current)
if 'error' in suggestions:
print(f"{suggestions['error']}")
if 'suggestion' in suggestions:
print(f"💡 {suggestions['suggestion']}")
else:
print("\\nSuggested next versions:")
print(f" Patch: {suggestions['patch']}")
print(f" Minor: {suggestions['minor']}")
print(f" Major: {suggestions['major']}")
# Show version components
version_data = version_manager.parse_version(current)
if 'error' not in version_data:
print("\\nVersion Components:")
for key, value in version_data.items():
if value is not None:
print(f" {key.replace('_', ' ').title()}: {value}")
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,14 @@
"""
Core release management classes.
This module provides the main classes for orchestrating releases:
- ReleaseManager: Main coordinator for release workflows
- PackageBuilder: Package building and setuptools-scm integration
- PublishManager: Package publication to registries
"""
from .manager import ReleaseManager
from .builder import PackageBuilder
from .publisher import PublishManager
__all__ = ["ReleaseManager", "PackageBuilder", "PublishManager"]

View File

@@ -0,0 +1,166 @@
"""
Package building functionality for releases.
This module handles package building using setuptools-scm and the Python build module.
"""
import subprocess
import sys
from pathlib import Path
from typing import List, Optional, Dict, Any
from ..utils.version import VersionManager
class PackageBuilder:
"""Handles package building with setuptools-scm integration."""
def __init__(self, project_root: Optional[Path] = None, dry_run: bool = False):
"""Initialize the package builder.
Args:
project_root: Root directory of the project. Defaults to current directory.
dry_run: If True, show what would be done without executing.
"""
self.project_root = project_root or Path.cwd()
self.dry_run = dry_run
self.dist_dir = self.project_root / "dist"
def get_current_version(self) -> str:
"""Get current version using setuptools-scm.
Returns:
Current version string or "unknown" if unavailable.
"""
try:
result = self._run_command(
['python', '-m', 'setuptools_scm'],
capture=True,
skip_dry_run=True
)
return result.stdout.strip()
except subprocess.CalledProcessError:
return "unknown"
def clean_build(self) -> None:
"""Clean previous build artifacts."""
print("🧹 Cleaning build artifacts...")
patterns = ['build', 'dist', '*.egg-info']
for pattern in patterns:
try:
if pattern == 'dist' and self.dist_dir.exists():
if self.dry_run:
print(f"[DRY RUN] Would remove: {self.dist_dir}")
else:
import shutil
shutil.rmtree(self.dist_dir)
print(f"✅ Removed: {self.dist_dir}")
elif pattern != 'dist':
self._run_command(['rm', '-rf', pattern])
except subprocess.CalledProcessError:
pass # Ignore if files don't exist
def build_packages(self) -> bool:
"""Build release packages using setuptools-scm.
Returns:
True if build successful, False otherwise.
"""
print(f"📦 Building packages (version auto-determined by setuptools-scm)")
# Clean previous builds
self.clean_build()
# Build packages
try:
print("Building packages...")
self._run_command(['python', '-m', 'build'], capture=False)
print("✅ Packages built successfully")
# Show package details
self._show_package_details()
return True
except subprocess.CalledProcessError as e:
print(f"❌ Package build failed: {e}")
return False
def get_built_packages(self) -> Dict[str, List[Path]]:
"""Get list of built packages by type.
Returns:
Dictionary with 'wheels' and 'sdists' keys containing file paths.
"""
if not self.dist_dir.exists():
return {"wheels": [], "sdists": []}
wheels = list(self.dist_dir.glob("*.whl"))
sdists = list(self.dist_dir.glob("*.tar.gz"))
return {"wheels": wheels, "sdists": sdists}
def validate_packages(self) -> bool:
"""Validate that expected packages were built.
Returns:
True if packages are valid, False otherwise.
"""
packages = self.get_built_packages()
if not packages["wheels"] and not packages["sdists"]:
print("❌ No packages found in dist/")
return False
# Check package sizes
for wheel in packages["wheels"]:
if wheel.stat().st_size < 1000: # Less than 1KB
print(f"⚠️ Warning: {wheel.name} is very small ({wheel.stat().st_size} bytes)")
for sdist in packages["sdists"]:
if sdist.stat().st_size < 1000: # Less than 1KB
print(f"⚠️ Warning: {sdist.name} is very small ({sdist.stat().st_size} bytes)")
return True
def _show_package_details(self) -> None:
"""Show details about built packages."""
packages = self.get_built_packages()
if packages["wheels"] or packages["sdists"]:
print(f"\n📦 Built packages in {self.dist_dir}:")
for wheel in packages["wheels"]:
size = wheel.stat().st_size
print(f" 🎯 {wheel.name} ({size:,} bytes)")
for sdist in packages["sdists"]:
size = sdist.stat().st_size
print(f" 📄 {sdist.name} ({size:,} bytes)")
else:
print("❌ No packages found")
def _run_command(self, cmd: List[str], capture: bool = True,
check: bool = True, skip_dry_run: bool = False) -> subprocess.CompletedProcess:
"""Run a command with optional dry-run support.
Args:
cmd: Command to execute
capture: Whether to capture output
check: Whether to raise on non-zero exit
skip_dry_run: Whether to skip dry-run check and always execute
Returns:
CompletedProcess result
"""
if self.dry_run and not skip_dry_run:
print(f"[DRY RUN] Would run: {' '.join(cmd)}")
return subprocess.CompletedProcess(cmd, 0, "", "")
return subprocess.run(
cmd,
capture_output=capture,
text=True,
check=check,
cwd=self.project_root
)

View File

@@ -0,0 +1,215 @@
"""
Main release manager orchestrating complete release workflows.
This module provides the primary ReleaseManager class that coordinates
all aspects of the release process.
"""
from pathlib import Path
from typing import Optional, List, Tuple, Dict, Any
from .builder import PackageBuilder
from .publisher import PublishManager
from ..git.manager import GitManager
from ..utils.validation import ReleaseValidator
class ReleaseManager:
"""Main orchestrator for release workflows."""
def __init__(self, project_root: Optional[Path] = None, dry_run: bool = False, force: bool = False):
"""Initialize the release manager.
Args:
project_root: Root directory of the project. Defaults to current directory.
dry_run: If True, show what would be done without executing.
force: If True, skip validation checks.
"""
self.project_root = project_root or Path.cwd()
self.dry_run = dry_run
self.force = force
# Initialize component managers
self.git_manager = GitManager(project_root, dry_run)
self.builder = PackageBuilder(project_root, dry_run)
self.publisher = PublishManager(project_root, dry_run)
self.validator = ReleaseValidator(project_root)
def get_release_status(self) -> Dict[str, Any]:
"""Get comprehensive release status information.
Returns:
Dictionary with release status details
"""
status = {}
# Version information
status['version'] = self.builder.get_current_version()
# Git status
git_status = self.git_manager.get_repository_status()
status.update(git_status)
# Package status
packages = self.builder.get_built_packages()
status['packages'] = {
'wheels': [p.name for p in packages['wheels']],
'sdists': [p.name for p in packages['sdists']],
'total_count': len(packages['wheels']) + len(packages['sdists'])
}
# Validation status
is_valid, issues = self.validate_release_state()
status['validation'] = {
'is_valid': is_valid,
'issues': issues
}
return status
def validate_release_state(self) -> Tuple[bool, List[str]]:
"""Validate that the repository is ready for release.
Returns:
Tuple of (is_valid, list_of_issues)
"""
return self.validator.validate_release_state(force=self.force)
def create_tag(self, version: str, message: Optional[str] = None) -> bool:
"""Create a git tag for the release.
Args:
version: Version to tag (e.g., "1.0.0")
message: Optional tag message
Returns:
True if tag created successfully, False otherwise
"""
# Validate release state first
is_valid, issues = self.validate_release_state()
if not is_valid and not self.force:
print("❌ Cannot create tag:")
for issue in issues:
print(f" - {issue}")
return False
return self.git_manager.create_tag(version, message)
def build_packages(self) -> bool:
"""Build release packages.
Returns:
True if build successful, False otherwise
"""
success = self.builder.build_packages()
if success:
success = self.builder.validate_packages()
return success
def publish_release(self, version: str, registry_type: str = 'gitea',
skip_build: bool = False) -> bool:
"""Complete release workflow: validate, tag, build, and publish.
Args:
version: Version to release
registry_type: Type of registry to publish to
skip_build: If True, skip building and use existing packages
Returns:
True if release successful, False otherwise
"""
print(f"🚀 Publishing release {version}")
# Validate state
is_valid, issues = self.validate_release_state()
if not is_valid and not self.force:
print("❌ Cannot publish release:")
for issue in issues:
print(f" - {issue}")
return False
# Create git tag (this determines the version for setuptools-scm)
if not self.git_manager.tag_exists(f"v{version}"):
if not self.create_tag(version):
return False
else:
print(f"✅ Tag v{version} already exists")
# Build packages (setuptools-scm will use the tag for version)
if not skip_build:
if not self.build_packages():
return False
else:
print("⏭️ Skipping build (using existing packages)")
# Publish packages
if not self.publisher.publish_packages(registry_type):
print("⚠️ Release completed but publishing failed")
return False
print(f"✅ Release {version} completed successfully!")
print(f"📦 Packages published to {registry_type}")
print(f"🏷️ Git tag v{version} created")
return True
def publish_with_fallback(self, version: str, registry_type: str = 'gitea',
skip_build: bool = False) -> bool:
"""Complete release workflow with fallback to release assets.
Args:
version: Version to release
registry_type: Type of registry to publish to
skip_build: If True, skip building and use existing packages
Returns:
True if release successful, False otherwise
"""
# Try normal publish first
if self.publish_release(version, registry_type, skip_build):
return True
# If that fails, try release assets fallback
print("🔄 Attempting release assets fallback...")
return self.publisher.publish_as_release_assets(version, registry_type)
def upload_existing_packages(self, registry_type: str = 'gitea') -> bool:
"""Upload existing packages without building or tagging.
Args:
registry_type: Type of registry to upload to
Returns:
True if upload successful, False otherwise
"""
print(f"📤 Uploading existing packages to {registry_type}")
packages = self.builder.get_built_packages()
if not packages["wheels"] and not packages["sdists"]:
print("❌ No packages found in dist/. Run build first.")
return False
all_packages = packages["wheels"] + packages["sdists"]
return self.publisher.upload_specific_packages(all_packages, registry_type)
def clean_build_artifacts(self) -> None:
"""Clean build artifacts and temporary files."""
self.builder.clean_build()
def show_registry_info(self, registry_type: str = 'gitea') -> Dict[str, Any]:
"""Show information about a registry.
Args:
registry_type: Type of registry
Returns:
Dictionary with registry information
"""
return self.publisher.get_registry_info(registry_type)
def get_commits_since_last_tag(self) -> List[str]:
"""Get commits since the last release tag.
Returns:
List of commit messages since last tag
"""
return self.git_manager.get_commits_since_tag()

View File

@@ -0,0 +1,248 @@
"""
Package publishing functionality for releases.
This module handles publishing packages to various registries.
"""
from pathlib import Path
from typing import Dict, List, Optional, Any
from ..registries.factory import RegistryFactory
from ..registries.base import RegistryInterface
from .builder import PackageBuilder
class PublishManager:
"""Handles package publication to registries."""
def __init__(self, project_root: Optional[Path] = None, dry_run: bool = False):
"""Initialize the publish manager.
Args:
project_root: Root directory of the project. Defaults to current directory.
dry_run: If True, show what would be done without executing.
"""
self.project_root = project_root or Path.cwd()
self.dry_run = dry_run
def publish_packages(self, registry_type: str = 'gitea',
registry_config: Optional[Dict[str, Any]] = None) -> bool:
"""Publish packages to specified registry.
Args:
registry_type: Type of registry to publish to
registry_config: Optional registry configuration
Returns:
True if publishing successful, False otherwise
"""
try:
# Get registry client
registry = self._get_registry(registry_type, registry_config)
# Get built packages
builder = PackageBuilder(self.project_root)
packages = builder.get_built_packages()
if not packages["wheels"] and not packages["sdists"]:
print("❌ No packages found in dist/. Run build first.")
return False
# Upload packages
success = True
for wheel in packages["wheels"]:
if not self._upload_package_with_fallback(registry, wheel):
success = False
for sdist in packages["sdists"]:
if not self._upload_package_with_fallback(registry, sdist):
success = False
return success
except Exception as e:
print(f"❌ Publishing failed: {e}")
return False
def upload_specific_packages(self, package_paths: List[Path],
registry_type: str = 'gitea',
registry_config: Optional[Dict[str, Any]] = None) -> bool:
"""Upload specific package files to registry.
Args:
package_paths: List of paths to package files
registry_type: Type of registry to publish to
registry_config: Optional registry configuration
Returns:
True if all uploads successful, False otherwise
"""
try:
registry = self._get_registry(registry_type, registry_config)
success = True
for package_path in package_paths:
if not package_path.exists():
print(f"❌ Package not found: {package_path}")
success = False
continue
if not self._upload_package_with_fallback(registry, package_path):
success = False
return success
except Exception as e:
print(f"❌ Upload failed: {e}")
return False
def publish_as_release_assets(self, version: str,
registry_type: str = 'gitea',
registry_config: Optional[Dict[str, Any]] = None) -> bool:
"""Publish packages as release assets (fallback method).
Args:
version: Version to publish as
registry_type: Type of registry (must support release assets)
registry_config: Optional registry configuration
Returns:
True if publishing successful, False otherwise
"""
try:
registry = self._get_registry(registry_type, registry_config)
# Check if registry supports release assets
if not hasattr(registry, 'upload_package_as_release_assets'):
print(f"❌ Registry type '{registry_type}' does not support release assets")
return False
# Get built packages
builder = PackageBuilder(self.project_root)
packages = builder.get_built_packages()
if not packages["wheels"] and not packages["sdists"]:
print("❌ No packages found in dist/. Run build first.")
return False
# Find wheel and corresponding source distribution
success = True
for wheel in packages["wheels"]:
# Find matching sdist
sdist = None
wheel_name_parts = wheel.stem.split('-')
package_name = wheel_name_parts[0] if wheel_name_parts else ""
for potential_sdist in packages["sdists"]:
if potential_sdist.stem.startswith(package_name):
sdist = potential_sdist
break
# Upload as release assets
if not registry.upload_package_as_release_assets(
version, wheel, sdist, dry_run=self.dry_run
):
success = False
return success
except Exception as e:
print(f"❌ Release asset publishing failed: {e}")
return False
def get_registry_info(self, registry_type: str = 'gitea',
registry_config: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Get information about a registry.
Args:
registry_type: Type of registry
registry_config: Optional registry configuration
Returns:
Dictionary with registry information
"""
try:
registry = self._get_registry(registry_type, registry_config)
return registry.get_registry_info()
except Exception as e:
return {"error": str(e)}
def _get_registry(self, registry_type: str,
registry_config: Optional[Dict[str, Any]] = None) -> RegistryInterface:
"""Get a registry client.
Args:
registry_type: Type of registry
registry_config: Optional registry configuration
Returns:
Registry client instance
"""
if registry_config:
return RegistryFactory.create(registry_type, registry_config)
else:
# Try to load from pyproject.toml first
try:
return RegistryFactory.create_from_pyproject_config(
self.project_root / "pyproject.toml",
registry_type
)
except (ValueError, FileNotFoundError):
# Fallback to auto-detection
return RegistryFactory.create(registry_type)
def _upload_package_with_fallback(self, registry: RegistryInterface,
package_path: Path) -> bool:
"""Upload a package with fallback to release assets if needed.
Args:
registry: Registry client
package_path: Path to package file
Returns:
True if upload successful, False otherwise
"""
try:
# Try normal package upload first
return registry.upload_package(package_path, dry_run=self.dry_run)
except Exception as e:
print(f"⚠️ Package upload failed: {e}")
# Check if registry supports release assets as fallback
if hasattr(registry, 'upload_package_as_release_assets'):
print("🔄 Trying release assets as fallback...")
# Extract version from package filename for release assets
version = self._extract_version_from_filename(package_path)
if version:
return registry.upload_package_as_release_assets(
version, package_path, dry_run=self.dry_run
)
return False
def _extract_version_from_filename(self, package_path: Path) -> Optional[str]:
"""Extract version from package filename.
Args:
package_path: Path to package file
Returns:
Version string or None if not found
"""
try:
if package_path.suffix == '.whl':
# Wheel format: package-version-python-abi-platform.whl
parts = package_path.stem.split('-')
if len(parts) >= 2:
return parts[1]
elif package_path.suffix == '.gz' and package_path.name.endswith('.tar.gz'):
# Source dist format: package-version.tar.gz
name_without_tar = package_path.name.replace('.tar.gz', '')
parts = name_without_tar.split('-')
if len(parts) >= 2:
return parts[1]
except Exception:
pass
return None

View File

@@ -0,0 +1,12 @@
"""
Git management for releases.
This module provides Git operations required for release workflows:
- Tag creation and management
- Repository state validation
- Branch and commit operations
"""
from .manager import GitManager
__all__ = ["GitManager"]

View File

@@ -0,0 +1,205 @@
"""
Git operations for release management.
This module handles all Git-related operations needed for releases.
"""
import subprocess
from pathlib import Path
from typing import Dict, Any, Optional, List
class GitManager:
"""Manages Git operations for releases."""
def __init__(self, project_root: Optional[Path] = None, dry_run: bool = False):
"""Initialize Git manager.
Args:
project_root: Root directory of the project
dry_run: If True, show what would be done without executing
"""
self.project_root = project_root or Path.cwd()
self.dry_run = dry_run
def get_repository_status(self) -> Dict[str, Any]:
"""Get current git repository status.
Returns:
Dictionary with repository status information
"""
try:
# Get current branch
branch_result = self._run_command(['git', 'branch', '--show-current'])
current_branch = branch_result.stdout.strip()
# Check for uncommitted changes
status_result = self._run_command(['git', 'status', '--porcelain'])
has_changes = bool(status_result.stdout.strip())
# Get latest commit
commit_result = self._run_command(['git', 'rev-parse', '--short', 'HEAD'])
latest_commit = commit_result.stdout.strip()
# Get latest tag
try:
tag_result = self._run_command(['git', 'describe', '--tags', '--abbrev=0'])
latest_tag = tag_result.stdout.strip()
except subprocess.CalledProcessError:
latest_tag = None
return {
'is_repo': True,
'branch': current_branch,
'has_changes': has_changes,
'latest_commit': latest_commit,
'latest_tag': latest_tag
}
except subprocess.CalledProcessError:
return {'is_repo': False}
def create_tag(self, version: str, message: Optional[str] = None) -> bool:
"""Create and push git tag.
Args:
version: Version to tag (e.g., "1.0.0")
message: Optional tag message
Returns:
True if successful, False otherwise
"""
if not version.startswith('v'):
tag_name = f"v{version}"
else:
tag_name = version
tag_message = message or f"Release {version.lstrip('v')}"
print(f"🏷️ Creating git tag {tag_name}")
try:
# Create annotated tag
self._run_command(['git', 'tag', '-a', tag_name, '-m', tag_message])
print(f"✅ Tag {tag_name} created")
# Push tag to origin
try:
print(f"📤 Pushing tag to origin...")
self._run_command(['git', 'push', 'origin', tag_name])
print(f"✅ Tag pushed to origin")
return True
except subprocess.CalledProcessError as e:
print(f"⚠️ Could not push tag to origin: {e}")
print(f"You can push it manually with: git push origin {tag_name}")
return True # Tag created successfully, push can be done manually
except subprocess.CalledProcessError as e:
print(f"❌ Failed to create tag: {e}")
return False
def validate_release_state(self, force: bool = False) -> tuple[bool, List[str]]:
"""Validate that repository is ready for release.
Args:
force: Skip validation checks if True
Returns:
Tuple of (is_valid, list_of_issues)
"""
issues = []
status = self.get_repository_status()
if not status['is_repo']:
issues.append("Not in a git repository")
else:
if status['has_changes'] and not force:
issues.append("Repository has uncommitted changes")
if status['branch'] != 'main' and not force:
issues.append(f"Not on main branch (currently on {status['branch']})")
return len(issues) == 0, issues
def get_commits_since_tag(self, tag_name: Optional[str] = None) -> List[str]:
"""Get list of commits since specified tag.
Args:
tag_name: Tag to compare against. If None, uses latest tag.
Returns:
List of commit messages since tag
"""
try:
if tag_name is None:
# Get latest tag
tag_result = self._run_command(['git', 'describe', '--tags', '--abbrev=0'])
tag_name = tag_result.stdout.strip()
# Get commits since tag
log_result = self._run_command([
'git', 'log', f'{tag_name}..HEAD', '--oneline', '--no-merges'
])
commits = []
for line in log_result.stdout.strip().split('\n'):
if line:
commits.append(line)
return commits
except subprocess.CalledProcessError:
return []
def tag_exists(self, tag_name: str) -> bool:
"""Check if a git tag exists.
Args:
tag_name: Tag name to check
Returns:
True if tag exists, False otherwise
"""
try:
self._run_command(['git', 'rev-parse', f'refs/tags/{tag_name}'])
return True
except subprocess.CalledProcessError:
return False
def get_remote_url(self, remote: str = 'origin') -> Optional[str]:
"""Get the URL of a git remote.
Args:
remote: Remote name (default: 'origin')
Returns:
Remote URL or None if not found
"""
try:
result = self._run_command(['git', 'remote', 'get-url', remote])
return result.stdout.strip()
except subprocess.CalledProcessError:
return None
def _run_command(self, cmd: List[str]) -> subprocess.CompletedProcess:
"""Run a git command.
Args:
cmd: Command to execute
Returns:
CompletedProcess result
Raises:
subprocess.CalledProcessError: If command fails
"""
if self.dry_run and not any(read_only in cmd for read_only in
['status', 'branch', 'rev-parse', 'describe',
'log', 'remote']):
print(f"[DRY RUN] Would run: {' '.join(cmd)}")
return subprocess.CompletedProcess(cmd, 0, "", "")
return subprocess.run(
cmd,
capture_output=True,
text=True,
check=True,
cwd=self.project_root
)

View File

@@ -0,0 +1,23 @@
"""
Package registry implementations.
This module provides registry clients for publishing packages to various platforms:
- Gitea package registries
- PyPI and Test PyPI
- Extensible factory for custom registries
"""
from .factory import RegistryFactory
from .base import RegistryInterface, RegistryConfig
from .gitea.registry import GiteaRegistry
from .gitea.config import GiteaConfig
from .gitea.exceptions import GiteaError
__all__ = [
"RegistryFactory",
"RegistryInterface",
"RegistryConfig",
"GiteaRegistry",
"GiteaConfig",
"GiteaError",
]

View File

@@ -0,0 +1,101 @@
"""
Base registry interface and configuration.
This module defines the common interface that all registry implementations must follow.
"""
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
@dataclass
class RegistryConfig:
"""Base configuration for package registries."""
name: str
type: str
url: str
auth_token_env: Optional[str] = None
def get_auth_token(self) -> Optional[str]:
"""Get authentication token from environment variable."""
if self.auth_token_env:
import os
return os.getenv(self.auth_token_env)
return None
class RegistryInterface(ABC):
"""Abstract interface for package registries."""
def __init__(self, config: RegistryConfig):
"""Initialize the registry with configuration."""
self.config = config
@abstractmethod
def upload_package(self, package_path: Path, dry_run: bool = False) -> bool:
"""Upload a package to the registry.
Args:
package_path: Path to package file (.whl or .tar.gz)
dry_run: If True, show what would be done without uploading
Returns:
True if upload successful, False otherwise
"""
pass
@abstractmethod
def check_auth(self) -> bool:
"""Check if authentication is properly configured.
Returns:
True if authenticated, False otherwise
"""
pass
@abstractmethod
def list_packages(self) -> List[Dict[str, Any]]:
"""List packages in the registry.
Returns:
List of package information dictionaries
"""
pass
@abstractmethod
def get_package_info(self, package_name: str) -> Optional[Dict[str, Any]]:
"""Get information about a specific package.
Args:
package_name: Name of the package
Returns:
Package information dictionary or None if not found
"""
pass
@abstractmethod
def delete_package_version(self, package_name: str, version: str,
dry_run: bool = False) -> bool:
"""Delete a specific version of a package.
Args:
package_name: Name of the package
version: Version to delete
dry_run: If True, show what would be done without deleting
Returns:
True if deletion successful, False otherwise
"""
pass
@abstractmethod
def get_registry_info(self) -> Dict[str, Any]:
"""Get information about the registry configuration.
Returns:
Dictionary with registry information
"""
pass

View File

@@ -0,0 +1,159 @@
"""
Registry factory for creating registry clients.
This module provides a factory for creating appropriate registry clients
based on configuration or registry type.
"""
from typing import Dict, Type, Optional, Any
from pathlib import Path
from .base import RegistryInterface, RegistryConfig
from .gitea.registry import GiteaRegistry
from .gitea.config import GiteaConfig
class RegistryFactory:
"""Factory for creating registry clients."""
_registry_types: Dict[str, Type[RegistryInterface]] = {
'gitea': GiteaRegistry,
}
@classmethod
def create(cls, registry_type: str, config: Optional[Dict[str, Any]] = None) -> RegistryInterface:
"""Create a registry client of the specified type.
Args:
registry_type: Type of registry ('gitea', 'pypi', etc.)
config: Optional configuration dictionary
Returns:
Registry client instance
Raises:
ValueError: If registry type is not supported
"""
if registry_type not in cls._registry_types:
raise ValueError(f"Unsupported registry type: {registry_type}. "
f"Supported types: {list(cls._registry_types.keys())}")
registry_class = cls._registry_types[registry_type]
# Handle Gitea-specific configuration
if registry_type == 'gitea':
if config:
gitea_config = GiteaConfig(
gitea_url=config.get('url', ''),
repo_owner=config.get('owner', ''),
repo_name=config.get('repo', ''),
auth_token=config.get('auth_token')
)
return registry_class(gitea_config)
else:
# Auto-detect from git repository
return registry_class()
# For other registry types, create with generic config
if config:
registry_config = RegistryConfig(
name=config.get('name', registry_type),
type=registry_type,
url=config['url'],
auth_token_env=config.get('auth_token_env')
)
return registry_class(registry_config)
else:
raise ValueError(f"Configuration required for {registry_type} registry")
@classmethod
def create_from_pyproject_config(cls, pyproject_path: Optional[Path] = None,
registry_name: str = 'gitea') -> RegistryInterface:
"""Create a registry client from pyproject.toml configuration.
Args:
pyproject_path: Path to pyproject.toml file. If None, looks in current directory.
registry_name: Name of registry configuration to use
Returns:
Registry client instance
Raises:
ValueError: If configuration is invalid or registry not found
"""
if pyproject_path is None:
pyproject_path = Path.cwd() / "pyproject.toml"
if not pyproject_path.exists():
raise ValueError(f"pyproject.toml not found at {pyproject_path}")
try:
import tomllib
except ImportError:
try:
import tomli as tomllib # Fallback for Python < 3.11
except ImportError:
raise ImportError("tomllib or tomli required to read pyproject.toml")
with open(pyproject_path, 'rb') as f:
config = tomllib.load(f)
# Look for release-management configuration
release_config = config.get('tool', {}).get('release-management', {})
registries_config = release_config.get('registries', {})
if registry_name not in registries_config:
raise ValueError(f"Registry '{registry_name}' not found in pyproject.toml configuration")
registry_config = registries_config[registry_name]
registry_type = registry_config.get('type', registry_name)
# Add auth token from environment if specified
auth_token_env = registry_config.get('auth_token_env')
if auth_token_env:
import os
registry_config = registry_config.copy()
registry_config['auth_token'] = os.getenv(auth_token_env)
return cls.create(registry_type, registry_config)
@classmethod
def auto_detect(cls) -> RegistryInterface:
"""Auto-detect registry type from current environment.
Currently only supports Gitea auto-detection from git repository.
Returns:
Registry client instance
Raises:
ValueError: If no registry can be auto-detected
"""
# Try Gitea auto-detection first
try:
return cls.create('gitea')
except Exception:
pass
raise ValueError("Could not auto-detect registry type. "
"Ensure you're in a git repository with Gitea remote, "
"or provide explicit configuration.")
@classmethod
def register_registry_type(cls, registry_type: str, registry_class: Type[RegistryInterface]) -> None:
"""Register a new registry type.
Args:
registry_type: String identifier for the registry type
registry_class: Registry class that implements RegistryInterface
"""
cls._registry_types[registry_type] = registry_class
@classmethod
def list_supported_types(cls) -> list[str]:
"""List all supported registry types.
Returns:
List of supported registry type strings
"""
return list(cls._registry_types.keys())

View File

@@ -0,0 +1,14 @@
"""
Gitea package registry implementation.
This module provides Gitea-specific registry functionality including:
- Package registry uploads
- Release asset fallback
- Configuration and authentication
"""
from .registry import GiteaRegistry
from .config import GiteaConfig
from .exceptions import GiteaError, GiteaConfigError
__all__ = ["GiteaRegistry", "GiteaConfig", "GiteaError", "GiteaConfigError"]

View File

@@ -0,0 +1,175 @@
"""
Gitea-specific configuration management.
"""
import os
import subprocess
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlparse
from .exceptions import GiteaConfigError
def load_dotenv_file(env_file: Path) -> None:
"""Load environment variables from a .env file."""
if not env_file.exists():
return
with open(env_file, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
os.environ.setdefault(key.strip(), value.strip())
@dataclass
class GiteaConfig:
"""Configuration for Gitea API access."""
# Repository settings (required)
gitea_url: str = ""
repo_owner: str = ""
repo_name: str = ""
# Authentication (optional for read operations)
auth_token: Optional[str] = None
@property
def base_api_url(self) -> str:
"""Get the base API URL for this repository."""
return f"{self.gitea_url}/api/v1"
@property
def repo_api_url(self) -> str:
"""Get the repository API URL."""
return f"{self.base_api_url}/repos/{self.repo_owner}/{self.repo_name}"
@property
def issues_api_url(self) -> str:
"""Get the issues API URL."""
return f"{self.repo_api_url}/issues"
@property
def milestones_api_url(self) -> str:
"""Get the milestones API URL."""
return f"{self.repo_api_url}/milestones"
@property
def labels_api_url(self) -> str:
"""Get the labels API URL."""
return f"{self.repo_api_url}/labels"
@classmethod
def from_environment(cls, env_prefix: str = "GITEA") -> "GiteaConfig":
"""Create config from environment variables.
Args:
env_prefix: Environment variable prefix (default: GITEA)
Looks for {prefix}_URL, {prefix}_REPO_OWNER, etc.
"""
# Auto-load .env.gitea file if it exists
env_file = Path(".env.gitea")
load_dotenv_file(env_file)
config = cls()
# Load from environment
config.gitea_url = os.getenv(f"{env_prefix}_URL", "")
config.repo_owner = os.getenv(f"{env_prefix}_REPO_OWNER", "")
config.repo_name = os.getenv(f"{env_prefix}_REPO_NAME", "")
config.auth_token = os.getenv(f"{env_prefix}_API_TOKEN")
return config
@classmethod
def from_git_repository(cls) -> "GiteaConfig":
"""Create config by auto-detecting from current git repository.
Only requires GITEA_API_TOKEN environment variable.
All other settings are detected from git remote origin.
"""
try:
# Get git remote origin URL
result = subprocess.run(
['git', 'remote', 'get-url', 'origin'],
capture_output=True,
text=True,
check=True
)
origin_url = result.stdout.strip()
# Parse different URL formats
if origin_url.startswith('http://') or origin_url.startswith('https://'):
# HTTP(S) format: https://gitea.example.com/owner/repo.git
parsed = urlparse(origin_url)
gitea_url = f"{parsed.scheme}://{parsed.netloc}"
path_parts = parsed.path.strip('/').split('/')
if len(path_parts) >= 2:
repo_owner = path_parts[0]
repo_name = path_parts[1].replace('.git', '')
else:
raise GiteaConfigError(f"Cannot parse repository path from URL: {origin_url}")
elif '@' in origin_url:
# SSH format: git@gitea.example.com:owner/repo.git
if ':' in origin_url:
host_part, path_part = origin_url.split(':', 1)
host = host_part.split('@')[-1]
gitea_url = f"https://{host}" # Assume HTTPS for API
path_parts = path_part.strip('/').split('/')
if len(path_parts) >= 2:
repo_owner = path_parts[0]
repo_name = path_parts[1].replace('.git', '')
else:
raise GiteaConfigError(f"Cannot parse repository path from SSH URL: {origin_url}")
else:
raise GiteaConfigError(f"Cannot parse SSH URL format: {origin_url}")
else:
raise GiteaConfigError(f"Unsupported git remote URL format: {origin_url}")
# Get auth token from environment
auth_token = os.getenv('GITEA_API_TOKEN')
return cls(
gitea_url=gitea_url,
repo_owner=repo_owner,
repo_name=repo_name,
auth_token=auth_token
)
except subprocess.CalledProcessError as e:
raise GiteaConfigError(f"Failed to get git remote origin: {e}")
except Exception as e:
raise GiteaConfigError(f"Failed to auto-detect git repository config: {e}")
@classmethod
def from_tddai_config(cls, tddai_config) -> "GiteaConfig":
"""Create GiteaConfig from legacy TddaiConfig for backwards compatibility."""
return cls(
gitea_url=tddai_config.gitea_url,
repo_owner=tddai_config.repo_owner,
repo_name=tddai_config.repo_name,
auth_token=os.getenv('GITEA_API_TOKEN')
)
def validate(self) -> None:
"""Validate configuration settings."""
if not self.gitea_url:
raise GiteaConfigError("gitea_url cannot be empty")
if not self.repo_owner:
raise GiteaConfigError("repo_owner cannot be empty")
if not self.repo_name:
raise GiteaConfigError("repo_name cannot be empty")
# Validate URL format
if not (self.gitea_url.startswith('http://') or self.gitea_url.startswith('https://')):
raise GiteaConfigError("gitea_url must start with http:// or https://")
def requires_auth(self, operation: str = "read") -> bool:
"""Check if operation requires authentication."""
write_operations = {"create", "update", "delete", "write"}
return operation in write_operations and not self.auth_token

View File

@@ -0,0 +1,23 @@
"""
Gitea-specific exceptions.
"""
class GiteaError(Exception):
"""Base class for Gitea-related errors."""
pass
class GiteaConfigError(GiteaError):
"""Configuration-related errors."""
pass
class GiteaApiError(GiteaError):
"""API-related errors."""
pass
class GiteaAuthError(GiteaError):
"""Authentication-related errors."""
pass

View File

@@ -0,0 +1,355 @@
"""
Gitea Package Registry Client
This module provides functionality to publish Python packages to Gitea's package registry.
Gitea supports multiple package registries including PyPI-compatible registries.
"""
import os
from pathlib import Path
from typing import Optional, List, Dict, Any
from ..base import RegistryInterface
from .config import GiteaConfig
from .exceptions import GiteaError
class GiteaRegistry(RegistryInterface):
"""Client for publishing packages to Gitea package registry."""
def __init__(self, config: Optional[GiteaConfig] = None):
"""Initialize the package registry client.
Args:
config: Gitea configuration. If None, auto-detects from git repository.
"""
self.config = config or GiteaConfig.from_git_repository()
self.config.validate()
@property
def pypi_registry_url(self) -> str:
"""Get the PyPI-compatible registry URL for this repository."""
return f"{self.config.gitea_url}/api/packages/{self.config.repo_owner}/pypi"
@property
def package_list_url(self) -> str:
"""Get the package listing URL for this repository."""
return f"{self.config.gitea_url}/api/v1/packages/{self.config.repo_owner}"
def check_auth(self) -> bool:
"""Check if authentication token is available and valid."""
if not self.config.auth_token:
return False
try:
# Test auth by trying to access packages API
import requests
headers = {"Authorization": f"token {self.config.auth_token}"}
response = requests.get(self.package_list_url, headers=headers, timeout=10)
return response.status_code in [200, 404] # 404 is okay if no packages exist yet
except Exception:
return False
def list_packages(self) -> List[Dict[str, Any]]:
"""List all packages for this repository owner.
Returns:
List of package information dictionaries
"""
try:
import requests
headers = {}
if self.config.auth_token:
headers["Authorization"] = f"token {self.config.auth_token}"
response = requests.get(self.package_list_url, headers=headers, timeout=10)
response.raise_for_status()
return response.json()
except Exception as e:
raise GiteaError(f"Failed to list packages: {e}")
def get_package_info(self, package_name: str) -> Optional[Dict[str, Any]]:
"""Get information about a specific package.
Args:
package_name: Name of the package
Returns:
Package information dictionary or None if not found
"""
try:
packages = self.list_packages()
for package in packages:
if package.get("name") == package_name:
return package
return None
except Exception:
return None
def upload_package(self, package_path: Path, dry_run: bool = False) -> bool:
"""Upload a package to Gitea registry.
Args:
package_path: Path to package file (.whl or .tar.gz)
dry_run: If True, show what would be done without uploading
Returns:
True if upload successful, False otherwise
"""
if not self.config.auth_token:
raise GiteaError("Authentication token required for package upload. Set GITEA_API_TOKEN environment variable.")
if not package_path.exists():
raise GiteaError(f"Package file not found: {package_path}")
if dry_run:
print(f"[DRY RUN] Would upload to: {self.pypi_registry_url}")
print(f"[DRY RUN] Would upload: {package_path}")
return True
return self._upload_file(package_path)
def upload_package_as_release_assets(self,
version: str,
wheel_path: Path,
sdist_path: Optional[Path] = None,
dry_run: bool = False) -> bool:
"""Upload packages as Gitea release assets (fallback when package registry unavailable).
Args:
version: Version tag (e.g., "v0.8.0")
wheel_path: Path to wheel (.whl) file
sdist_path: Optional path to source distribution (.tar.gz) file
dry_run: If True, show what would be done without uploading
Returns:
True if upload successful, False otherwise
"""
if not self.config.auth_token:
raise GiteaError("Authentication token required for release upload. Set GITEA_API_TOKEN environment variable.")
if not wheel_path.exists():
raise GiteaError(f"Wheel file not found: {wheel_path}")
if sdist_path and not sdist_path.exists():
raise GiteaError(f"Source distribution file not found: {sdist_path}")
files_to_upload = [wheel_path]
if sdist_path:
files_to_upload.append(sdist_path)
if dry_run:
print(f"[DRY RUN] Would upload release assets for {version}")
print(f"[DRY RUN] Release API: {self.config.repo_api_url}/releases")
for file_path in files_to_upload:
print(f"[DRY RUN] Would upload: {file_path}")
return True
# Create or get release
release_id = self._create_or_get_release(version)
if not release_id:
return False
# Upload each file as release asset
success = True
for file_path in files_to_upload:
if not self._upload_release_asset(release_id, file_path):
success = False
return success
def delete_package_version(self, package_name: str, version: str,
dry_run: bool = False) -> bool:
"""Delete a specific version of a package.
Args:
package_name: Name of the package
version: Version to delete
dry_run: If True, show what would be done without deleting
Returns:
True if deletion successful, False otherwise
"""
if not self.config.auth_token:
raise GiteaError("Authentication token required for package deletion.")
delete_url = f"{self.config.gitea_url}/api/v1/packages/{self.config.repo_owner}/pypi/{package_name}/{version}"
if dry_run:
print(f"[DRY RUN] Would delete: {package_name} v{version}")
print(f"[DRY RUN] DELETE {delete_url}")
return True
try:
import requests
headers = {"Authorization": f"token {self.config.auth_token}"}
response = requests.delete(delete_url, headers=headers, timeout=10)
if response.status_code in [200, 204, 404]: # 404 = already deleted
print(f"✅ Deleted: {package_name} v{version}")
return True
else:
print(f"❌ Delete failed: {response.status_code} {response.text}")
return False
except Exception as e:
print(f"❌ Delete failed: {e}")
return False
def get_registry_info(self) -> Dict[str, Any]:
"""Get information about the package registry configuration.
Returns:
Dictionary with registry information
"""
return {
"gitea_url": self.config.gitea_url,
"repo_owner": self.config.repo_owner,
"repo_name": self.config.repo_name,
"pypi_registry_url": self.pypi_registry_url,
"package_list_url": self.package_list_url,
"auth_configured": bool(self.config.auth_token),
"auth_valid": self.check_auth() if self.config.auth_token else False
}
def _upload_file(self, file_path: Path) -> bool:
"""Upload a single file to the registry.
Args:
file_path: Path to file to upload
Returns:
True if upload successful, False otherwise
"""
try:
import requests
# Gitea PyPI upload API expects PUT with the file content as body
# URL format: /api/packages/{owner}/pypi/{filename}
upload_url = f"{self.config.gitea_url}/api/packages/{self.config.repo_owner}/pypi"
with open(file_path, 'rb') as f:
file_content = f.read()
headers = {
'Authorization': f'token {self.config.auth_token}',
'Content-Type': 'application/octet-stream'
}
# Upload using PUT request with filename in URL
upload_endpoint = f"{upload_url}/{file_path.name}"
response = requests.put(
upload_endpoint,
headers=headers,
data=file_content,
timeout=60
)
if response.status_code in [200, 201, 409]: # 409 = already exists
print(f"✅ Uploaded: {file_path.name}")
if response.status_code == 409:
print(f" (already exists)")
return True
else:
print(f"❌ Upload failed for {file_path.name}: {response.status_code}")
if response.text:
print(f" Error: {response.text}")
return False
except Exception as e:
print(f"❌ Upload failed for {file_path.name}: {e}")
return False
def _create_or_get_release(self, version: str) -> Optional[int]:
"""Create a new release or get existing release ID.
Args:
version: Version tag (e.g., "v0.8.0")
Returns:
Release ID if successful, None otherwise
"""
try:
import requests
# Ensure version has 'v' prefix
tag_name = version if version.startswith('v') else f'v{version}'
headers = {"Authorization": f"token {self.config.auth_token}"}
# First, try to get existing release
releases_url = f"{self.config.repo_api_url}/releases"
response = requests.get(releases_url, headers=headers, timeout=10)
if response.status_code == 200:
releases = response.json()
for release in releases:
if release.get('tag_name') == tag_name:
print(f"✅ Found existing release: {tag_name}")
return release['id']
# Create new release
release_data = {
"tag_name": tag_name,
"name": f"MarkiTect {version.lstrip('v')}",
"body": f"Release {version.lstrip('v')}\\n\\nPython packages for MarkiTect.",
"draft": False,
"prerelease": False
}
response = requests.post(releases_url, headers=headers, json=release_data, timeout=10)
if response.status_code == 201:
release = response.json()
print(f"✅ Created release: {tag_name}")
return release['id']
else:
print(f"❌ Failed to create release: {response.status_code} {response.text}")
return None
except Exception as e:
print(f"❌ Error managing release: {e}")
return None
def _upload_release_asset(self, release_id: int, file_path: Path) -> bool:
"""Upload a file as a release asset.
Args:
release_id: Gitea release ID
file_path: Path to file to upload
Returns:
True if upload successful, False otherwise
"""
try:
import requests
# Upload asset to Gitea release
upload_url = f"{self.config.repo_api_url}/releases/{release_id}/assets"
headers = {
"Authorization": f"token {self.config.auth_token}"
}
with open(file_path, 'rb') as f:
files = {
'attachment': (file_path.name, f, 'application/octet-stream')
}
response = requests.post(
upload_url,
headers=headers,
files=files,
timeout=120 # Larger timeout for file uploads
)
if response.status_code == 201:
print(f"✅ Uploaded release asset: {file_path.name}")
return True
else:
print(f"❌ Failed to upload {file_path.name}: {response.status_code} {response.text}")
return False
except Exception as e:
print(f"❌ Upload failed for {file_path.name}: {e}")
return False

View File

@@ -0,0 +1,11 @@
"""
Utilities for release management.
This module provides utility functions for version management,
validation, and other common operations.
"""
from .version import VersionManager
from .validation import ReleaseValidator
__all__ = ["VersionManager", "ReleaseValidator"]

View File

@@ -0,0 +1,230 @@
"""
Release validation utilities.
This module provides validation functions for release readiness.
"""
from pathlib import Path
from typing import List, Tuple, Optional
from ..git.manager import GitManager
class ReleaseValidator:
"""Validates release readiness and requirements."""
def __init__(self, project_root: Optional[Path] = None):
"""Initialize release validator.
Args:
project_root: Root directory of the project
"""
self.project_root = project_root or Path.cwd()
self.git_manager = GitManager(project_root)
def validate_release_state(self, force: bool = False) -> Tuple[bool, List[str]]:
"""Validate that repository is ready for release.
Args:
force: Skip validation checks if True
Returns:
Tuple of (is_valid, list_of_issues)
"""
if force:
return True, []
issues = []
# Git repository validation
git_issues = self._validate_git_state()
issues.extend(git_issues)
# Project structure validation
structure_issues = self._validate_project_structure()
issues.extend(structure_issues)
# Configuration validation
config_issues = self._validate_configuration()
issues.extend(config_issues)
return len(issues) == 0, issues
def _validate_git_state(self) -> List[str]:
"""Validate git repository state.
Returns:
List of git-related issues
"""
issues = []
status = self.git_manager.get_repository_status()
if not status['is_repo']:
issues.append("Not in a git repository")
return issues
if status['has_changes']:
issues.append("Repository has uncommitted changes")
if status['branch'] != 'main':
issues.append(f"Not on main branch (currently on {status['branch']})")
# Check if remote exists
remote_url = self.git_manager.get_remote_url()
if not remote_url:
issues.append("No git remote 'origin' configured")
return issues
def _validate_project_structure(self) -> List[str]:
"""Validate project structure for releases.
Returns:
List of project structure issues
"""
issues = []
# Check for required files
required_files = ['pyproject.toml']
for file_name in required_files:
file_path = self.project_root / file_name
if not file_path.exists():
issues.append(f"Missing required file: {file_name}")
# Check for setuptools-scm configuration
pyproject_path = self.project_root / 'pyproject.toml'
if pyproject_path.exists():
try:
import tomllib
except ImportError:
try:
import tomli as tomllib
except ImportError:
issues.append("Cannot read pyproject.toml (tomllib/tomli not available)")
return issues
try:
with open(pyproject_path, 'rb') as f:
config = tomllib.load(f)
# Check for setuptools-scm configuration
build_system = config.get('build-system', {})
if 'setuptools-scm' not in str(build_system.get('requires', [])):
issues.append("setuptools-scm not found in build-system.requires")
# Check for dynamic version
project_config = config.get('project', {})
if 'version' in project_config:
issues.append("Static version found in project config. Use dynamic versioning with setuptools-scm.")
dynamic = project_config.get('dynamic', [])
if 'version' not in dynamic:
issues.append("'version' not in project.dynamic. Add it for setuptools-scm.")
except Exception as e:
issues.append(f"Error reading pyproject.toml: {e}")
return issues
def _validate_configuration(self) -> List[str]:
"""Validate release configuration.
Returns:
List of configuration issues
"""
issues = []
# Check for environment variables that might be needed
import os
# Check for common auth tokens (warn, don't fail)
auth_vars = ['GITEA_API_TOKEN', 'PYPI_TOKEN', 'GITHUB_TOKEN']
available_auth = [var for var in auth_vars if os.getenv(var)]
if not available_auth:
issues.append("No authentication tokens found in environment. "
"Consider setting GITEA_API_TOKEN, PYPI_TOKEN, or GITHUB_TOKEN "
"for package publishing.")
return issues
def validate_version_string(self, version_string: str) -> Tuple[bool, List[str]]:
"""Validate a version string for release.
Args:
version_string: Version string to validate
Returns:
Tuple of (is_valid, list_of_issues)
"""
issues = []
if not version_string:
issues.append("Version string cannot be empty")
return False, issues
# Check basic format
if not version_string.replace('.', '').replace('-', '').replace('+', '').replace('a', '').replace('b', '').replace('rc', '').isalnum():
issues.append("Version string contains invalid characters")
# Check for development markers in release
dev_markers = ['dev', '.dev', '+dev']
if any(marker in version_string.lower() for marker in dev_markers):
issues.append("Development versions should not be released")
# Check for reasonable version format (semantic versioning)
try:
from packaging import version
version.Version(version_string)
except Exception:
issues.append("Version string is not valid according to PEP 440")
# Check if version already exists as git tag
tag_name = version_string if version_string.startswith('v') else f'v{version_string}'
if self.git_manager.tag_exists(tag_name):
issues.append(f"Git tag {tag_name} already exists")
return len(issues) == 0, issues
def get_validation_summary(self) -> dict:
"""Get a comprehensive validation summary.
Returns:
Dictionary with validation results
"""
is_valid, issues = self.validate_release_state()
return {
'is_valid': is_valid,
'issues': issues,
'git_status': self.git_manager.get_repository_status(),
'recommendations': self._get_recommendations(issues)
}
def _get_recommendations(self, issues: List[str]) -> List[str]:
"""Get recommendations based on validation issues.
Args:
issues: List of validation issues
Returns:
List of recommendations
"""
recommendations = []
if any('uncommitted changes' in issue for issue in issues):
recommendations.append("Commit or stash your changes before releasing")
if any('not on main branch' in issue for issue in issues):
recommendations.append("Switch to main branch: git checkout main")
if any('setuptools-scm' in issue for issue in issues):
recommendations.append("Configure setuptools-scm in pyproject.toml")
if any('authentication' in issue.lower() for issue in issues):
recommendations.append("Set up authentication tokens for package publishing")
if not issues:
recommendations.append("Repository is ready for release!")
return recommendations

View File

@@ -0,0 +1,191 @@
"""
Version management utilities.
This module provides utilities for working with versions and setuptools-scm.
"""
import subprocess
from pathlib import Path
from typing import Optional, Dict, Any
from packaging import version
class VersionManager:
"""Utilities for version management with setuptools-scm."""
def __init__(self, project_root: Optional[Path] = None):
"""Initialize version manager.
Args:
project_root: Root directory of the project
"""
self.project_root = project_root or Path.cwd()
def get_current_version(self) -> str:
"""Get current version using setuptools-scm.
Returns:
Current version string or "unknown" if unavailable
"""
try:
result = subprocess.run(
['python', '-m', 'setuptools_scm'],
capture_output=True,
text=True,
check=True,
cwd=self.project_root
)
return result.stdout.strip()
except subprocess.CalledProcessError:
return "unknown"
def parse_version(self, version_string: str) -> Dict[str, Any]:
"""Parse a version string and return components.
Args:
version_string: Version string to parse
Returns:
Dictionary with version components
"""
try:
v = version.Version(version_string)
return {
'major': v.major,
'minor': v.minor,
'micro': v.micro,
'is_prerelease': v.is_prerelease,
'is_devrelease': v.is_devrelease,
'local': v.local,
'public': v.public,
'base_version': v.base_version,
}
except version.InvalidVersion:
return {'error': f'Invalid version: {version_string}'}
def is_development_version(self, version_string: Optional[str] = None) -> bool:
"""Check if version is a development version.
Args:
version_string: Version to check. If None, uses current version.
Returns:
True if development version, False otherwise
"""
if version_string is None:
version_string = self.get_current_version()
try:
v = version.Version(version_string)
return v.is_devrelease or 'dev' in version_string.lower()
except version.InvalidVersion:
return True # Assume unknown versions are dev
def compare_versions(self, version1: str, version2: str) -> int:
"""Compare two version strings.
Args:
version1: First version to compare
version2: Second version to compare
Returns:
-1 if version1 < version2, 0 if equal, 1 if version1 > version2
"""
try:
v1 = version.Version(version1)
v2 = version.Version(version2)
if v1 < v2:
return -1
elif v1 > v2:
return 1
else:
return 0
except version.InvalidVersion:
# Fallback to string comparison
if version1 < version2:
return -1
elif version1 > version2:
return 1
else:
return 0
def get_next_version(self, current_version: str, bump_type: str = 'patch') -> str:
"""Get the next version based on bump type.
Args:
current_version: Current version string
bump_type: Type of bump ('major', 'minor', 'patch')
Returns:
Next version string
Raises:
ValueError: If bump_type is invalid
"""
try:
v = version.Version(current_version)
major, minor, micro = v.major, v.minor, v.micro
if bump_type == 'major':
return f"{major + 1}.0.0"
elif bump_type == 'minor':
return f"{major}.{minor + 1}.0"
elif bump_type == 'patch':
return f"{major}.{minor}.{micro + 1}"
else:
raise ValueError(f"Invalid bump type: {bump_type}")
except version.InvalidVersion:
raise ValueError(f"Cannot parse version: {current_version}")
def suggest_version(self, current_version: Optional[str] = None) -> Dict[str, str]:
"""Suggest next version options.
Args:
current_version: Current version. If None, gets from setuptools-scm.
Returns:
Dictionary with version suggestions
"""
if current_version is None:
current_version = self.get_current_version()
if current_version == "unknown":
return {
'error': 'Cannot determine current version',
'suggestion': 'Consider creating an initial tag like v0.1.0'
}
try:
# Strip development version info to get base
v = version.Version(current_version)
base_version = v.base_version
return {
'current': current_version,
'base': base_version,
'patch': self.get_next_version(base_version, 'patch'),
'minor': self.get_next_version(base_version, 'minor'),
'major': self.get_next_version(base_version, 'major'),
}
except Exception as e:
return {
'error': str(e),
'current': current_version
}
def validate_version_format(self, version_string: str) -> bool:
"""Validate if a version string follows semantic versioning.
Args:
version_string: Version string to validate
Returns:
True if valid semantic version, False otherwise
"""
try:
version.Version(version_string)
return True
except version.InvalidVersion:
return False