Compare commits

...

5 Commits

Author SHA1 Message Date
Bas Nijholt
32dc6b3665 Skip empty lines in streaming output 2025-12-13 23:50:35 -08:00
Bas Nijholt
7d98e664e9 Auto-detect local IPs to skip SSH when on target host 2025-12-13 23:48:28 -08:00
Bas Nijholt
6763403700 Fix duplicate prefix before traefik config message 2025-12-13 23:46:41 -08:00
Bas Nijholt
feb0e13bfd Add check command to find missing services 2025-12-13 23:43:47 -08:00
Bas Nijholt
b86f6d190f Add Rich styling to CLI output
- Service names in cyan, host names in magenta
- Success checkmarks, warning/error symbols
- Colored sync diff indicators (+/-/~)
- Unicode arrows for migrations
2025-12-13 23:40:07 -08:00
6 changed files with 132 additions and 35 deletions

View File

@@ -12,6 +12,7 @@ dependencies = [
"pydantic>=2.0.0",
"asyncssh>=2.14.0",
"pyyaml>=6.0",
"rich>=13.0.0",
]
[project.scripts]

View File

@@ -8,6 +8,7 @@ from typing import TYPE_CHECKING, Annotated, TypeVar
import typer
import yaml
from rich.console import Console
from . import __version__
from .config import Config, load_config
@@ -28,6 +29,9 @@ if TYPE_CHECKING:
T = TypeVar("T")
console = Console(highlight=False)
err_console = Console(stderr=True, highlight=False)
def _maybe_regenerate_traefik(cfg: Config) -> None:
"""Regenerate traefik config if traefik_file is configured."""
@@ -38,11 +42,12 @@ def _maybe_regenerate_traefik(cfg: Config) -> None:
dynamic, warnings = generate_traefik_config(cfg, list(cfg.services.keys()))
cfg.traefik_file.parent.mkdir(parents=True, exist_ok=True)
cfg.traefik_file.write_text(yaml.safe_dump(dynamic, sort_keys=False))
typer.echo(f"Traefik config updated: {cfg.traefik_file}")
console.print() # Ensure we're on a new line after streaming output
console.print(f"[green]✓[/] Traefik config updated: {cfg.traefik_file}")
for warning in warnings:
typer.echo(warning, err=True)
err_console.print(f"[yellow]![/] {warning}")
except (FileNotFoundError, ValueError) as exc:
typer.echo(f"Warning: Failed to update traefik config: {exc}", err=True)
err_console.print(f"[yellow]![/] Failed to update traefik config: {exc}")
def _version_callback(value: bool) -> None:
@@ -86,7 +91,7 @@ def _get_services(
if all_services:
return list(config.services.keys()), config
if not services:
typer.echo("Error: Specify services or use --all", err=True)
err_console.print("[red]✗[/] Specify services or use --all")
raise typer.Exit(1)
return list(services), config
@@ -101,7 +106,9 @@ def _report_results(results: list[CommandResult]) -> None:
failed = [r for r in results if not r.success]
if failed:
for r in failed:
typer.echo(f"[{r.service}] Failed with exit code {r.exit_code}", err=True)
err_console.print(
f"[cyan]\\[{r.service}][/] [red]Failed with exit code {r.exit_code}[/]"
)
raise typer.Exit(1)
@@ -137,15 +144,18 @@ async def _up_with_migration(
# If service is deployed elsewhere, migrate it
if current_host and current_host != target_host:
if current_host in cfg.hosts:
typer.echo(f"[{service}] Migrating from {current_host} to {target_host}...")
console.print(
f"[cyan]\\[{service}][/] Migrating from "
f"[magenta]{current_host}[/] → [magenta]{target_host}[/]..."
)
down_result = await run_compose_on_host(cfg, service, current_host, "down")
if not down_result.success:
results.append(down_result)
continue
else:
typer.echo(
f"[{service}] Warning: was on {current_host} (not in config), skipping down",
err=True,
err_console.print(
f"[cyan]\\[{service}][/] [yellow]![/] was on "
f"[magenta]{current_host}[/] (not in config), skipping down"
)
# Start on target host
@@ -275,7 +285,7 @@ def traefik_file(
try:
dynamic, warnings = generate_traefik_config(cfg, svc_list)
except (FileNotFoundError, ValueError) as exc:
typer.echo(str(exc), err=True)
err_console.print(f"[red]✗[/] {exc}")
raise typer.Exit(1) from exc
rendered = yaml.safe_dump(dynamic, sort_keys=False)
@@ -283,12 +293,12 @@ def traefik_file(
if output:
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(rendered)
typer.echo(f"Traefik config written to {output}")
console.print(f"[green]✓[/] Traefik config written to {output}")
else:
typer.echo(rendered)
console.print(rendered)
for warning in warnings:
typer.echo(warning, err=True)
err_console.print(f"[yellow]![/] {warning}")
async def _discover_running_services(cfg: Config) -> dict[str, str]:
@@ -324,19 +334,24 @@ def _report_sync_changes(
) -> None:
"""Report sync changes to the user."""
if added:
typer.echo(f"\nNew services found ({len(added)}):")
console.print(f"\nNew services found ({len(added)}):")
for service in sorted(added):
typer.echo(f" + {service} on {discovered[service]}")
console.print(f" [green]+[/] [cyan]{service}[/] on [magenta]{discovered[service]}[/]")
if changed:
typer.echo(f"\nServices on different hosts ({len(changed)}):")
console.print(f"\nServices on different hosts ({len(changed)}):")
for service, old_host, new_host in sorted(changed):
typer.echo(f" ~ {service}: {old_host} -> {new_host}")
console.print(
f" [yellow]~[/] [cyan]{service}[/]: "
f"[magenta]{old_host}[/] → [magenta]{new_host}[/]"
)
if removed:
typer.echo(f"\nServices no longer running ({len(removed)}):")
console.print(f"\nServices no longer running ({len(removed)}):")
for service in sorted(removed):
typer.echo(f" - {service} (was on {current_state[service]})")
console.print(
f" [red]-[/] [cyan]{service}[/] (was on [magenta]{current_state[service]}[/])"
)
@app.command()
@@ -357,7 +372,7 @@ def sync(
cfg = load_config(config)
current_state = load_state(cfg)
typer.echo("Discovering running services...")
console.print("Discovering running services...")
discovered = _run_async(_discover_running_services(cfg))
# Calculate changes
@@ -374,25 +389,55 @@ def sync(
if state_changed:
_report_sync_changes(added, removed, changed, discovered, current_state)
else:
typer.echo("State is already in sync.")
console.print("[green]✓[/] State is already in sync.")
if dry_run:
typer.echo("\n(dry-run: no changes made)")
console.print("\n[dim](dry-run: no changes made)[/]")
return
# Update state file
if state_changed:
save_state(cfg, discovered)
typer.echo(f"\nState updated: {len(discovered)} services tracked.")
console.print(f"\n[green]✓[/] State updated: {len(discovered)} services tracked.")
# Capture image digests for running services
if discovered:
typer.echo("\nCapturing image digests...")
console.print("\nCapturing image digests...")
try:
path = _run_async(snapshot_services(cfg, list(discovered.keys()), log_path=log_path))
typer.echo(f"Digests written to {path}")
console.print(f"[green]✓[/] Digests written to {path}")
except RuntimeError as exc:
typer.echo(f"Warning: {exc}", err=True)
err_console.print(f"[yellow]![/] {exc}")
@app.command()
def check(
config: ConfigOption = None,
) -> None:
"""Check for compose directories not in config (and vice versa)."""
cfg = load_config(config)
configured = set(cfg.services.keys())
on_disk = cfg.discover_compose_dirs()
missing_from_config = sorted(on_disk - configured)
missing_from_disk = sorted(configured - on_disk)
if missing_from_config:
console.print(f"\n[yellow]Not in config[/] ({len(missing_from_config)}):")
for name in missing_from_config:
console.print(f" [yellow]+[/] [cyan]{name}[/]")
if missing_from_disk:
console.print(f"\n[red]No compose file found[/] ({len(missing_from_disk)}):")
for name in missing_from_disk:
console.print(f" [red]-[/] [cyan]{name}[/]")
if not missing_from_config and not missing_from_disk:
console.print("[green]✓[/] All compose directories are in config.")
elif missing_from_config:
console.print(f"\n[dim]To add missing services, append to {cfg.config_path}:[/]")
for name in missing_from_config:
console.print(f"[dim] {name}: docker-debian[/]")
if __name__ == "__main__":

View File

@@ -65,6 +65,25 @@ class Config(BaseModel):
# Default to compose.yaml if none exist (will error later)
return service_dir / "compose.yaml"
def discover_compose_dirs(self) -> set[str]:
"""Find all directories in compose_dir that contain a compose file."""
compose_filenames = {
"compose.yaml",
"compose.yml",
"docker-compose.yml",
"docker-compose.yaml",
}
found: set[str] = set()
if not self.compose_dir.exists():
return found
for subdir in self.compose_dir.iterdir():
if subdir.is_dir():
for filename in compose_filenames:
if (subdir / filename).exists():
found.add(subdir.name)
break
return found
def _parse_hosts(raw_hosts: dict[str, str | dict[str, str | int]]) -> dict[str, Host]:
"""Parse hosts from config, handling both simple and full forms."""

View File

@@ -3,18 +3,41 @@
from __future__ import annotations
import asyncio
import sys
import socket
from dataclasses import dataclass
from functools import lru_cache
from typing import TYPE_CHECKING, Any
import asyncssh
from rich.console import Console
if TYPE_CHECKING:
from .config import Config, Host
_console = Console(highlight=False)
_err_console = Console(stderr=True, highlight=False)
LOCAL_ADDRESSES = frozenset({"local", "localhost", "127.0.0.1", "::1"})
@lru_cache(maxsize=1)
def _get_local_ips() -> frozenset[str]:
"""Get all IP addresses of the current machine."""
ips: set[str] = set()
try:
hostname = socket.gethostname()
# Get all addresses for hostname
for info in socket.getaddrinfo(hostname, None):
ips.add(info[4][0])
# Also try getting the default outbound IP
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 80))
ips.add(s.getsockname()[0])
except OSError:
pass
return frozenset(ips)
@dataclass
class CommandResult:
"""Result of a command execution."""
@@ -28,7 +51,11 @@ class CommandResult:
def _is_local(host: Host) -> bool:
"""Check if host should run locally (no SSH)."""
return host.address.lower() in LOCAL_ADDRESSES
addr = host.address.lower()
if addr in LOCAL_ADDRESSES:
return True
# Check if address matches any of this machine's IPs
return addr in _get_local_ips()
async def _run_local_command(
@@ -53,12 +80,14 @@ async def _run_local_command(
*,
is_stderr: bool = False,
) -> None:
output = sys.stderr if is_stderr else sys.stdout
console = _err_console if is_stderr else _console
while True:
line = await reader.readline()
if not line:
break
print(f"[{prefix}] {line.decode()}", end="", file=output, flush=True)
text = line.decode()
if text.strip(): # Skip empty lines
console.print(f"[cyan]\\[{prefix}][/] {text}", end="")
await asyncio.gather(
read_stream(proc.stdout, service),
@@ -80,7 +109,7 @@ async def _run_local_command(
stderr=stderr_data.decode() if stderr_data else "",
)
except OSError as e:
print(f"[{service}] Local error: {e}", file=sys.stderr)
_err_console.print(f"[cyan]\\[{service}][/] [red]Local error:[/] {e}")
return CommandResult(service=service, exit_code=1, success=False)
@@ -111,9 +140,10 @@ async def _run_ssh_command(
*,
is_stderr: bool = False,
) -> None:
output = sys.stderr if is_stderr else sys.stdout
console = _err_console if is_stderr else _console
async for line in reader:
print(f"[{prefix}] {line}", end="", file=output, flush=True)
if line.strip(): # Skip empty lines
console.print(f"[cyan]\\[{prefix}][/] {line}", end="")
await asyncio.gather(
read_stream(proc.stdout, service),
@@ -135,7 +165,7 @@ async def _run_ssh_command(
stderr=stderr_data,
)
except (OSError, asyncssh.Error) as e:
print(f"[{service}] SSH error: {e}", file=sys.stderr)
_err_console.print(f"[cyan]\\[{service}][/] [red]SSH error:[/] {e}")
return CommandResult(service=service, exit_code=1, success=False)

View File

@@ -171,4 +171,4 @@ class TestReportSyncChanges:
)
captured = capsys.readouterr()
assert "Services on different hosts (1)" in captured.out
assert "~ plex: nas01 -> nas02" in captured.out
assert "~ plex: nas01 nas02" in captured.out

2
uv.lock generated
View File

@@ -131,6 +131,7 @@ dependencies = [
{ name = "asyncssh" },
{ name = "pydantic" },
{ name = "pyyaml" },
{ name = "rich" },
{ name = "typer" },
]
@@ -151,6 +152,7 @@ requires-dist = [
{ name = "asyncssh", specifier = ">=2.14.0" },
{ name = "pydantic", specifier = ">=2.0.0" },
{ name = "pyyaml", specifier = ">=6.0" },
{ name = "rich", specifier = ">=13.0.0" },
{ name = "typer", specifier = ">=0.9.0" },
]