Refactor operations.py into smaller helpers

- Add PreflightResult NamedTuple with .ok property
- Extract _run_compose_step to handle raw output and interrupts
- Extract _up_single_service for single-host migration logic
- Deduplicate pull/build handling in _migrate_service
- Add was_running check to only rollback if service was running
This commit is contained in:
Bas Nijholt
2025-12-16 17:08:25 -08:00
parent 34642e8b8e
commit affed2edcf

View File

@@ -6,7 +6,7 @@ CLI commands are thin wrappers around these functions.
from __future__ import annotations
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, NamedTuple
from .compose import parse_devices, parse_external_networks, parse_host_volumes
from .console import console, err_console
@@ -14,6 +14,7 @@ from .executor import (
CommandResult,
check_networks_exist,
check_paths_exist,
check_service_running,
run_command,
run_compose,
run_compose_on_host,
@@ -28,10 +29,37 @@ class OperationInterruptedError(Exception):
"""Raised when a command is interrupted by Ctrl+C."""
def _check_interrupted(result: CommandResult) -> None:
"""Raise if command was interrupted."""
class PreflightResult(NamedTuple):
"""Result of pre-flight checks for a service on a host."""
missing_paths: list[str]
missing_networks: list[str]
missing_devices: list[str]
@property
def ok(self) -> bool:
"""Return True if all checks passed."""
return not (self.missing_paths or self.missing_networks or self.missing_devices)
async def _run_compose_step(
cfg: Config,
service: str,
command: str,
*,
raw: bool,
host: str | None = None,
) -> CommandResult:
"""Run a compose command, handle raw output newline, and check for interrupts."""
if host:
result = await run_compose_on_host(cfg, service, host, command, raw=raw)
else:
result = await run_compose(cfg, service, command, raw=raw)
if raw:
print() # Ensure newline after raw output
if result.interrupted:
raise OperationInterruptedError
return result
def get_service_paths(cfg: Config, service: str) -> list[str]:
@@ -45,12 +73,10 @@ async def check_service_requirements(
cfg: Config,
service: str,
host_name: str,
) -> tuple[list[str], list[str], list[str]]:
) -> PreflightResult:
"""Check if a service can run on a specific host.
Verifies that all required paths (volumes), networks, and devices exist.
Returns (missing_paths, missing_networks, missing_devices).
"""
# Check mount paths
paths = get_service_paths(cfg, service)
@@ -71,7 +97,7 @@ async def check_service_requirements(
dev_exists = await check_paths_exist(cfg, host_name, devices)
missing_devices = [d for d, found in dev_exists.items() if not found]
return missing_paths, missing_networks, missing_devices
return PreflightResult(missing_paths, missing_networks, missing_devices)
async def _cleanup_and_rollback(
@@ -81,14 +107,21 @@ async def _cleanup_and_rollback(
current_host: str,
prefix: str,
*,
was_running: bool,
raw: bool = False,
) -> None:
"""Clean up failed start and attempt rollback to old host."""
"""Clean up failed start and attempt rollback to old host if it was running."""
err_console.print(
f"{prefix} [yellow]![/] Cleaning up failed start on [magenta]{target_host}[/]"
)
await run_compose(cfg, service, "down", raw=raw)
if not was_running:
err_console.print(
f"{prefix} [dim]Service was not running on [magenta]{current_host}[/], skipping rollback[/]"
)
return
err_console.print(f"{prefix} [yellow]![/] Rolling back to [magenta]{current_host}[/]...")
rollback_result = await run_compose_on_host(cfg, service, current_host, "up -d", raw=raw)
if rollback_result.success:
@@ -100,21 +133,19 @@ async def _cleanup_and_rollback(
def _report_preflight_failures(
service: str,
target_host: str,
missing_paths: list[str],
missing_networks: list[str],
missing_devices: list[str],
preflight: PreflightResult,
) -> None:
"""Report pre-flight check failures."""
err_console.print(
f"[cyan]\\[{service}][/] [red]✗[/] Cannot start on [magenta]{target_host}[/]:"
)
for path in missing_paths:
for path in preflight.missing_paths:
err_console.print(f" [red]✗[/] missing path: {path}")
for net in missing_networks:
for net in preflight.missing_networks:
err_console.print(f" [red]✗[/] missing network: {net}")
if missing_networks:
if preflight.missing_networks:
err_console.print(f" [dim]hint: cf init-network {target_host}[/]")
for dev in missing_devices:
for dev in preflight.missing_devices:
err_console.print(f" [red]✗[/] missing device: {dev}")
@@ -133,13 +164,9 @@ async def _up_multi_host_service(
# Pre-flight checks on all hosts
for host_name in host_names:
missing_paths, missing_networks, missing_devices = await check_service_requirements(
cfg, service, host_name
)
if missing_paths or missing_networks or missing_devices:
_report_preflight_failures(
service, host_name, missing_paths, missing_networks, missing_devices
)
preflight = await check_service_requirements(cfg, service, host_name)
if not preflight.ok:
_report_preflight_failures(service, host_name, preflight)
results.append(
CommandResult(service=f"{service}@{host_name}", exit_code=1, success=False)
)
@@ -184,36 +211,78 @@ async def _migrate_service(
console.print(
f"{prefix} Migrating from [magenta]{current_host}[/] → [magenta]{target_host}[/]..."
)
# Prepare images on target host before stopping old service to minimize downtime.
# Pull handles image-based services; build handles Dockerfile-based services.
# Each command is a no-op for the other type (exit 0, no work done).
pull_result = await run_compose(cfg, service, "pull", raw=raw)
if raw:
print() # Ensure newline after raw output
_check_interrupted(pull_result)
if not pull_result.success:
err_console.print(
f"{prefix} [red]✗[/] Pull failed on [magenta]{target_host}[/], "
"leaving service on current host"
for cmd, label in [("pull", "Pull"), ("build", "Build")]:
result = await _run_compose_step(cfg, service, cmd, raw=raw)
if not result.success:
err_console.print(
f"{prefix} [red]✗[/] {label} failed on [magenta]{target_host}[/], "
"leaving service on current host"
)
return result
# Stop on current host
down_result = await _run_compose_step(cfg, service, "down", raw=raw, host=current_host)
return down_result if not down_result.success else None
async def _up_single_service(
cfg: Config,
service: str,
prefix: str,
*,
raw: bool,
) -> CommandResult:
"""Start a single-host service with migration support."""
target_host = cfg.get_hosts(service)[0]
current_host = get_service_host(cfg, service)
# Pre-flight check: verify paths, networks, and devices exist on target
preflight = await check_service_requirements(cfg, service, target_host)
if not preflight.ok:
_report_preflight_failures(service, target_host, preflight)
return CommandResult(service=service, exit_code=1, success=False)
# If service is deployed elsewhere, migrate it
did_migration = False
was_running = False
if current_host and current_host != target_host:
if current_host in cfg.hosts:
was_running = await check_service_running(cfg, service, current_host)
failure = await _migrate_service(
cfg, service, current_host, target_host, prefix, raw=raw
)
if failure:
return failure
did_migration = True
else:
err_console.print(
f"{prefix} [yellow]![/] was on "
f"[magenta]{current_host}[/] (not in config), skipping down"
)
# Start on target host
console.print(f"{prefix} Starting on [magenta]{target_host}[/]...")
up_result = await _run_compose_step(cfg, service, "up -d", raw=raw)
# Update state on success, or rollback on failure
if up_result.success:
set_service_host(cfg, service, target_host)
elif did_migration and current_host:
await _cleanup_and_rollback(
cfg,
service,
target_host,
current_host,
prefix,
was_running=was_running,
raw=raw,
)
return pull_result
build_result = await run_compose(cfg, service, "build", raw=raw)
if raw:
print() # Ensure newline after raw output
_check_interrupted(build_result)
if not build_result.success:
err_console.print(
f"{prefix} [red]✗[/] Build failed on [magenta]{target_host}[/], "
"leaving service on current host"
)
return build_result
down_result = await run_compose_on_host(cfg, service, current_host, "down", raw=raw)
if raw:
print() # Ensure newline after raw output
_check_interrupted(down_result)
if not down_result.success:
return down_result
return None
return up_result
async def up_services(
@@ -230,60 +299,11 @@ async def up_services(
for idx, service in enumerate(services, 1):
prefix = f"[dim][{idx}/{total}][/] [cyan]\\[{service}][/]"
# Handle multi-host services separately (no migration)
if cfg.is_multi_host(service):
multi_results = await _up_multi_host_service(cfg, service, prefix, raw=raw)
results.extend(multi_results)
continue
target_host = cfg.get_hosts(service)[0]
current_host = get_service_host(cfg, service)
# Pre-flight check: verify paths, networks, and devices exist on target
missing_paths, missing_networks, missing_devices = await check_service_requirements(
cfg, service, target_host
)
if missing_paths or missing_networks or missing_devices:
_report_preflight_failures(
service, target_host, missing_paths, missing_networks, missing_devices
)
results.append(CommandResult(service=service, exit_code=1, success=False))
continue
# If service is deployed elsewhere, migrate it
did_migration = False
if current_host and current_host != target_host:
if current_host in cfg.hosts:
failure = await _migrate_service(
cfg, service, current_host, target_host, prefix, raw=raw
)
if failure:
results.append(failure)
continue
did_migration = True
else:
err_console.print(
f"{prefix} [yellow]![/] was on "
f"[magenta]{current_host}[/] (not in config), skipping down"
)
# Start on target host
console.print(f"{prefix} Starting on [magenta]{target_host}[/]...")
up_result = await run_compose(cfg, service, "up -d", raw=raw)
if raw:
print() # Ensure newline after raw output (progress bars end with \r)
_check_interrupted(up_result)
results.append(up_result)
# Update state on success, or rollback on failure
if up_result.success:
set_service_host(cfg, service, target_host)
elif did_migration and current_host:
await _cleanup_and_rollback(
cfg, service, target_host, current_host, prefix, raw=raw
)
results.extend(await _up_multi_host_service(cfg, service, prefix, raw=raw))
else:
results.append(await _up_single_service(cfg, service, prefix, raw=raw))
except OperationInterruptedError:
# Convert to KeyboardInterrupt to propagate up to CLI
raise KeyboardInterrupt from None
return results
@@ -306,10 +326,10 @@ async def check_host_compatibility(
results: dict[str, tuple[int, int, list[str]]] = {}
for host_name in cfg.hosts:
missing_paths, missing_networks, missing_devices = await check_service_requirements(
cfg, service, host_name
preflight = await check_service_requirements(cfg, service, host_name)
all_missing = (
preflight.missing_paths + preflight.missing_networks + preflight.missing_devices
)
all_missing = missing_paths + missing_networks + missing_devices
found = total - len(all_missing)
results[host_name] = (found, total, all_missing)