refactor: simplify codebase with reduced abstractions (#56)

- Remove dead code: `run_host_operation` in cli/common.py (never called)
- Inline `_report_*` helpers in lifecycle.py (each called once)
- Merge `validate_host` into `validate_hosts` with flexible str|list param
- Merge `_report_no_config_found` and `_report_config_path_not_exists`
  into single `_report_missing_config` function
- Simplify `_get_editor` from 18 lines to 6 using walrus operator
- Extract `COMPOSE_FILENAMES` constant to avoid duplication in config.py
- Extract `_stream_subprocess` helper to reduce duplication in streaming.py

Net reduction: ~130 lines of code with no functionality changes.
This commit is contained in:
Bas Nijholt
2025-12-18 23:45:34 -08:00
committed by GitHub
parent 9f55dcdd6e
commit b4ebe15dd1
5 changed files with 79 additions and 214 deletions

View File

@@ -169,7 +169,7 @@ def get_services(
config = load_config_or_exit(config_path)
if host is not None:
validate_host(config, host)
validate_hosts(config, host)
svc_list = [s for s in config.services if host in config.get_hosts(s)]
if not svc_list:
print_warning(f"No services configured for host [magenta]{host}[/]")
@@ -286,16 +286,10 @@ def validate_services(cfg: Config, services: list[str], *, hint: str | None = No
raise typer.Exit(1)
def validate_host(cfg: Config, host: str) -> None:
"""Validate that a host exists in config. Exits with error if not found."""
if host not in cfg.hosts:
print_error(MSG_HOST_NOT_FOUND.format(name=host))
raise typer.Exit(1)
def validate_hosts(cfg: Config, hosts: list[str]) -> None:
"""Validate that all hosts exist in config. Exits with error if any not found."""
invalid = [h for h in hosts if h not in cfg.hosts]
def validate_hosts(cfg: Config, hosts: str | list[str]) -> None:
"""Validate that host(s) exist in config. Exits with error if any not found."""
host_list = [hosts] if isinstance(hosts, str) else hosts
invalid = [h for h in host_list if h not in cfg.hosts]
if invalid:
for h in invalid:
print_error(MSG_HOST_NOT_FOUND.format(name=h))
@@ -304,7 +298,7 @@ def validate_hosts(cfg: Config, hosts: list[str]) -> None:
def validate_host_for_service(cfg: Config, service: str, host: str) -> None:
"""Validate that a host is valid for a service."""
validate_host(cfg, host)
validate_hosts(cfg, host)
allowed_hosts = cfg.get_hosts(service)
if host not in allowed_hosts:
print_error(
@@ -328,27 +322,3 @@ def validate_service_selection(
if methods > 1:
print_error("Use only one of: service names, [bold]--all[/], or [bold]--host[/]")
raise typer.Exit(1)
def run_host_operation(
cfg: Config,
svc_list: list[str],
host: str,
command: str,
action_verb: str,
state_callback: Callable[[Config, str, str], None],
) -> None:
"""Run an operation on a specific host for multiple services."""
from compose_farm.executor import run_compose_on_host # noqa: PLC0415
results: list[CommandResult] = []
for service in svc_list:
validate_host_for_service(cfg, service, host)
console.print(f"[cyan]\\[{service}][/] {action_verb} on [magenta]{host}[/]...")
result = run_async(run_compose_on_host(cfg, service, host, command, raw=True))
print() # Newline after raw output
results.append(result)
if result.success:
state_callback(cfg, service, host)
maybe_regenerate_traefik(cfg, results)
report_results(results)

View File

@@ -40,24 +40,12 @@ _RawOption = Annotated[
def _get_editor() -> str:
"""Get the user's preferred editor.
Checks $EDITOR, then $VISUAL, then falls back to platform defaults.
"""
for env_var in ("EDITOR", "VISUAL"):
editor = os.environ.get(env_var)
if editor:
return editor
"""Get the user's preferred editor ($EDITOR > $VISUAL > platform default)."""
if editor := os.environ.get("EDITOR") or os.environ.get("VISUAL"):
return editor
if platform.system() == "Windows":
return "notepad"
# Try common editors on Unix-like systems
for editor in ("nano", "vim", "vi"):
if shutil.which(editor):
return editor
return "vi"
return next((e for e in ("nano", "vim", "vi") if shutil.which(e)), "vi")
def _generate_template() -> str:
@@ -80,20 +68,16 @@ def _get_config_file(path: Path | None) -> Path | None:
return config_path.resolve() if config_path else None
def _report_no_config_found() -> None:
"""Report that no config file was found in search paths."""
console.print("[yellow]No config file found.[/yellow]")
console.print("\nSearched locations:")
for p in config_search_paths():
status = "[green]exists[/green]" if p.exists() else "[dim]not found[/dim]"
console.print(f" - {p} ({status})")
console.print("\nRun [bold cyan]cf config init[/bold cyan] to create one.")
def _report_config_path_not_exists(config_file: Path) -> None:
"""Report that an explicit config path doesn't exist."""
def _report_missing_config(explicit_path: Path | None = None) -> None:
"""Report that a config file was not found."""
console.print("[yellow]Config file not found.[/yellow]")
console.print(f"\nProvided path does not exist: [cyan]{config_file}[/cyan]")
if explicit_path:
console.print(f"\nProvided path does not exist: [cyan]{explicit_path}[/cyan]")
else:
console.print("\nSearched locations:")
for p in config_search_paths():
status = "[green]exists[/green]" if p.exists() else "[dim]not found[/dim]"
console.print(f" - {p} ({status})")
console.print("\nRun [bold cyan]cf config init[/bold cyan] to create one.")
@@ -140,11 +124,11 @@ def config_edit(
config_file = _get_config_file(path)
if config_file is None:
_report_no_config_found()
_report_missing_config()
raise typer.Exit(1)
if not config_file.exists():
_report_config_path_not_exists(config_file)
_report_missing_config(config_file)
raise typer.Exit(1)
editor = _get_editor()
@@ -180,11 +164,11 @@ def config_show(
config_file = _get_config_file(path)
if config_file is None:
_report_no_config_found()
_report_missing_config()
raise typer.Exit(0)
if not config_file.exists():
_report_config_path_not_exists(config_file)
_report_missing_config(config_file)
raise typer.Exit(1)
content = config_file.read_text(encoding="utf-8")
@@ -211,7 +195,7 @@ def config_path(
config_file = _get_config_file(path)
if config_file is None:
_report_no_config_found()
_report_missing_config()
raise typer.Exit(1)
# Just print the path for easy piping

View File

@@ -2,13 +2,10 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Annotated
from typing import Annotated
import typer
if TYPE_CHECKING:
from compose_farm.config import Config
from compose_farm.cli.app import app
from compose_farm.cli.common import (
AllOption,
@@ -148,40 +145,8 @@ def update(
report_results(results)
def _report_pending_migrations(cfg: Config, migrations: list[str]) -> None:
"""Report services that need migration."""
console.print(f"[cyan]Services to migrate ({len(migrations)}):[/]")
for svc in migrations:
current = get_service_host(cfg, svc)
target = cfg.get_hosts(svc)[0]
console.print(f" [cyan]{svc}[/]: [magenta]{current}[/] → [magenta]{target}[/]")
def _report_pending_orphans(orphaned: dict[str, str | list[str]]) -> None:
"""Report orphaned services that will be stopped."""
console.print(f"[yellow]Orphaned services to stop ({len(orphaned)}):[/]")
for svc, hosts in orphaned.items():
console.print(f" [cyan]{svc}[/] on [magenta]{format_host(hosts)}[/]")
def _report_pending_starts(cfg: Config, missing: list[str]) -> None:
"""Report services that will be started."""
console.print(f"[green]Services to start ({len(missing)}):[/]")
for svc in missing:
target = format_host(cfg.get_hosts(svc))
console.print(f" [cyan]{svc}[/] on [magenta]{target}[/]")
def _report_pending_refresh(cfg: Config, to_refresh: list[str]) -> None:
"""Report services that will be refreshed."""
console.print(f"[blue]Services to refresh ({len(to_refresh)}):[/]")
for svc in to_refresh:
target = format_host(cfg.get_hosts(svc))
console.print(f" [cyan]{svc}[/] on [magenta]{target}[/]")
@app.command(rich_help_panel="Lifecycle")
def apply(
def apply( # noqa: PLR0912 (multi-phase reconciliation needs these branches)
dry_run: Annotated[
bool,
typer.Option("--dry-run", "-n", help="Show what would change without executing"),
@@ -229,13 +194,23 @@ def apply(
# Report what will be done
if has_orphans:
_report_pending_orphans(orphaned)
console.print(f"[yellow]Orphaned services to stop ({len(orphaned)}):[/]")
for svc, hosts in orphaned.items():
console.print(f" [cyan]{svc}[/] on [magenta]{format_host(hosts)}[/]")
if has_migrations:
_report_pending_migrations(cfg, migrations)
console.print(f"[cyan]Services to migrate ({len(migrations)}):[/]")
for svc in migrations:
current = get_service_host(cfg, svc)
target = cfg.get_hosts(svc)[0]
console.print(f" [cyan]{svc}[/]: [magenta]{current}[/] → [magenta]{target}[/]")
if has_missing:
_report_pending_starts(cfg, missing)
console.print(f"[green]Services to start ({len(missing)}):[/]")
for svc in missing:
console.print(f" [cyan]{svc}[/] on [magenta]{format_host(cfg.get_hosts(svc))}[/]")
if has_refresh:
_report_pending_refresh(cfg, to_refresh)
console.print(f"[blue]Services to refresh ({len(to_refresh)}):[/]")
for svc in to_refresh:
console.print(f" [cyan]{svc}[/] on [magenta]{format_host(cfg.get_hosts(svc))}[/]")
if dry_run:
console.print(f"\n{MSG_DRY_RUN}")

View File

@@ -10,6 +10,9 @@ from pydantic import BaseModel, Field, model_validator
from .paths import config_search_paths, find_config_path
# Supported compose filenames, in priority order
COMPOSE_FILENAMES = ("compose.yaml", "compose.yml", "docker-compose.yml", "docker-compose.yaml")
class Host(BaseModel):
"""SSH host configuration."""
@@ -90,17 +93,9 @@ class Config(BaseModel):
return self.hosts[host_names[0]]
def get_compose_path(self, service: str) -> Path:
"""Get compose file path for a service.
Tries compose.yaml first, then docker-compose.yml.
"""
"""Get compose file path for a service (tries compose.yaml first)."""
service_dir = self.compose_dir / service
for filename in (
"compose.yaml",
"compose.yml",
"docker-compose.yml",
"docker-compose.yaml",
):
for filename in COMPOSE_FILENAMES:
candidate = service_dir / filename
if candidate.exists():
return candidate
@@ -109,21 +104,12 @@ class Config(BaseModel):
def discover_compose_dirs(self) -> set[str]:
"""Find all directories in compose_dir that contain a compose file."""
compose_filenames = {
"compose.yaml",
"compose.yml",
"docker-compose.yml",
"docker-compose.yaml",
}
found: set[str] = set()
if not self.compose_dir.exists():
return found
for subdir in self.compose_dir.iterdir():
if subdir.is_dir():
for filename in compose_filenames:
if (subdir / filename).exists():
found.add(subdir.name)
break
if subdir.is_dir() and any((subdir / f).exists() for f in COMPOSE_FILENAMES):
found.add(subdir.name)
return found

View File

@@ -52,50 +52,40 @@ async def stream_to_task(task_id: str, message: str) -> None:
tasks[task_id]["output"].append(message)
async def _stream_subprocess(task_id: str, args: list[str], env: dict[str, str]) -> int:
"""Run subprocess and stream output to task buffer. Returns exit code."""
process = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
env=env,
)
if process.stdout:
async for line in process.stdout:
text = line.decode("utf-8", errors="replace")
# Convert \n to \r\n for xterm.js
if text.endswith("\n") and not text.endswith("\r\n"):
text = text[:-1] + "\r\n"
await stream_to_task(task_id, text)
return await process.wait()
async def run_cli_streaming(
config: Config,
args: list[str],
task_id: str,
) -> None:
"""Run a cf CLI command as subprocess and stream output to task buffer.
This reuses all CLI logic including Rich formatting, progress bars, etc.
The subprocess gets a pseudo-TTY via FORCE_COLOR so Rich outputs ANSI codes.
"""
"""Run a cf CLI command as subprocess and stream output to task buffer."""
try:
# Build command - config option goes after the subcommand
cmd = ["cf", *args, f"--config={config.config_path}"]
await stream_to_task(task_id, f"{DIM}$ {' '.join(['cf', *args])}{RESET}{CRLF}")
# Show command being executed
cmd_display = " ".join(["cf", *args])
await stream_to_task(task_id, f"{DIM}$ {cmd_display}{RESET}{CRLF}")
# Force color output even though there's no real TTY
# Set COLUMNS for Rich/Typer to format output correctly
env = {"FORCE_COLOR": "1", "TERM": "xterm-256color", "COLUMNS": "120"}
# Ensure SSH agent is available (auto-detect if needed)
ssh_sock = get_ssh_auth_sock()
if ssh_sock:
# Build environment with color support and SSH agent
env = {**os.environ, "FORCE_COLOR": "1", "TERM": "xterm-256color", "COLUMNS": "120"}
if ssh_sock := get_ssh_auth_sock():
env["SSH_AUTH_SOCK"] = ssh_sock
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
env={**os.environ, **env},
)
# Stream output line by line
if process.stdout:
async for line in process.stdout:
text = line.decode("utf-8", errors="replace")
# Convert \n to \r\n for xterm.js
if text.endswith("\n") and not text.endswith("\r\n"):
text = text[:-1] + "\r\n"
await stream_to_task(task_id, text)
exit_code = await process.wait()
exit_code = await _stream_subprocess(task_id, cmd, env)
tasks[task_id]["status"] = "completed" if exit_code == 0 else "failed"
tasks[task_id]["completed_at"] = time.time()
@@ -122,71 +112,31 @@ async def _run_cli_via_ssh(
args: list[str],
task_id: str,
) -> None:
"""Run a cf CLI command via SSH to the host.
Used for self-updates to ensure the command survives container restart.
Uses setsid to run command in a new session (completely detached), with
output going to a log file. We tail the log to stream output. When SSH
dies (container killed), the tail dies but the setsid process continues.
"""
"""Run a cf CLI command via SSH for self-updates (survives container restart)."""
try:
# Get the host for the web service
host = config.get_host(CF_WEB_SERVICE)
cf_cmd = f"cf {' '.join(args)} --config={config.config_path}"
log_file = "/tmp/cf-self-update.log" # noqa: S108
# Build the remote command:
# 1. setsid runs command in new session (survives SSH disconnect)
# 2. Output goes to log file
# 3. tail -f streams the log (dies when SSH dies, but command continues)
# 4. wait for tail or timeout after command should be done
# setsid detaches command; tail streams output until SSH dies
remote_cmd = (
f"rm -f {log_file} && "
f"PATH=$HOME/.local/bin:/usr/local/bin:$PATH "
f"setsid sh -c '{cf_cmd} > {log_file} 2>&1' & "
f"sleep 0.3 && "
f"tail -f {log_file} 2>/dev/null"
f"sleep 0.3 && tail -f {log_file} 2>/dev/null"
)
# Show what we're doing
await stream_to_task(
task_id,
f"{DIM}$ {cf_cmd}{RESET}{CRLF}",
)
await stream_to_task(
task_id,
f"{GREEN}Running via SSH (detached with setsid){RESET}{CRLF}",
)
await stream_to_task(task_id, f"{DIM}$ {cf_cmd}{RESET}{CRLF}")
await stream_to_task(task_id, f"{GREEN}Running via SSH (detached with setsid){RESET}{CRLF}")
# Build SSH command (no TTY needed, output comes from tail)
ssh_args = build_ssh_command(host, remote_cmd, tty=False)
# Set up environment with SSH agent
env = {**os.environ}
ssh_sock = get_ssh_auth_sock()
if ssh_sock:
if ssh_sock := get_ssh_auth_sock():
env["SSH_AUTH_SOCK"] = ssh_sock
process = await asyncio.create_subprocess_exec(
*ssh_args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
env=env,
)
exit_code = await _stream_subprocess(task_id, ssh_args, env)
# Stream output until SSH dies (container killed) or command completes
if process.stdout:
async for line in process.stdout:
text = line.decode("utf-8", errors="replace")
if text.endswith("\n") and not text.endswith("\r\n"):
text = text[:-1] + "\r\n"
await stream_to_task(task_id, text)
exit_code = await process.wait()
# Exit code 255 means SSH connection closed (container died during down)
# This is expected for self-updates - setsid ensures command continues
# Exit code 255 = SSH closed (container died during down) - expected for self-updates
if exit_code == 255: # noqa: PLR2004
await stream_to_task(
task_id,