Compare commits

...

30 Commits

Author SHA1 Message Date
Bas Nijholt
920b593d5f Fix mypy error: add type annotation for proc variable 2025-12-17 00:17:20 -08:00
Bas Nijholt
27d9b08ce2 Add -f shorthand for --full in apply command 2025-12-17 00:10:16 -08:00
Bas Nijholt
700cdacb4d Add 'a' alias for apply command (cf a = cf apply) 2025-12-17 00:09:45 -08:00
Bas Nijholt
3c7a532704 Add comments explaining lazy imports for startup performance 2025-12-17 00:08:28 -08:00
Bas Nijholt
6048f37ad5 Lazy import pydantic for faster CLI startup
- Create paths.py module with lightweight path utilities (no pydantic)
- Move Config imports to TYPE_CHECKING blocks in CLI modules
- Lazy import load_config only when needed

Combined with asyncssh lazy loading, cf --help now starts in ~150ms
instead of ~500ms (70% faster).
2025-12-17 00:07:15 -08:00
Bas Nijholt
f18952633f Lazy import asyncssh for faster CLI startup
Move asyncssh import inside _run_ssh_command() so it's only loaded
when actually executing SSH commands. This cuts CLI import time
from 414ms to 200ms (52% faster).

cf --help now starts in ~260ms instead of ~500ms.
2025-12-16 23:59:37 -08:00
Bas Nijholt
437257e631 Add cf config symlink command
Creates a symlink from ~/.config/compose-farm/compose-farm.yaml to a
local config file using absolute paths (avoids broken relative symlinks).

Usage:
  cf config symlink                     # Link to ./compose-farm.yaml
  cf config symlink /path/to/config.yaml  # Link to specific file

State files are automatically stored next to the actual config (not
the symlink) since config_path.resolve() follows symlinks.
2025-12-16 23:56:58 -08:00
Bas Nijholt
c720170f26 Add --full flag to apply command
When --full is passed, apply also runs 'docker compose up' on all
services (not just missing/migrating ones) to pick up any config
changes (compose file, .env, etc).

- cf apply          # Fast: state reconciliation only
- cf apply --full   # Thorough: also refresh all running services
2025-12-16 23:54:19 -08:00
Bas Nijholt
d9c03d6509 Feature apply as the hero command in README
- Update intro and NOTE block to lead with cf apply
- Rewrite "How It Works" to show declarative workflow first
- Move apply to top of command table (bolded)
- Reorder examples to show apply as "the main command"
- Update automation bullet to highlight one-command reconciliation
2025-12-16 23:49:49 -08:00
Bas Nijholt
3b7066711f Merge pull request #12 from basnijholt/feature/orphaned-services
Add apply command and refactor CLI for clearer UX
2025-12-16 23:34:34 -08:00
Bas Nijholt
6a630c40a1 Update apply command description to include starting missing services 2025-12-16 23:32:27 -08:00
Bas Nijholt
9f9c042b66 Remove up --migrate flag in favor of apply
Simplifies CLI by having one clear reconciliation command:
- cf up <service>  = start specific services (auto-migrates if needed)
- cf apply         = full reconcile (stop orphans + migrate + start missing)

The --migrate flag was redundant with 'apply --no-orphans'.
2025-12-16 23:27:19 -08:00
github-actions[bot]
2a6d7d0b85 Update README.md 2025-12-17 07:21:38 +00:00
Bas Nijholt
6d813ccd84 Merge af9c760fb8 into affed2edcf 2025-12-17 07:21:23 +00:00
Bas Nijholt
af9c760fb8 Add missing service detection to apply command
Previously, apply only handled:
1. Stopping orphans (in state, not in config)
2. Migrating services (in state, wrong host)

Now it also handles:
3. Starting missing services (in config, not in state)

This fixes the case where a service was stopped as an orphan, then
re-added to config - apply would say "nothing to do" instead of
starting it.

Added get_services_not_in_state() to state.py and updated tests.
2025-12-16 23:21:09 -08:00
Bas Nijholt
90656b05e3 Add tests for apply command and down --orphaned flag
Tests cover:
- apply: nothing to do, dry-run preview, migrations, orphan cleanup, --no-orphans
- down --orphaned: no orphans, stops services, error on invalid combinations

Lifecycle.py coverage improved from 20% to 61%.
2025-12-16 23:15:46 -08:00
github-actions[bot]
d7a3d4e8c7 Update README.md 2025-12-17 07:10:52 +00:00
Bas Nijholt
35f0b8bf99 Merge be6b391121 into affed2edcf 2025-12-16 23:10:36 -08:00
Bas Nijholt
be6b391121 Refactor CLI commands for clearer UX
Separate "read state from reality" from "write config to reality":
- Rename `sync` to `refresh` (updates local state from running services)
- Add `apply` command (makes reality match config: migrate + stop orphans)
- Add `down --orphaned` flag (stops services removed from config)
- Modify `up --migrate` to only handle migrations (not orphans)

The new mental model:
- `refresh` = Reality → State (discover what's running)
- `apply` = Config → Reality (reconcile: migrate services + stop orphans)

Also extract private helper functions for reporting to match codebase style.
2025-12-16 23:06:42 -08:00
Bas Nijholt
7f56ba6a41 Add orphaned service detection and cleanup
When services are removed from config but still tracked in state,
`cf up --migrate` now stops them automatically. This makes the
config truly declarative - comment out a service, run migrate,
and it stops.

Changes:
- Add get_orphaned_services() to state.py for detecting orphans
- Add stop_orphaned_services() to operations.py for cleanup
- Update lifecycle.py to call stop_orphaned_services on --migrate
- Refactor _report_orphaned_services to use shared function
- Rename "missing_from_config" to "unmanaged" for clarity
- Add tests for get_orphaned_services
- Only remove from state on successful down (not on failure)
2025-12-16 22:53:26 -08:00
Bas Nijholt
4b3d7a861e Fix migration and update for services with buildable images
Use `pull --ignore-buildable` to skip images that have `build:` defined
in the compose file, preventing pull failures for locally-built images
like gitea-runner-custom. The build step then handles these images.
2025-12-16 19:42:24 -08:00
Bas Nijholt
affed2edcf Refactor operations.py into smaller helpers
- Add PreflightResult NamedTuple with .ok property
- Extract _run_compose_step to handle raw output and interrupts
- Extract _up_single_service for single-host migration logic
- Deduplicate pull/build handling in _migrate_service
- Add was_running check to only rollback if service was running
2025-12-16 17:08:25 -08:00
Bas Nijholt
34642e8b8e Rollback to old host when migration up fails
When up fails on the target host after migration has already stopped
the service on the old host, attempt to restart on the old host as a
rollback. This prevents services from being left down after a failed
migration attempt.
2025-12-16 16:59:09 -08:00
Bas Nijholt
4c8b6c5209 Add init-network hint when network is missing 2025-12-16 16:22:06 -08:00
Bas Nijholt
2b38ed28c0 Skip traefik regeneration when all services fail
Don't update traefik config if all services in the operation failed.
This prevents adding routes for services that aren't actually running.
2025-12-16 16:21:09 -08:00
Bas Nijholt
26b57895ce Clean up orphaned containers when migration up fails
If `up` fails after migration (when we've already run `down` on the
old host), run `down` on the target host to clean up any containers
that were created but couldn't start (e.g., due to missing devices).

This prevents orphaned containers from lingering on the failed host.
2025-12-16 16:16:09 -08:00
Bas Nijholt
367da13fae Fix path existence check for permission denied
Use stat instead of test -e to distinguish "no such file" from
"permission denied". If stat fails with permission denied, the
path exists (just not accessible), so report it as existing.

Fixes false "missing path" errors for directories with restricted
permissions like /mnt/data/immich/library.
2025-12-16 15:08:38 -08:00
Bas Nijholt
d6ecd42559 Consolidate service requirement checks into shared function
Move check_service_requirements() to operations.py as a public function
that verifies paths, networks, and devices exist on a target host. Both
CLI check command and pre-flight migration checks now use this shared
function, eliminating duplication. Also adds device checking to the
check command output.
2025-12-16 14:53:59 -08:00
Bas Nijholt
233c33fa52 Add device checking to pre-flight migration checks
Services with devices: mappings (e.g., /dev/dri for GPU acceleration)
now have those devices verified on the target host before migration.
This prevents the scenario where a service is stopped on the old host
but fails to start on the new host due to missing devices.

Adds parse_devices() to extract host device paths from compose files.
2025-12-16 14:35:52 -08:00
Bas Nijholt
43974c5743 Abort on first Ctrl+C during migrations
Detect when SSH subprocess is killed by signal (exit code < 0 or 255)
and treat it as an interrupt. This allows single Ctrl+C to abort the
entire operation instead of requiring two presses.
2025-12-16 14:31:33 -08:00
20 changed files with 1516 additions and 262 deletions

View File

@@ -15,8 +15,8 @@ compose_farm/
│ ├── app.py # Shared Typer app instance, version callback
│ ├── common.py # Shared helpers, options, progress bar utilities
│ ├── config.py # Config subcommand (init, show, path, validate, edit)
│ ├── lifecycle.py # up, down, pull, restart, update commands
│ ├── management.py # sync, check, init-network, traefik-file commands
│ ├── lifecycle.py # up, down, pull, restart, update, apply commands
│ ├── management.py # refresh, check, init-network, traefik-file commands
│ └── monitoring.py # logs, ps, stats commands
├── config.py # Pydantic models, YAML loading
├── compose.py # Compose file parsing (.env, ports, volumes, networks)
@@ -55,15 +55,16 @@ CLI available as `cf` or `compose-farm`.
| Command | Description |
|---------|-------------|
| `up` | Start services (`docker compose up -d`), auto-migrates if host changed. Use `--migrate` for auto-detection |
| `down` | Stop services (`docker compose down`) |
| `up` | Start services (`docker compose up -d`), auto-migrates if host changed |
| `down` | Stop services (`docker compose down`). Use `--orphaned` to stop services removed from config |
| `pull` | Pull latest images |
| `restart` | `down` + `up -d` |
| `update` | `pull` + `down` + `up -d` |
| `apply` | Make reality match config: migrate services + stop orphans. Use `--dry-run` to preview |
| `logs` | Show service logs |
| `ps` | Show status of all services |
| `stats` | Show overview (hosts, services, pending migrations; `--live` for container counts) |
| `sync` | Discover running services, update state, capture image digests |
| `refresh` | Update state from reality: discover running services, capture image digests |
| `check` | Validate config, traefik labels, mounts, networks; show host compatibility |
| `init-network` | Create Docker network on hosts with consistent subnet/gateway |
| `traefik-file` | Generate Traefik file-provider config from compose labels |

View File

@@ -10,7 +10,7 @@
A minimal CLI tool to run Docker Compose commands across multiple hosts via SSH.
> [!NOTE]
> Run `docker compose` commands across multiple hosts via SSH. One YAML maps services to hosts. Change the mapping, run `up`, and it auto-migrates. No Kubernetes, no Swarm, no magic.
> Run `docker compose` commands across multiple hosts via SSH. One YAML maps services to hosts. Run `cf apply` and reality matches your config—services start, migrate, or stop as needed. No Kubernetes, no Swarm, no magic.
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
@@ -43,7 +43,7 @@ I used to run 100+ Docker Compose stacks on a single machine that kept running o
Both require changes to your compose files. **Compose Farm requires zero changes**—your existing `docker-compose.yml` files work as-is.
I also wanted a declarative setup—one config file that defines where everything runs. Change the config, run `up`, and services migrate automatically. See [Comparison with Alternatives](#comparison-with-alternatives) for how this compares to other approaches.
I also wanted a declarative setup—one config file that defines where everything runs. Change the config, run `cf apply`, and everything reconciles—services start, migrate, or stop as needed. See [Comparison with Alternatives](#comparison-with-alternatives) for how this compares to other approaches.
<p align="center">
<a href="https://xkcd.com/927/">
@@ -56,18 +56,26 @@ Before you say it—no, this is not a new standard. I changed nothing about my e
Compose Farm just automates what you'd do by hand:
- Runs `docker compose` commands over SSH
- Tracks which service runs on which host
- Auto-migrates services when you change the host assignment
- **One command (`cf apply`) to reconcile everything**—start missing services, migrate moved ones, stop removed ones
- Generates Traefik file-provider config for cross-host routing
**It's a convenience wrapper, not a new paradigm.**
## How It Works
1. You run `cf up plex`
2. Compose Farm looks up which host runs `plex` (e.g., `server-1`)
3. It SSHs to `server-1` (or runs locally if `localhost`)
4. It executes `docker compose -f /opt/compose/plex/docker-compose.yml up -d`
5. Output is streamed back with `[plex]` prefix
**The declarative way** — run `cf apply` and reality matches your config:
1. Compose Farm compares your config to what's actually running
2. Services in config but not running? **Starts them**
3. Services on the wrong host? **Migrates them** (stops on old host, starts on new)
4. Services running but removed from config? **Stops them**
**Under the hood** — each service operation is just SSH + docker compose:
1. Look up which host runs the service (e.g., `plex``server-1`)
2. SSH to `server-1` (or run locally if `localhost`)
3. Execute `docker compose -f /opt/compose/plex/docker-compose.yml up -d`
4. Stream output back with `[plex]` prefix
That's it. No orchestration, no service discovery, no magic.
@@ -228,6 +236,7 @@ The CLI is available as both `compose-farm` and the shorter `cf` alias.
| Command | Description |
|---------|-------------|
| **`cf apply`** | **Make reality match config (start + migrate + stop orphans)** |
| `cf up <svc>` | Start service (auto-migrates if host changed) |
| `cf down <svc>` | Stop service |
| `cf restart <svc>` | down + up |
@@ -235,7 +244,7 @@ The CLI is available as both `compose-farm` and the shorter `cf` alias.
| `cf pull <svc>` | Pull latest images |
| `cf logs -f <svc>` | Follow logs |
| `cf ps` | Show status of all services |
| `cf sync` | Discover running services + capture image digests |
| `cf refresh` | Update state from running services |
| `cf check` | Validate config, mounts, networks |
| `cf init-network` | Create Docker network on hosts |
| `cf traefik-file` | Generate Traefik file-provider config |
@@ -246,13 +255,17 @@ All commands support `--all` to operate on all services.
Each command replaces: look up host → SSH → find compose file → run `ssh host "cd /opt/compose/plex && docker compose up -d"`.
```bash
# Start services (auto-migrates if host changed in config)
cf up plex jellyfin
cf up --all
cf up --migrate # only services needing migration (state ≠ config)
# The main command: make reality match your config
cf apply # start missing + migrate + stop orphans
cf apply --dry-run # preview what would change
cf apply --no-orphans # skip stopping orphaned services
cf apply --full # also refresh all services (picks up config changes)
# Stop services
cf down plex
# Or operate on individual services
cf up plex jellyfin # start services (auto-migrates if host changed)
cf up --all
cf down plex # stop services
cf down --orphaned # stop services removed from config
# Pull latest images
cf pull --all
@@ -263,9 +276,9 @@ cf restart plex
# Update (pull + down + up) - the end-to-end update command
cf update --all
# Sync state with reality (discovers running services + captures image digests)
cf sync # updates state.yaml and dockerfarm-log.toml
cf sync --dry-run # preview without writing
# Update state from reality (discovers running services + captures digests)
cf refresh # updates state.yaml and dockerfarm-log.toml
cf refresh --dry-run # preview without writing
# Validate config, traefik labels, mounts, and networks
cf check # full validation (includes SSH checks)
@@ -316,12 +329,13 @@ cf ps
│ down Stop services (docker compose down). │
│ pull Pull latest images (docker compose pull). │
│ restart Restart services (down + up). │
│ update Update services (pull + down + up).
│ update Update services (pull + build + down + up). │
│ apply Make reality match config (start, migrate, stop as needed). │
╰──────────────────────────────────────────────────────────────────────────────╯
╭─ Configuration ──────────────────────────────────────────────────────────────╮
│ traefik-file Generate a Traefik file-provider fragment from compose │
│ Traefik labels. │
sync Sync local state with running services.
refresh Update local state from running services. │
│ check Validate configuration, traefik labels, mounts, and networks. │
│ init-network Create Docker network on hosts with consistent settings. │
│ config Manage compose-farm configuration files. │
@@ -346,7 +360,7 @@ When you change a service's host assignment in config and run `up`, Compose Farm
3. Runs `up -d` on the new host
4. Updates state tracking
Use `cf up --migrate` (or `-m`) to automatically find and migrate all services where the current state differs from config—no need to list them manually.
Use `cf apply` to automatically reconcile all services—it finds and migrates services on wrong hosts, stops orphaned services, and starts missing services.
```yaml
# Before: plex runs on server-1
@@ -358,6 +372,14 @@ services:
plex: server-2 # Compose Farm will migrate automatically
```
**Orphaned services**: When you remove (or comment out) a service from config, it becomes "orphaned"—tracked in state but no longer in config. Use these commands to handle orphans:
- `cf apply` — Migrate services AND stop orphans (the full reconcile)
- `cf down --orphaned` — Only stop orphaned services
- `cf apply --dry-run` — Preview what would change before applying
This makes the config truly declarative: comment out a service, run `cf apply`, and it stops.
## Traefik Multihost Ingress (File Provider)
If you run a single Traefik instance on one "frontdoor" host and want it to route to

81
reddit-post.md Normal file
View File

@@ -0,0 +1,81 @@
# Title options
- Multi-host Docker Compose without Kubernetes or file changes
- I built a CLI to run Docker Compose across hosts. Zero changes to your files.
---
# I made a CLI to run Docker Compose across multiple hosts without Kubernetes or Swarm
I've been running 100+ Docker Compose stacks on a single machine, and it kept running out of memory. I needed to spread services across multiple hosts, but:
- **Kubernetes** felt like overkill. I don't need pods, ingress controllers, or 10x more YAML.
- **Docker Swarm** is basically in maintenance mode.
- Both require rewriting my compose files.
So I built **Compose Farm**, a simple CLI that runs `docker compose` commands over SSH. No agents, no cluster setup, no changes to your existing compose files.
## How it works
One YAML file maps services to hosts:
```yaml
compose_dir: /opt/stacks
hosts:
nuc: 192.168.1.10
hp: 192.168.1.11
services:
plex: nuc
jellyfin: hp
sonarr: nuc
radarr: nuc
```
Then just:
```bash
cf up plex # runs on nuc via SSH
cf up --all # starts everything on their assigned hosts
cf logs -f plex # streams logs
cf ps # shows status across all hosts
```
## Auto-migration
Change a service's host in the config and run `cf up`. It stops the service on the old host and starts it on the new one. No manual SSH needed.
```yaml
# Before
plex: nuc
# After (just change this)
plex: hp
```
```bash
cf up plex # migrates automatically
```
## Requirements
- SSH key auth to your hosts
- Same paths on all hosts (I use NFS from my NAS)
- That's it. No agents, no daemons.
## What it doesn't do
- No high availability (if a host goes down, services don't auto-migrate)
- No overlay networking (containers on different hosts can't talk via Docker DNS)
- No service discovery
- No health checks or automatic restarts
It's a convenience wrapper around `docker compose` + SSH. If you need failover or cross-host container networking, you probably do need Swarm or Kubernetes.
## Links
- GitHub: https://github.com/basnijholt/compose-farm
- Install: `uv tool install compose-farm` or `pip install compose-farm`
Built this in 4 days because I was mass-SSHing into machines like a caveman. Happy to answer questions or take feedback!

View File

@@ -18,14 +18,14 @@ from rich.progress import (
TimeElapsedColumn,
)
from compose_farm.config import Config, load_config
from compose_farm.console import console, err_console
from compose_farm.executor import CommandResult # noqa: TC001
from compose_farm.traefik import generate_traefik_config, render_traefik_config
if TYPE_CHECKING:
from collections.abc import Callable, Coroutine, Generator
from compose_farm.config import Config
from compose_farm.executor import CommandResult
_T = TypeVar("_T")
@@ -81,6 +81,9 @@ def progress_bar(label: str, total: int) -> Generator[tuple[Progress, TaskID], N
def load_config_or_exit(config_path: Path | None) -> Config:
"""Load config or exit with a friendly error message."""
# Lazy import: pydantic adds ~50ms to startup, only load when actually needed
from compose_farm.config import load_config # noqa: PLC0415
try:
return load_config(config_path)
except FileNotFoundError as e:
@@ -106,7 +109,11 @@ def get_services(
def run_async(coro: Coroutine[None, None, _T]) -> _T:
"""Run async coroutine."""
return asyncio.run(coro)
try:
return asyncio.run(coro)
except KeyboardInterrupt:
console.print("\n[yellow]Interrupted[/]")
raise typer.Exit(130) from None # Standard exit code for SIGINT
def report_results(results: list[CommandResult]) -> None:
@@ -139,11 +146,27 @@ def report_results(results: list[CommandResult]) -> None:
raise typer.Exit(1)
def maybe_regenerate_traefik(cfg: Config) -> None:
"""Regenerate traefik config if traefik_file is configured."""
def maybe_regenerate_traefik(
cfg: Config,
results: list[CommandResult] | None = None,
) -> None:
"""Regenerate traefik config if traefik_file is configured.
If results are provided, skips regeneration if all services failed.
"""
if cfg.traefik_file is None:
return
# Skip if all services failed
if results and not any(r.success for r in results):
return
# Lazy import: traefik/yaml adds startup time, only load when traefik_file is configured
from compose_farm.traefik import ( # noqa: PLC0415
generate_traefik_config,
render_traefik_config,
)
try:
dynamic, warnings = generate_traefik_config(cfg, list(cfg.services.keys()))
new_content = render_traefik_config(dynamic)
@@ -199,5 +222,5 @@ def run_host_operation(
results.append(result)
if result.success:
state_callback(cfg, service, host)
maybe_regenerate_traefik(cfg)
maybe_regenerate_traefik(cfg, results)
report_results(results)

View File

@@ -14,8 +14,8 @@ from typing import Annotated
import typer
from compose_farm.cli.app import app
from compose_farm.config import load_config, xdg_config_home
from compose_farm.console import console, err_console
from compose_farm.paths import config_search_paths, default_config_path
config_app = typer.Typer(
name="config",
@@ -23,14 +23,6 @@ config_app = typer.Typer(
no_args_is_help=True,
)
# Default config location (internal)
_USER_CONFIG_PATH = xdg_config_home() / "compose-farm" / "compose-farm.yaml"
# Search paths for existing config (internal)
_CONFIG_PATHS = [
Path("compose-farm.yaml"),
_USER_CONFIG_PATH,
]
# --- CLI Options (same pattern as cli.py) ---
_PathOption = Annotated[
@@ -91,7 +83,7 @@ def _get_config_file(path: Path | None) -> Path | None:
return p.resolve()
# Check standard locations
for p in _CONFIG_PATHS:
for p in config_search_paths():
if p.exists():
return p.resolve()
@@ -108,7 +100,7 @@ def config_init(
The generated config file serves as a template showing all available
options with explanatory comments.
"""
target_path = (path.expanduser().resolve() if path else None) or _USER_CONFIG_PATH
target_path = (path.expanduser().resolve() if path else None) or default_config_path()
if target_path.exists() and not force:
console.print(
@@ -144,7 +136,7 @@ def config_edit(
console.print("[yellow]No config file found.[/yellow]")
console.print("\nRun [bold cyan]cf config init[/bold cyan] to create one.")
console.print("\nSearched locations:")
for p in _CONFIG_PATHS:
for p in config_search_paths():
console.print(f" - {p}")
raise typer.Exit(1)
@@ -189,7 +181,7 @@ def config_show(
if config_file is None:
console.print("[yellow]No config file found.[/yellow]")
console.print("\nSearched locations:")
for p in _CONFIG_PATHS:
for p in config_search_paths():
status = "[green]exists[/green]" if p.exists() else "[dim]not found[/dim]"
console.print(f" - {p} ({status})")
console.print("\nRun [bold cyan]cf config init[/bold cyan] to create one.")
@@ -227,7 +219,7 @@ def config_path(
if config_file is None:
console.print("[yellow]No config file found.[/yellow]")
console.print("\nSearched locations:")
for p in _CONFIG_PATHS:
for p in config_search_paths():
status = "[green]exists[/green]" if p.exists() else "[dim]not found[/dim]"
console.print(f" - {p} ({status})")
raise typer.Exit(1)
@@ -247,6 +239,9 @@ def config_validate(
err_console.print("[red]✗[/] No config file found")
raise typer.Exit(1)
# Lazy import: pydantic adds ~50ms to startup, only load when actually needed
from compose_farm.config import load_config # noqa: PLC0415
try:
cfg = load_config(config_file)
except FileNotFoundError as e:
@@ -261,5 +256,68 @@ def config_validate(
console.print(f" Services: {len(cfg.services)}")
@config_app.command("symlink")
def config_symlink(
target: Annotated[
Path | None,
typer.Argument(help="Config file to link to. Defaults to ./compose-farm.yaml"),
] = None,
force: _ForceOption = False,
) -> None:
"""Create a symlink from the default config location to a config file.
This makes a local config file discoverable globally without copying.
Always uses absolute paths to avoid broken symlinks.
Examples:
cf config symlink # Link to ./compose-farm.yaml
cf config symlink /opt/compose/config.yaml # Link to specific file
"""
# Default to compose-farm.yaml in current directory
target_path = (target or Path("compose-farm.yaml")).expanduser().resolve()
if not target_path.exists():
err_console.print(f"[red]✗[/] Target config file not found: {target_path}")
raise typer.Exit(1)
if not target_path.is_file():
err_console.print(f"[red]✗[/] Target is not a file: {target_path}")
raise typer.Exit(1)
symlink_path = default_config_path()
# Check if symlink location already exists
if symlink_path.exists() or symlink_path.is_symlink():
if symlink_path.is_symlink():
current_target = symlink_path.resolve() if symlink_path.exists() else None
if current_target == target_path:
console.print(f"[green]✓[/] Symlink already points to: {target_path}")
return
# Update existing symlink
if not force:
existing = symlink_path.readlink()
console.print(f"[yellow]Symlink exists:[/] {symlink_path} -> {existing}")
if not typer.confirm(f"Update to point to {target_path}?"):
console.print("[dim]Aborted.[/dim]")
raise typer.Exit(0)
symlink_path.unlink()
else:
# Regular file exists
err_console.print(f"[red]✗[/] A regular file exists at: {symlink_path}")
err_console.print(" Back it up or remove it first, then retry.")
raise typer.Exit(1)
# Create parent directories
symlink_path.parent.mkdir(parents=True, exist_ok=True)
# Create symlink with absolute path
symlink_path.symlink_to(target_path)
console.print("[green]✓[/] Created symlink:")
console.print(f" {symlink_path}")
console.print(f" -> {target_path}")
# Register config subcommand on the shared app
app.add_typer(config_app, name="config", rich_help_panel="Configuration")

View File

@@ -1,11 +1,14 @@
"""Lifecycle commands: up, down, pull, restart, update."""
"""Lifecycle commands: up, down, pull, restart, update, apply."""
from __future__ import annotations
from typing import Annotated
from typing import TYPE_CHECKING, Annotated
import typer
if TYPE_CHECKING:
from compose_farm.config import Config
from compose_farm.cli.app import app
from compose_farm.cli.common import (
AllOption,
@@ -19,12 +22,15 @@ from compose_farm.cli.common import (
run_async,
run_host_operation,
)
from compose_farm.console import console
from compose_farm.console import console, err_console
from compose_farm.executor import run_on_services, run_sequential_on_services
from compose_farm.operations import up_services
from compose_farm.operations import stop_orphaned_services, up_services
from compose_farm.state import (
add_service_to_host,
get_orphaned_services,
get_service_host,
get_services_needing_migration,
get_services_not_in_state,
remove_service,
remove_service_from_host,
)
@@ -34,28 +40,11 @@ from compose_farm.state import (
def up(
services: ServicesArg = None,
all_services: AllOption = False,
migrate: Annotated[
bool, typer.Option("--migrate", "-m", help="Only services needing migration")
] = False,
host: HostOption = None,
config: ConfigOption = None,
) -> None:
"""Start services (docker compose up -d). Auto-migrates if host changed."""
from compose_farm.console import err_console # noqa: PLC0415
if migrate and host:
err_console.print("[red]✗[/] Cannot use --migrate and --host together")
raise typer.Exit(1)
if migrate:
cfg = load_config_or_exit(config)
svc_list = get_services_needing_migration(cfg)
if not svc_list:
console.print("[green]✓[/] No services need migration")
return
console.print(f"[cyan]Migrating {len(svc_list)} service(s):[/] {', '.join(svc_list)}")
else:
svc_list, cfg = get_services(services or [], all_services, config)
svc_list, cfg = get_services(services or [], all_services, config)
# Per-host operation: run on specific host only
if host:
@@ -64,7 +53,7 @@ def up(
# Normal operation: use up_services with migration logic
results = run_async(up_services(cfg, svc_list, raw=True))
maybe_regenerate_traefik(cfg)
maybe_regenerate_traefik(cfg, results)
report_results(results)
@@ -72,10 +61,37 @@ def up(
def down(
services: ServicesArg = None,
all_services: AllOption = False,
orphaned: Annotated[
bool,
typer.Option(
"--orphaned", help="Stop orphaned services (in state but removed from config)"
),
] = False,
host: HostOption = None,
config: ConfigOption = None,
) -> None:
"""Stop services (docker compose down)."""
# Handle --orphaned flag
if orphaned:
if services or all_services or host:
err_console.print("[red]✗[/] Cannot use --orphaned with services, --all, or --host")
raise typer.Exit(1)
cfg = load_config_or_exit(config)
orphaned_services = get_orphaned_services(cfg)
if not orphaned_services:
console.print("[green]✓[/] No orphaned services to stop")
return
console.print(
f"[yellow]Stopping {len(orphaned_services)} orphaned service(s):[/] "
f"{', '.join(orphaned_services.keys())}"
)
results = run_async(stop_orphaned_services(cfg))
report_results(results)
return
svc_list, cfg = get_services(services or [], all_services, config)
# Per-host operation: run on specific host only
@@ -97,7 +113,7 @@ def down(
remove_service(cfg, base_service)
removed_services.add(base_service)
maybe_regenerate_traefik(cfg)
maybe_regenerate_traefik(cfg, results)
report_results(results)
@@ -124,7 +140,7 @@ def restart(
svc_list, cfg = get_services(services or [], all_services, config)
raw = len(svc_list) == 1
results = run_async(run_sequential_on_services(cfg, svc_list, ["down", "up -d"], raw=raw))
maybe_regenerate_traefik(cfg)
maybe_regenerate_traefik(cfg, results)
report_results(results)
@@ -134,11 +150,150 @@ def update(
all_services: AllOption = False,
config: ConfigOption = None,
) -> None:
"""Update services (pull + down + up)."""
"""Update services (pull + build + down + up)."""
svc_list, cfg = get_services(services or [], all_services, config)
raw = len(svc_list) == 1
results = run_async(
run_sequential_on_services(cfg, svc_list, ["pull", "down", "up -d"], raw=raw)
run_sequential_on_services(
cfg, svc_list, ["pull --ignore-buildable", "build", "down", "up -d"], raw=raw
)
)
maybe_regenerate_traefik(cfg)
maybe_regenerate_traefik(cfg, results)
report_results(results)
def _format_host(host: str | list[str]) -> str:
"""Format a host value for display."""
if isinstance(host, list):
return ", ".join(host)
return host
def _report_pending_migrations(cfg: Config, migrations: list[str]) -> None:
"""Report services that need migration."""
console.print(f"[cyan]Services to migrate ({len(migrations)}):[/]")
for svc in migrations:
current = get_service_host(cfg, svc)
target = cfg.get_hosts(svc)[0]
console.print(f" [cyan]{svc}[/]: [magenta]{current}[/] → [magenta]{target}[/]")
def _report_pending_orphans(orphaned: dict[str, str | list[str]]) -> None:
"""Report orphaned services that will be stopped."""
console.print(f"[yellow]Orphaned services to stop ({len(orphaned)}):[/]")
for svc, hosts in orphaned.items():
console.print(f" [cyan]{svc}[/] on [magenta]{_format_host(hosts)}[/]")
def _report_pending_starts(cfg: Config, missing: list[str]) -> None:
"""Report services that will be started."""
console.print(f"[green]Services to start ({len(missing)}):[/]")
for svc in missing:
target = _format_host(cfg.get_hosts(svc))
console.print(f" [cyan]{svc}[/] on [magenta]{target}[/]")
def _report_pending_refresh(cfg: Config, to_refresh: list[str]) -> None:
"""Report services that will be refreshed."""
console.print(f"[blue]Services to refresh ({len(to_refresh)}):[/]")
for svc in to_refresh:
target = _format_host(cfg.get_hosts(svc))
console.print(f" [cyan]{svc}[/] on [magenta]{target}[/]")
@app.command(rich_help_panel="Lifecycle")
def apply(
dry_run: Annotated[
bool,
typer.Option("--dry-run", "-n", help="Show what would change without executing"),
] = False,
no_orphans: Annotated[
bool,
typer.Option("--no-orphans", help="Only migrate, don't stop orphaned services"),
] = False,
full: Annotated[
bool,
typer.Option("--full", "-f", help="Also run up on all services to apply config changes"),
] = False,
config: ConfigOption = None,
) -> None:
"""Make reality match config (start, migrate, stop as needed).
This is the "reconcile" command that ensures running services match your
config file. It will:
1. Stop orphaned services (in state but removed from config)
2. Migrate services on wrong host (host in state ≠ host in config)
3. Start missing services (in config but not in state)
Use --dry-run to preview changes before applying.
Use --no-orphans to only migrate/start without stopping orphaned services.
Use --full to also run 'up' on all services (picks up compose/env changes).
"""
cfg = load_config_or_exit(config)
orphaned = get_orphaned_services(cfg)
migrations = get_services_needing_migration(cfg)
missing = get_services_not_in_state(cfg)
# For --full: refresh all services not already being started/migrated
handled = set(migrations) | set(missing)
to_refresh = [svc for svc in cfg.services if svc not in handled] if full else []
has_orphans = bool(orphaned) and not no_orphans
has_migrations = bool(migrations)
has_missing = bool(missing)
has_refresh = bool(to_refresh)
if not has_orphans and not has_migrations and not has_missing and not has_refresh:
console.print("[green]✓[/] Nothing to apply - reality matches config")
return
# Report what will be done
if has_orphans:
_report_pending_orphans(orphaned)
if has_migrations:
_report_pending_migrations(cfg, migrations)
if has_missing:
_report_pending_starts(cfg, missing)
if has_refresh:
_report_pending_refresh(cfg, to_refresh)
if dry_run:
console.print("\n[dim](dry-run: no changes made)[/]")
return
# Execute changes
console.print()
all_results = []
# 1. Stop orphaned services first
if has_orphans:
console.print("[yellow]Stopping orphaned services...[/]")
all_results.extend(run_async(stop_orphaned_services(cfg)))
# 2. Migrate services on wrong host
if has_migrations:
console.print("[cyan]Migrating services...[/]")
migrate_results = run_async(up_services(cfg, migrations, raw=True))
all_results.extend(migrate_results)
maybe_regenerate_traefik(cfg, migrate_results)
# 3. Start missing services (reuse up_services which handles state updates)
if has_missing:
console.print("[green]Starting missing services...[/]")
start_results = run_async(up_services(cfg, missing, raw=True))
all_results.extend(start_results)
maybe_regenerate_traefik(cfg, start_results)
# 4. Refresh remaining services (--full: run up to apply config changes)
if has_refresh:
console.print("[blue]Refreshing services...[/]")
refresh_results = run_async(up_services(cfg, to_refresh, raw=True))
all_results.extend(refresh_results)
maybe_regenerate_traefik(cfg, refresh_results)
report_results(all_results)
# Alias: cf a = cf apply
app.command("a", hidden=True)(apply)

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
import asyncio
from datetime import UTC, datetime
from pathlib import Path # noqa: TC003
from typing import Annotated
from typing import TYPE_CHECKING, Annotated
import typer
from rich.progress import Progress, TaskID # noqa: TC002
@@ -22,13 +22,13 @@ from compose_farm.cli.common import (
progress_bar,
run_async,
)
from compose_farm.compose import parse_external_networks
from compose_farm.config import Config # noqa: TC001
if TYPE_CHECKING:
from compose_farm.config import Config
from compose_farm.console import console, err_console
from compose_farm.executor import (
CommandResult,
check_networks_exist,
check_paths_exist,
check_service_running,
is_local,
run_command,
@@ -42,8 +42,8 @@ from compose_farm.logs import (
merge_entries,
write_toml,
)
from compose_farm.operations import check_host_compatibility, get_service_paths
from compose_farm.state import load_state, save_state
from compose_farm.operations import check_host_compatibility, check_service_requirements
from compose_farm.state import get_orphaned_services, load_state, save_state
from compose_farm.traefik import generate_traefik_config, render_traefik_config
# --- Sync helpers ---
@@ -216,59 +216,58 @@ def _check_ssh_connectivity(cfg: Config) -> list[str]:
return asyncio.run(gather_with_progress(progress, task_id))
def _check_mounts_and_networks(
def _check_service_requirements(
cfg: Config,
services: list[str],
) -> tuple[list[tuple[str, str, str]], list[tuple[str, str, str]]]:
"""Check mounts and networks for all services with a progress bar.
) -> tuple[list[tuple[str, str, str]], list[tuple[str, str, str]], list[tuple[str, str, str]]]:
"""Check mounts, networks, and devices for all services with a progress bar.
Returns (mount_errors, network_errors) where each is a list of
Returns (mount_errors, network_errors, device_errors) where each is a list of
(service, host, missing_item) tuples.
"""
async def check_service(
service: str,
) -> tuple[str, list[tuple[str, str, str]], list[tuple[str, str, str]]]:
"""Check mounts and networks for a single service."""
) -> tuple[
str,
list[tuple[str, str, str]],
list[tuple[str, str, str]],
list[tuple[str, str, str]],
]:
"""Check requirements for a single service on all its hosts."""
host_names = cfg.get_hosts(service)
mount_errors: list[tuple[str, str, str]] = []
network_errors: list[tuple[str, str, str]] = []
device_errors: list[tuple[str, str, str]] = []
# Check mounts on all hosts
paths = get_service_paths(cfg, service)
for host_name in host_names:
path_exists = await check_paths_exist(cfg, host_name, paths)
for path, found in path_exists.items():
if not found:
mount_errors.append((service, host_name, path))
missing_paths, missing_nets, missing_devs = await check_service_requirements(
cfg, service, host_name
)
mount_errors.extend((service, host_name, p) for p in missing_paths)
network_errors.extend((service, host_name, n) for n in missing_nets)
device_errors.extend((service, host_name, d) for d in missing_devs)
# Check networks on all hosts
networks = parse_external_networks(cfg, service)
if networks:
for host_name in host_names:
net_exists = await check_networks_exist(cfg, host_name, networks)
for net, found in net_exists.items():
if not found:
network_errors.append((service, host_name, net))
return service, mount_errors, network_errors
return service, mount_errors, network_errors, device_errors
async def gather_with_progress(
progress: Progress, task_id: TaskID
) -> tuple[list[tuple[str, str, str]], list[tuple[str, str, str]]]:
) -> tuple[list[tuple[str, str, str]], list[tuple[str, str, str]], list[tuple[str, str, str]]]:
tasks = [asyncio.create_task(check_service(s)) for s in services]
all_mount_errors: list[tuple[str, str, str]] = []
all_network_errors: list[tuple[str, str, str]] = []
all_device_errors: list[tuple[str, str, str]] = []
for coro in asyncio.as_completed(tasks):
service, mount_errs, net_errs = await coro
service, mount_errs, net_errs, dev_errs = await coro
all_mount_errors.extend(mount_errs)
all_network_errors.extend(net_errs)
all_device_errors.extend(dev_errs)
progress.update(task_id, advance=1, description=f"[cyan]{service}[/]")
return all_mount_errors, all_network_errors
return all_mount_errors, all_network_errors, all_device_errors
with progress_bar("Checking mounts/networks", len(services)) as (progress, task_id):
with progress_bar("Checking requirements", len(services)) as (progress, task_id):
return asyncio.run(gather_with_progress(progress, task_id))
@@ -276,12 +275,12 @@ def _report_config_status(cfg: Config) -> bool:
"""Check and report config vs disk status. Returns True if errors found."""
configured = set(cfg.services.keys())
on_disk = cfg.discover_compose_dirs()
missing_from_config = sorted(on_disk - configured)
unmanaged = sorted(on_disk - configured)
missing_from_disk = sorted(configured - on_disk)
if missing_from_config:
console.print(f"\n[yellow]On disk but not in config[/] ({len(missing_from_config)}):")
for name in missing_from_config:
if unmanaged:
console.print(f"\n[yellow]Unmanaged[/] (on disk but not in config, {len(unmanaged)}):")
for name in unmanaged:
console.print(f" [yellow]+[/] [cyan]{name}[/]")
if missing_from_disk:
@@ -289,7 +288,7 @@ def _report_config_status(cfg: Config) -> bool:
for name in missing_from_disk:
console.print(f" [red]-[/] [cyan]{name}[/]")
if not missing_from_config and not missing_from_disk:
if not unmanaged and not missing_from_disk:
console.print("[green]✓[/] Config matches disk")
return bool(missing_from_disk)
@@ -297,17 +296,15 @@ def _report_config_status(cfg: Config) -> bool:
def _report_orphaned_services(cfg: Config) -> bool:
"""Check for services in state but not in config. Returns True if orphans found."""
state = load_state(cfg)
configured = set(cfg.services.keys())
tracked = set(state.keys())
orphaned = sorted(tracked - configured)
orphaned = get_orphaned_services(cfg)
if orphaned:
console.print("\n[yellow]Orphaned services[/] (in state but not in config):")
console.print("[dim]These may still be running. Use 'docker compose down' to stop them.[/]")
for name in orphaned:
host = state[name]
host_str = ", ".join(host) if isinstance(host, list) else host
console.print(
"[dim]Run 'cf apply' to stop them, or 'cf down --orphaned' for just orphans.[/]"
)
for name, hosts in sorted(orphaned.items()):
host_str = ", ".join(hosts) if isinstance(hosts, list) else hosts
console.print(f" [yellow]![/] [cyan]{name}[/] on [magenta]{host_str}[/]")
return True
@@ -359,6 +356,21 @@ def _report_network_errors(network_errors: list[tuple[str, str, str]]) -> None:
console.print(f" [red]✗[/] {net}")
def _report_device_errors(device_errors: list[tuple[str, str, str]]) -> None:
"""Report device errors grouped by service."""
by_service: dict[str, list[tuple[str, str]]] = {}
for svc, host, dev in device_errors:
by_service.setdefault(svc, []).append((host, dev))
console.print(f"[red]Missing devices[/] ({len(device_errors)}):")
for svc, items in sorted(by_service.items()):
host = items[0][0]
devices = [d for _, d in items]
console.print(f" [cyan]{svc}[/] on [magenta]{host}[/]:")
for dev in devices:
console.print(f" [red]✗[/] {dev}")
def _report_ssh_status(unreachable_hosts: list[str]) -> bool:
"""Report SSH connectivity status. Returns True if there are errors."""
if unreachable_hosts:
@@ -404,8 +416,8 @@ def _run_remote_checks(cfg: Config, svc_list: list[str], *, show_host_compat: bo
console.print() # Spacing before mounts/networks check
# Check mounts and networks
mount_errors, network_errors = _check_mounts_and_networks(cfg, svc_list)
# Check mounts, networks, and devices
mount_errors, network_errors, device_errors = _check_service_requirements(cfg, svc_list)
if mount_errors:
_report_mount_errors(mount_errors)
@@ -413,8 +425,11 @@ def _run_remote_checks(cfg: Config, svc_list: list[str], *, show_host_compat: bo
if network_errors:
_report_network_errors(network_errors)
has_errors = True
if not mount_errors and not network_errors:
console.print("[green]✓[/] All mounts and networks exist")
if device_errors:
_report_device_errors(device_errors)
has_errors = True
if not mount_errors and not network_errors and not device_errors:
console.print("[green]✓[/] All mounts, networks, and devices exist")
if show_host_compat:
for service in svc_list:
@@ -468,19 +483,21 @@ def traefik_file(
@app.command(rich_help_panel="Configuration")
def sync(
def refresh(
config: ConfigOption = None,
log_path: LogPathOption = None,
dry_run: Annotated[
bool,
typer.Option("--dry-run", "-n", help="Show what would be synced without writing"),
typer.Option("--dry-run", "-n", help="Show what would change without writing"),
] = False,
) -> None:
"""Sync local state with running services.
"""Update local state from running services.
Discovers which services are running on which hosts, updates the state
file, and captures image digests. Combines service discovery with
image snapshot into a single command.
file, and captures image digests. This is a read operation - it updates
your local state to match reality, not the other way around.
Use 'cf apply' to make reality match your config (stop orphans, migrate).
"""
cfg = load_config_or_exit(config)
current_state = load_state(cfg)

View File

@@ -23,7 +23,6 @@ from compose_farm.cli.common import (
report_results,
run_async,
)
from compose_farm.config import Config # noqa: TC001
from compose_farm.console import console, err_console
from compose_farm.executor import run_command, run_on_services
from compose_farm.state import get_services_needing_migration, load_state
@@ -31,6 +30,8 @@ from compose_farm.state import get_services_needing_migration, load_state
if TYPE_CHECKING:
from collections.abc import Mapping
from compose_farm.config import Config
def _group_services_by_host(
services: dict[str, str | list[str]],

View File

@@ -203,6 +203,51 @@ def parse_host_volumes(config: Config, service: str) -> list[str]:
return unique
def parse_devices(config: Config, service: str) -> list[str]:
"""Extract host device paths from a service's compose file.
Returns a list of host device paths (e.g., /dev/dri, /dev/dri/renderD128).
"""
compose_path = config.get_compose_path(service)
if not compose_path.exists():
return []
env = _load_env(compose_path)
compose_data = yaml.safe_load(compose_path.read_text()) or {}
raw_services = compose_data.get("services", {})
if not isinstance(raw_services, dict):
return []
devices: list[str] = []
for definition in raw_services.values():
if not isinstance(definition, dict):
continue
device_list = definition.get("devices")
if not device_list or not isinstance(device_list, list):
continue
for item in device_list:
if not isinstance(item, str):
continue
interpolated = _interpolate(item, env)
# Format: host_path:container_path[:options]
parts = interpolated.split(":")
if parts:
host_path = parts[0]
if host_path.startswith("/dev/"):
devices.append(host_path)
# Return unique devices, preserving order
seen: set[str] = set()
unique: list[str] = []
for d in devices:
if d not in seen:
seen.add(d)
unique.append(d)
return unique
def parse_external_networks(config: Config, service: str) -> list[str]:
"""Extract external network names from a service's compose file.

View File

@@ -9,10 +9,7 @@ from pathlib import Path
import yaml
from pydantic import BaseModel, Field, model_validator
def xdg_config_home() -> Path:
"""Get XDG config directory, respecting XDG_CONFIG_HOME env var."""
return Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config"))
from .paths import xdg_config_home
class Host(BaseModel):

View File

@@ -9,7 +9,6 @@ from dataclasses import dataclass
from functools import lru_cache
from typing import TYPE_CHECKING, Any
import asyncssh
from rich.markup import escape
from .console import console, err_console
@@ -53,6 +52,15 @@ class CommandResult:
stdout: str = ""
stderr: str = ""
# SSH returns 255 when connection is closed unexpectedly (e.g., Ctrl+C)
_SSH_CONNECTION_CLOSED = 255
@property
def interrupted(self) -> bool:
"""Check if command was killed by SIGINT (Ctrl+C)."""
# Negative exit codes indicate signal termination; -2 = SIGINT
return self.exit_code < 0 or self.exit_code == self._SSH_CONNECTION_CLOSED
def is_local(host: Host) -> bool:
"""Check if host should run locally (no SSH)."""
@@ -156,6 +164,8 @@ async def _run_ssh_command(
success=result.returncode == 0,
)
import asyncssh # noqa: PLC0415 - lazy import for faster CLI startup
proc: asyncssh.SSHClientProcess[Any]
try:
async with asyncssh.connect( # noqa: SIM117 - conn needed before create_process
@@ -418,12 +428,15 @@ async def check_paths_exist(
"""Check if multiple paths exist on a specific host.
Returns a dict mapping path -> exists.
Handles permission denied as "exists" (path is there, just not accessible).
"""
# Only report missing if stat says "No such file", otherwise assume exists
# (handles permission denied correctly - path exists, just not accessible)
return await _batch_check_existence(
config,
host_name,
paths,
lambda esc: f"test -e '{esc}' && echo 'Y:{esc}' || echo 'N:{esc}'",
lambda esc: f"stat '{esc}' 2>&1 | grep -q 'No such file' && echo 'N:{esc}' || echo 'Y:{esc}'",
"mount-check",
)

View File

@@ -8,8 +8,8 @@ from dataclasses import dataclass
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any
from .config import xdg_config_home
from .executor import run_compose
from .paths import xdg_config_home
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Iterable

View File

@@ -6,24 +6,69 @@ CLI commands are thin wrappers around these functions.
from __future__ import annotations
from typing import TYPE_CHECKING
import asyncio
from typing import TYPE_CHECKING, NamedTuple
from .compose import parse_external_networks, parse_host_volumes
from .compose import parse_devices, parse_external_networks, parse_host_volumes
from .console import console, err_console
from .executor import (
CommandResult,
check_networks_exist,
check_paths_exist,
check_service_running,
run_command,
run_compose,
run_compose_on_host,
)
from .state import get_service_host, set_multi_host_service, set_service_host
from .state import (
get_orphaned_services,
get_service_host,
remove_service,
set_multi_host_service,
set_service_host,
)
if TYPE_CHECKING:
from .config import Config
class OperationInterruptedError(Exception):
"""Raised when a command is interrupted by Ctrl+C."""
class PreflightResult(NamedTuple):
"""Result of pre-flight checks for a service on a host."""
missing_paths: list[str]
missing_networks: list[str]
missing_devices: list[str]
@property
def ok(self) -> bool:
"""Return True if all checks passed."""
return not (self.missing_paths or self.missing_networks or self.missing_devices)
async def _run_compose_step(
cfg: Config,
service: str,
command: str,
*,
raw: bool,
host: str | None = None,
) -> CommandResult:
"""Run a compose command, handle raw output newline, and check for interrupts."""
if host:
result = await run_compose_on_host(cfg, service, host, command, raw=raw)
else:
result = await run_compose(cfg, service, command, raw=raw)
if raw:
print() # Ensure newline after raw output
if result.interrupted:
raise OperationInterruptedError
return result
def get_service_paths(cfg: Config, service: str) -> list[str]:
"""Get all required paths for a service (compose_dir + volumes)."""
paths = [str(cfg.compose_dir)]
@@ -31,58 +76,84 @@ def get_service_paths(cfg: Config, service: str) -> list[str]:
return paths
async def _check_mounts_for_migration(
async def check_service_requirements(
cfg: Config,
service: str,
target_host: str,
) -> list[str]:
"""Check if mount paths exist on target host. Returns list of missing paths."""
paths = get_service_paths(cfg, service)
exists = await check_paths_exist(cfg, target_host, paths)
return [p for p, found in exists.items() if not found]
host_name: str,
) -> PreflightResult:
"""Check if a service can run on a specific host.
async def _check_networks_for_migration(
cfg: Config,
service: str,
target_host: str,
) -> list[str]:
"""Check if Docker networks exist on target host. Returns list of missing networks."""
networks = parse_external_networks(cfg, service)
if not networks:
return []
exists = await check_networks_exist(cfg, target_host, networks)
return [n for n, found in exists.items() if not found]
async def _preflight_check(
cfg: Config,
service: str,
target_host: str,
) -> tuple[list[str], list[str]]:
"""Run pre-flight checks for a service on target host.
Returns (missing_paths, missing_networks).
Verifies that all required paths (volumes), networks, and devices exist.
"""
missing_paths = await _check_mounts_for_migration(cfg, service, target_host)
missing_networks = await _check_networks_for_migration(cfg, service, target_host)
return missing_paths, missing_networks
# Check mount paths
paths = get_service_paths(cfg, service)
path_exists = await check_paths_exist(cfg, host_name, paths)
missing_paths = [p for p, found in path_exists.items() if not found]
# Check external networks
networks = parse_external_networks(cfg, service)
missing_networks: list[str] = []
if networks:
net_exists = await check_networks_exist(cfg, host_name, networks)
missing_networks = [n for n, found in net_exists.items() if not found]
# Check devices
devices = parse_devices(cfg, service)
missing_devices: list[str] = []
if devices:
dev_exists = await check_paths_exist(cfg, host_name, devices)
missing_devices = [d for d, found in dev_exists.items() if not found]
return PreflightResult(missing_paths, missing_networks, missing_devices)
async def _cleanup_and_rollback(
cfg: Config,
service: str,
target_host: str,
current_host: str,
prefix: str,
*,
was_running: bool,
raw: bool = False,
) -> None:
"""Clean up failed start and attempt rollback to old host if it was running."""
err_console.print(
f"{prefix} [yellow]![/] Cleaning up failed start on [magenta]{target_host}[/]"
)
await run_compose(cfg, service, "down", raw=raw)
if not was_running:
err_console.print(
f"{prefix} [dim]Service was not running on [magenta]{current_host}[/], skipping rollback[/]"
)
return
err_console.print(f"{prefix} [yellow]![/] Rolling back to [magenta]{current_host}[/]...")
rollback_result = await run_compose_on_host(cfg, service, current_host, "up -d", raw=raw)
if rollback_result.success:
console.print(f"{prefix} [green]✓[/] Rollback succeeded on [magenta]{current_host}[/]")
else:
err_console.print(f"{prefix} [red]✗[/] Rollback failed - service is down")
def _report_preflight_failures(
service: str,
target_host: str,
missing_paths: list[str],
missing_networks: list[str],
preflight: PreflightResult,
) -> None:
"""Report pre-flight check failures."""
err_console.print(
f"[cyan]\\[{service}][/] [red]✗[/] Cannot start on [magenta]{target_host}[/]:"
)
for path in missing_paths:
for path in preflight.missing_paths:
err_console.print(f" [red]✗[/] missing path: {path}")
for net in missing_networks:
for net in preflight.missing_networks:
err_console.print(f" [red]✗[/] missing network: {net}")
if preflight.missing_networks:
err_console.print(f" [dim]hint: cf init-network {target_host}[/]")
for dev in preflight.missing_devices:
err_console.print(f" [red]✗[/] missing device: {dev}")
async def _up_multi_host_service(
@@ -100,9 +171,9 @@ async def _up_multi_host_service(
# Pre-flight checks on all hosts
for host_name in host_names:
missing_paths, missing_networks = await _preflight_check(cfg, service, host_name)
if missing_paths or missing_networks:
_report_preflight_failures(service, host_name, missing_paths, missing_networks)
preflight = await check_service_requirements(cfg, service, host_name)
if not preflight.ok:
_report_preflight_failures(service, host_name, preflight)
results.append(
CommandResult(service=f"{service}@{host_name}", exit_code=1, success=False)
)
@@ -147,33 +218,78 @@ async def _migrate_service(
console.print(
f"{prefix} Migrating from [magenta]{current_host}[/] → [magenta]{target_host}[/]..."
)
# Prepare images on target host before stopping old service to minimize downtime.
# Pull handles image-based services; build handles Dockerfile-based services.
# Each command is a no-op for the other type (exit 0, no work done).
pull_result = await run_compose(cfg, service, "pull", raw=raw)
if raw:
print() # Ensure newline after raw output
if not pull_result.success:
err_console.print(
f"{prefix} [red]✗[/] Pull failed on [magenta]{target_host}[/], "
"leaving service on current host"
# --ignore-buildable makes pull skip images that have build: defined.
for cmd, label in [("pull --ignore-buildable", "Pull"), ("build", "Build")]:
result = await _run_compose_step(cfg, service, cmd, raw=raw)
if not result.success:
err_console.print(
f"{prefix} [red]✗[/] {label} failed on [magenta]{target_host}[/], "
"leaving service on current host"
)
return result
# Stop on current host
down_result = await _run_compose_step(cfg, service, "down", raw=raw, host=current_host)
return down_result if not down_result.success else None
async def _up_single_service(
cfg: Config,
service: str,
prefix: str,
*,
raw: bool,
) -> CommandResult:
"""Start a single-host service with migration support."""
target_host = cfg.get_hosts(service)[0]
current_host = get_service_host(cfg, service)
# Pre-flight check: verify paths, networks, and devices exist on target
preflight = await check_service_requirements(cfg, service, target_host)
if not preflight.ok:
_report_preflight_failures(service, target_host, preflight)
return CommandResult(service=service, exit_code=1, success=False)
# If service is deployed elsewhere, migrate it
did_migration = False
was_running = False
if current_host and current_host != target_host:
if current_host in cfg.hosts:
was_running = await check_service_running(cfg, service, current_host)
failure = await _migrate_service(
cfg, service, current_host, target_host, prefix, raw=raw
)
if failure:
return failure
did_migration = True
else:
err_console.print(
f"{prefix} [yellow]![/] was on "
f"[magenta]{current_host}[/] (not in config), skipping down"
)
# Start on target host
console.print(f"{prefix} Starting on [magenta]{target_host}[/]...")
up_result = await _run_compose_step(cfg, service, "up -d", raw=raw)
# Update state on success, or rollback on failure
if up_result.success:
set_service_host(cfg, service, target_host)
elif did_migration and current_host:
await _cleanup_and_rollback(
cfg,
service,
target_host,
current_host,
prefix,
was_running=was_running,
raw=raw,
)
return pull_result
build_result = await run_compose(cfg, service, "build", raw=raw)
if raw:
print() # Ensure newline after raw output
if not build_result.success:
err_console.print(
f"{prefix} [red]✗[/] Build failed on [magenta]{target_host}[/], "
"leaving service on current host"
)
return build_result
down_result = await run_compose_on_host(cfg, service, current_host, "down", raw=raw)
if raw:
print() # Ensure newline after raw output
if not down_result.success:
return down_result
return None
return up_result
async def up_services(
@@ -186,50 +302,16 @@ async def up_services(
results: list[CommandResult] = []
total = len(services)
for idx, service in enumerate(services, 1):
prefix = f"[dim][{idx}/{total}][/] [cyan]\\[{service}][/]"
try:
for idx, service in enumerate(services, 1):
prefix = f"[dim][{idx}/{total}][/] [cyan]\\[{service}][/]"
# Handle multi-host services separately (no migration)
if cfg.is_multi_host(service):
multi_results = await _up_multi_host_service(cfg, service, prefix, raw=raw)
results.extend(multi_results)
continue
target_host = cfg.get_hosts(service)[0]
current_host = get_service_host(cfg, service)
# Pre-flight check: verify paths and networks exist on target
missing_paths, missing_networks = await _preflight_check(cfg, service, target_host)
if missing_paths or missing_networks:
_report_preflight_failures(service, target_host, missing_paths, missing_networks)
results.append(CommandResult(service=service, exit_code=1, success=False))
continue
# If service is deployed elsewhere, migrate it
if current_host and current_host != target_host:
if current_host in cfg.hosts:
failure = await _migrate_service(
cfg, service, current_host, target_host, prefix, raw=raw
)
if failure:
results.append(failure)
continue
if cfg.is_multi_host(service):
results.extend(await _up_multi_host_service(cfg, service, prefix, raw=raw))
else:
err_console.print(
f"{prefix} [yellow]![/] was on "
f"[magenta]{current_host}[/] (not in config), skipping down"
)
# Start on target host
console.print(f"{prefix} Starting on [magenta]{target_host}[/]...")
up_result = await run_compose(cfg, service, "up -d", raw=raw)
if raw:
print() # Ensure newline after raw output (progress bars end with \r)
results.append(up_result)
# Update state on success
if up_result.success:
set_service_host(cfg, service, target_host)
results.append(await _up_single_service(cfg, service, prefix, raw=raw))
except OperationInterruptedError:
raise KeyboardInterrupt from None
return results
@@ -238,17 +320,95 @@ async def check_host_compatibility(
cfg: Config,
service: str,
) -> dict[str, tuple[int, int, list[str]]]:
"""Check which hosts can run a service based on mount paths.
"""Check which hosts can run a service based on paths, networks, and devices.
Returns dict of host_name -> (found_count, total_count, missing_paths).
Returns dict of host_name -> (found_count, total_count, missing_items).
"""
# Get total requirements count
paths = get_service_paths(cfg, service)
networks = parse_external_networks(cfg, service)
devices = parse_devices(cfg, service)
total = len(paths) + len(networks) + len(devices)
results: dict[str, tuple[int, int, list[str]]] = {}
for host_name in cfg.hosts:
exists = await check_paths_exist(cfg, host_name, paths)
found = sum(1 for v in exists.values() if v)
missing = [p for p, v in exists.items() if not v]
results[host_name] = (found, len(paths), missing)
preflight = await check_service_requirements(cfg, service, host_name)
all_missing = (
preflight.missing_paths + preflight.missing_networks + preflight.missing_devices
)
found = total - len(all_missing)
results[host_name] = (found, total, all_missing)
return results
async def stop_orphaned_services(cfg: Config) -> list[CommandResult]:
"""Stop orphaned services (in state but not in config).
Runs docker compose down on each service on its tracked host(s).
Only removes from state on successful stop.
Returns list of CommandResults for each service@host.
"""
orphaned = get_orphaned_services(cfg)
if not orphaned:
return []
results: list[CommandResult] = []
tasks: list[tuple[str, str, asyncio.Task[CommandResult]]] = []
# Build list of (service, host, task) for all orphaned services
for service, hosts in orphaned.items():
host_list = hosts if isinstance(hosts, list) else [hosts]
for host in host_list:
# Skip hosts no longer in config
if host not in cfg.hosts:
console.print(
f" [yellow]![/] {service}@{host}: host no longer in config, skipping"
)
results.append(
CommandResult(
service=f"{service}@{host}",
exit_code=1,
success=False,
stderr="host no longer in config",
)
)
continue
coro = run_compose_on_host(cfg, service, host, "down")
tasks.append((service, host, asyncio.create_task(coro)))
# Run all down commands in parallel
if tasks:
for service, host, task in tasks:
try:
result = await task
results.append(result)
if result.success:
console.print(f" [green]✓[/] {service}@{host}: stopped")
else:
console.print(f" [red]✗[/] {service}@{host}: {result.stderr or 'failed'}")
except Exception as e:
console.print(f" [red]✗[/] {service}@{host}: {e}")
results.append(
CommandResult(
service=f"{service}@{host}",
exit_code=1,
success=False,
stderr=str(e),
)
)
# Remove from state only for services where ALL hosts succeeded
for service, hosts in orphaned.items():
host_list = hosts if isinstance(hosts, list) else [hosts]
all_succeeded = all(
r.success
for r in results
if r.service.startswith(f"{service}@") or r.service == service
)
if all_succeeded:
remove_service(cfg, service)
return results

21
src/compose_farm/paths.py Normal file
View File

@@ -0,0 +1,21 @@
"""Path utilities - lightweight module with no heavy dependencies."""
from __future__ import annotations
import os
from pathlib import Path
def xdg_config_home() -> Path:
"""Get XDG config directory, respecting XDG_CONFIG_HOME env var."""
return Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config"))
def default_config_path() -> Path:
"""Get the default user config path."""
return xdg_config_home() / "compose-farm" / "compose-farm.yaml"
def config_search_paths() -> list[Path]:
"""Get search paths for config files."""
return [Path("compose-farm.yaml"), default_config_path()]

View File

@@ -142,3 +142,25 @@ def get_services_needing_migration(config: Config) -> list[str]:
if current_host and current_host != configured_host:
needs_migration.append(service)
return needs_migration
def get_orphaned_services(config: Config) -> dict[str, str | list[str]]:
"""Get services that are in state but not in config.
These are services that were previously deployed but have been
removed from the config file (e.g., commented out).
Returns a dict mapping service name to host(s) where it's deployed.
"""
state = load_state(config)
return {service: hosts for service, hosts in state.items() if service not in config.services}
def get_services_not_in_state(config: Config) -> list[str]:
"""Get services that are in config but not in state.
These are services that should be running but aren't tracked
(e.g., newly added to config, or previously stopped as orphans).
"""
state = load_state(config)
return [service for service in config.services if service not in state]

426
tests/test_cli_lifecycle.py Normal file
View File

@@ -0,0 +1,426 @@
"""Tests for CLI lifecycle commands (apply, down --orphaned)."""
from pathlib import Path
from unittest.mock import patch
import pytest
import typer
from compose_farm.cli.lifecycle import apply, down
from compose_farm.config import Config, Host
from compose_farm.executor import CommandResult
def _make_config(tmp_path: Path, services: dict[str, str] | None = None) -> Config:
"""Create a minimal config for testing."""
compose_dir = tmp_path / "compose"
compose_dir.mkdir()
svc_dict = services or {"svc1": "host1", "svc2": "host2"}
for svc in svc_dict:
svc_dir = compose_dir / svc
svc_dir.mkdir()
(svc_dir / "docker-compose.yml").write_text("services: {}\n")
config_path = tmp_path / "compose-farm.yaml"
config_path.write_text("")
return Config(
compose_dir=compose_dir,
hosts={"host1": Host(address="localhost"), "host2": Host(address="localhost")},
services=svc_dict,
config_path=config_path,
)
def _make_result(service: str, success: bool = True) -> CommandResult:
"""Create a command result."""
return CommandResult(
service=service,
exit_code=0 if success else 1,
success=success,
stdout="",
stderr="",
)
class TestApplyCommand:
"""Tests for the apply command."""
def test_apply_nothing_to_do(self, tmp_path: Path, capsys: pytest.CaptureFixture[str]) -> None:
"""When no migrations, orphans, or missing services, prints success message."""
cfg = _make_config(tmp_path)
with (
patch("compose_farm.cli.lifecycle.load_config_or_exit", return_value=cfg),
patch("compose_farm.cli.lifecycle.get_orphaned_services", return_value={}),
patch("compose_farm.cli.lifecycle.get_services_needing_migration", return_value=[]),
patch("compose_farm.cli.lifecycle.get_services_not_in_state", return_value=[]),
):
apply(dry_run=False, no_orphans=False, full=False, config=None)
captured = capsys.readouterr()
assert "Nothing to apply" in captured.out
def test_apply_dry_run_shows_preview(
self, tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
"""Dry run shows what would be done without executing."""
cfg = _make_config(tmp_path)
with (
patch("compose_farm.cli.lifecycle.load_config_or_exit", return_value=cfg),
patch(
"compose_farm.cli.lifecycle.get_orphaned_services",
return_value={"old-svc": "host1"},
),
patch(
"compose_farm.cli.lifecycle.get_services_needing_migration",
return_value=["svc1"],
),
patch("compose_farm.cli.lifecycle.get_services_not_in_state", return_value=[]),
patch("compose_farm.cli.lifecycle.get_service_host", return_value="host1"),
patch("compose_farm.cli.lifecycle.stop_orphaned_services") as mock_stop,
patch("compose_farm.cli.lifecycle.up_services") as mock_up,
):
apply(dry_run=True, no_orphans=False, full=False, config=None)
captured = capsys.readouterr()
assert "Services to migrate" in captured.out
assert "svc1" in captured.out
assert "Orphaned services to stop" in captured.out
assert "old-svc" in captured.out
assert "dry-run" in captured.out
# Should not have called the actual operations
mock_stop.assert_not_called()
mock_up.assert_not_called()
def test_apply_executes_migrations(self, tmp_path: Path) -> None:
"""Apply runs migrations when services need migration."""
cfg = _make_config(tmp_path)
mock_results = [_make_result("svc1")]
with (
patch("compose_farm.cli.lifecycle.load_config_or_exit", return_value=cfg),
patch("compose_farm.cli.lifecycle.get_orphaned_services", return_value={}),
patch(
"compose_farm.cli.lifecycle.get_services_needing_migration",
return_value=["svc1"],
),
patch("compose_farm.cli.lifecycle.get_services_not_in_state", return_value=[]),
patch("compose_farm.cli.lifecycle.get_service_host", return_value="host1"),
patch(
"compose_farm.cli.lifecycle.run_async",
return_value=mock_results,
),
patch("compose_farm.cli.lifecycle.up_services") as mock_up,
patch("compose_farm.cli.lifecycle.maybe_regenerate_traefik"),
patch("compose_farm.cli.lifecycle.report_results"),
):
apply(dry_run=False, no_orphans=False, full=False, config=None)
mock_up.assert_called_once()
call_args = mock_up.call_args
assert call_args[0][1] == ["svc1"] # services list
def test_apply_executes_orphan_cleanup(self, tmp_path: Path) -> None:
"""Apply stops orphaned services."""
cfg = _make_config(tmp_path)
mock_results = [_make_result("old-svc@host1")]
with (
patch("compose_farm.cli.lifecycle.load_config_or_exit", return_value=cfg),
patch(
"compose_farm.cli.lifecycle.get_orphaned_services",
return_value={"old-svc": "host1"},
),
patch("compose_farm.cli.lifecycle.get_services_needing_migration", return_value=[]),
patch("compose_farm.cli.lifecycle.get_services_not_in_state", return_value=[]),
patch(
"compose_farm.cli.lifecycle.run_async",
return_value=mock_results,
),
patch("compose_farm.cli.lifecycle.stop_orphaned_services") as mock_stop,
patch("compose_farm.cli.lifecycle.report_results"),
):
apply(dry_run=False, no_orphans=False, full=False, config=None)
mock_stop.assert_called_once_with(cfg)
def test_apply_no_orphans_skips_orphan_cleanup(
self, tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
"""--no-orphans flag skips orphan cleanup."""
cfg = _make_config(tmp_path)
mock_results = [_make_result("svc1")]
with (
patch("compose_farm.cli.lifecycle.load_config_or_exit", return_value=cfg),
patch(
"compose_farm.cli.lifecycle.get_orphaned_services",
return_value={"old-svc": "host1"},
),
patch(
"compose_farm.cli.lifecycle.get_services_needing_migration",
return_value=["svc1"],
),
patch("compose_farm.cli.lifecycle.get_services_not_in_state", return_value=[]),
patch("compose_farm.cli.lifecycle.get_service_host", return_value="host1"),
patch(
"compose_farm.cli.lifecycle.run_async",
return_value=mock_results,
),
patch("compose_farm.cli.lifecycle.up_services") as mock_up,
patch("compose_farm.cli.lifecycle.stop_orphaned_services") as mock_stop,
patch("compose_farm.cli.lifecycle.maybe_regenerate_traefik"),
patch("compose_farm.cli.lifecycle.report_results"),
):
apply(dry_run=False, no_orphans=True, full=False, config=None)
# Should run migrations but not orphan cleanup
mock_up.assert_called_once()
mock_stop.assert_not_called()
# Orphans should not appear in output
captured = capsys.readouterr()
assert "old-svc" not in captured.out
def test_apply_no_orphans_nothing_to_do(
self, tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
"""--no-orphans with only orphans means nothing to do."""
cfg = _make_config(tmp_path)
with (
patch("compose_farm.cli.lifecycle.load_config_or_exit", return_value=cfg),
patch(
"compose_farm.cli.lifecycle.get_orphaned_services",
return_value={"old-svc": "host1"},
),
patch("compose_farm.cli.lifecycle.get_services_needing_migration", return_value=[]),
patch("compose_farm.cli.lifecycle.get_services_not_in_state", return_value=[]),
):
apply(dry_run=False, no_orphans=True, full=False, config=None)
captured = capsys.readouterr()
assert "Nothing to apply" in captured.out
def test_apply_starts_missing_services(self, tmp_path: Path) -> None:
"""Apply starts services that are in config but not in state."""
cfg = _make_config(tmp_path)
mock_results = [_make_result("svc1")]
with (
patch("compose_farm.cli.lifecycle.load_config_or_exit", return_value=cfg),
patch("compose_farm.cli.lifecycle.get_orphaned_services", return_value={}),
patch("compose_farm.cli.lifecycle.get_services_needing_migration", return_value=[]),
patch(
"compose_farm.cli.lifecycle.get_services_not_in_state",
return_value=["svc1"],
),
patch(
"compose_farm.cli.lifecycle.run_async",
return_value=mock_results,
),
patch("compose_farm.cli.lifecycle.up_services") as mock_up,
patch("compose_farm.cli.lifecycle.maybe_regenerate_traefik"),
patch("compose_farm.cli.lifecycle.report_results"),
):
apply(dry_run=False, no_orphans=False, full=False, config=None)
mock_up.assert_called_once()
call_args = mock_up.call_args
assert call_args[0][1] == ["svc1"]
def test_apply_dry_run_shows_missing_services(
self, tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
"""Dry run shows services that would be started."""
cfg = _make_config(tmp_path)
with (
patch("compose_farm.cli.lifecycle.load_config_or_exit", return_value=cfg),
patch("compose_farm.cli.lifecycle.get_orphaned_services", return_value={}),
patch("compose_farm.cli.lifecycle.get_services_needing_migration", return_value=[]),
patch(
"compose_farm.cli.lifecycle.get_services_not_in_state",
return_value=["svc1"],
),
):
apply(dry_run=True, no_orphans=False, full=False, config=None)
captured = capsys.readouterr()
assert "Services to start" in captured.out
assert "svc1" in captured.out
assert "dry-run" in captured.out
def test_apply_full_refreshes_all_services(self, tmp_path: Path) -> None:
"""--full runs up on all services to pick up config changes."""
cfg = _make_config(tmp_path)
mock_results = [_make_result("svc1"), _make_result("svc2")]
with (
patch("compose_farm.cli.lifecycle.load_config_or_exit", return_value=cfg),
patch("compose_farm.cli.lifecycle.get_orphaned_services", return_value={}),
patch("compose_farm.cli.lifecycle.get_services_needing_migration", return_value=[]),
patch("compose_farm.cli.lifecycle.get_services_not_in_state", return_value=[]),
patch(
"compose_farm.cli.lifecycle.run_async",
return_value=mock_results,
),
patch("compose_farm.cli.lifecycle.up_services") as mock_up,
patch("compose_farm.cli.lifecycle.maybe_regenerate_traefik"),
patch("compose_farm.cli.lifecycle.report_results"),
):
apply(dry_run=False, no_orphans=False, full=True, config=None)
mock_up.assert_called_once()
call_args = mock_up.call_args
# Should refresh all services in config
assert set(call_args[0][1]) == {"svc1", "svc2"}
def test_apply_full_dry_run_shows_refresh(
self, tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
"""--full --dry-run shows services that would be refreshed."""
cfg = _make_config(tmp_path)
with (
patch("compose_farm.cli.lifecycle.load_config_or_exit", return_value=cfg),
patch("compose_farm.cli.lifecycle.get_orphaned_services", return_value={}),
patch("compose_farm.cli.lifecycle.get_services_needing_migration", return_value=[]),
patch("compose_farm.cli.lifecycle.get_services_not_in_state", return_value=[]),
):
apply(dry_run=True, no_orphans=False, full=True, config=None)
captured = capsys.readouterr()
assert "Services to refresh" in captured.out
assert "svc1" in captured.out
assert "svc2" in captured.out
assert "dry-run" in captured.out
def test_apply_full_excludes_already_handled_services(self, tmp_path: Path) -> None:
"""--full doesn't double-process services that are migrating or starting."""
cfg = _make_config(tmp_path, {"svc1": "host1", "svc2": "host2", "svc3": "host1"})
mock_results = [_make_result("svc1"), _make_result("svc3")]
with (
patch("compose_farm.cli.lifecycle.load_config_or_exit", return_value=cfg),
patch("compose_farm.cli.lifecycle.get_orphaned_services", return_value={}),
patch(
"compose_farm.cli.lifecycle.get_services_needing_migration",
return_value=["svc1"],
),
patch(
"compose_farm.cli.lifecycle.get_services_not_in_state",
return_value=["svc2"],
),
patch("compose_farm.cli.lifecycle.get_service_host", return_value="host2"),
patch(
"compose_farm.cli.lifecycle.run_async",
return_value=mock_results,
),
patch("compose_farm.cli.lifecycle.up_services") as mock_up,
patch("compose_farm.cli.lifecycle.maybe_regenerate_traefik"),
patch("compose_farm.cli.lifecycle.report_results"),
):
apply(dry_run=False, no_orphans=False, full=True, config=None)
# up_services should be called 3 times: migrate, start, refresh
assert mock_up.call_count == 3
# Get the third call (refresh) and check it only has svc3
refresh_call = mock_up.call_args_list[2]
assert refresh_call[0][1] == ["svc3"]
class TestDownOrphaned:
"""Tests for down --orphaned flag."""
def test_down_orphaned_no_orphans(
self, tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
"""When no orphans exist, prints success message."""
cfg = _make_config(tmp_path)
with (
patch("compose_farm.cli.lifecycle.load_config_or_exit", return_value=cfg),
patch("compose_farm.cli.lifecycle.get_orphaned_services", return_value={}),
):
down(
services=None,
all_services=False,
orphaned=True,
host=None,
config=None,
)
captured = capsys.readouterr()
assert "No orphaned services to stop" in captured.out
def test_down_orphaned_stops_services(self, tmp_path: Path) -> None:
"""--orphaned stops orphaned services."""
cfg = _make_config(tmp_path)
mock_results = [_make_result("old-svc@host1")]
with (
patch("compose_farm.cli.lifecycle.load_config_or_exit", return_value=cfg),
patch(
"compose_farm.cli.lifecycle.get_orphaned_services",
return_value={"old-svc": "host1"},
),
patch(
"compose_farm.cli.lifecycle.run_async",
return_value=mock_results,
),
patch("compose_farm.cli.lifecycle.stop_orphaned_services") as mock_stop,
patch("compose_farm.cli.lifecycle.report_results"),
):
down(
services=None,
all_services=False,
orphaned=True,
host=None,
config=None,
)
mock_stop.assert_called_once_with(cfg)
def test_down_orphaned_with_services_errors(self) -> None:
"""--orphaned cannot be combined with service arguments."""
with pytest.raises(typer.Exit) as exc_info:
down(
services=["svc1"],
all_services=False,
orphaned=True,
host=None,
config=None,
)
assert exc_info.value.exit_code == 1
def test_down_orphaned_with_all_errors(self) -> None:
"""--orphaned cannot be combined with --all."""
with pytest.raises(typer.Exit) as exc_info:
down(
services=None,
all_services=True,
orphaned=True,
host=None,
config=None,
)
assert exc_info.value.exit_code == 1
def test_down_orphaned_with_host_errors(self) -> None:
"""--orphaned cannot be combined with --host."""
with pytest.raises(typer.Exit) as exc_info:
down(
services=None,
all_services=False,
orphaned=True,
host="host1",
config=None,
)
assert exc_info.value.exit_code == 1

View File

@@ -7,7 +7,6 @@ import pytest
import yaml
from typer.testing import CliRunner
import compose_farm.cli.config as config_cmd_module
from compose_farm.cli import app
from compose_farm.cli.config import (
_generate_template,
@@ -70,16 +69,9 @@ class TestGetConfigFile:
) -> None:
monkeypatch.chdir(tmp_path)
monkeypatch.delenv("CF_CONFIG", raising=False)
# Set XDG_CONFIG_HOME to a nonexistent path - config_search_paths() will
# now return paths that don't exist
monkeypatch.setenv("XDG_CONFIG_HOME", str(tmp_path / "nonexistent"))
# Monkeypatch _CONFIG_PATHS to avoid finding existing files
monkeypatch.setattr(
config_cmd_module,
"_CONFIG_PATHS",
[
tmp_path / "compose-farm.yaml",
tmp_path / "nonexistent" / "compose-farm" / "compose-farm.yaml",
],
)
result = _get_config_file(None)
assert result is None

111
tests/test_operations.py Normal file
View File

@@ -0,0 +1,111 @@
"""Tests for operations module."""
from __future__ import annotations
import inspect
from pathlib import Path # noqa: TC003
from unittest.mock import patch
import pytest
from compose_farm.cli import lifecycle
from compose_farm.config import Config, Host
from compose_farm.executor import CommandResult
from compose_farm.operations import _migrate_service
@pytest.fixture
def basic_config(tmp_path: Path) -> Config:
"""Create a basic test config."""
compose_dir = tmp_path / "compose"
service_dir = compose_dir / "test-service"
service_dir.mkdir(parents=True)
(service_dir / "docker-compose.yml").write_text("services: {}")
return Config(
compose_dir=compose_dir,
hosts={
"host1": Host(address="localhost"),
"host2": Host(address="localhost"),
},
services={"test-service": "host2"},
)
class TestMigrationCommands:
"""Tests for migration command sequence."""
@pytest.fixture
def config(self, tmp_path: Path) -> Config:
"""Create a test config."""
compose_dir = tmp_path / "compose"
service_dir = compose_dir / "test-service"
service_dir.mkdir(parents=True)
(service_dir / "docker-compose.yml").write_text("services: {}")
return Config(
compose_dir=compose_dir,
hosts={
"host1": Host(address="localhost"),
"host2": Host(address="localhost"),
},
services={"test-service": "host2"},
)
async def test_migration_uses_pull_ignore_buildable(self, config: Config) -> None:
"""Migration should use 'pull --ignore-buildable' to skip buildable images."""
commands_called: list[str] = []
async def mock_run_compose_step(
cfg: Config, # noqa: ARG001
service: str,
command: str,
*,
raw: bool, # noqa: ARG001
host: str | None = None, # noqa: ARG001
) -> CommandResult:
commands_called.append(command)
return CommandResult(
service=service,
exit_code=0,
success=True,
)
with patch(
"compose_farm.operations._run_compose_step",
side_effect=mock_run_compose_step,
):
await _migrate_service(
config,
"test-service",
current_host="host1",
target_host="host2",
prefix="[test]",
raw=False,
)
# Migration should call pull with --ignore-buildable, then build, then down
assert "pull --ignore-buildable" in commands_called
assert "build" in commands_called
assert "down" in commands_called
# pull should come before build
pull_idx = commands_called.index("pull --ignore-buildable")
build_idx = commands_called.index("build")
assert pull_idx < build_idx
class TestUpdateCommandSequence:
"""Tests for update command sequence."""
def test_update_command_sequence_includes_build(self) -> None:
"""Update command should use pull --ignore-buildable and build."""
# This is a static check of the command sequence in lifecycle.py
# The actual command sequence is defined in the update function
source = inspect.getsource(lifecycle.update)
# Verify the command sequence includes pull --ignore-buildable
assert "pull --ignore-buildable" in source
# Verify build is included
assert '"build"' in source or "'build'" in source
# Verify the sequence is pull, build, down, up
assert "down" in source
assert "up -d" in source

View File

@@ -6,7 +6,9 @@ import pytest
from compose_farm.config import Config, Host
from compose_farm.state import (
get_orphaned_services,
get_service_host,
get_services_not_in_state,
load_state,
remove_service,
save_state,
@@ -130,3 +132,110 @@ class TestRemoveService:
result = load_state(config)
assert result["plex"] == "nas01"
class TestGetOrphanedServices:
"""Tests for get_orphaned_services function."""
def test_no_orphans(self, config: Config) -> None:
"""Returns empty dict when all services in state are in config."""
state_file = config.get_state_path()
state_file.write_text("deployed:\n plex: nas01\n")
result = get_orphaned_services(config)
assert result == {}
def test_finds_orphaned_service(self, config: Config) -> None:
"""Returns services in state but not in config."""
state_file = config.get_state_path()
state_file.write_text("deployed:\n plex: nas01\n jellyfin: nas02\n")
result = get_orphaned_services(config)
# plex is in config, jellyfin is not
assert result == {"jellyfin": "nas02"}
def test_finds_orphaned_multi_host_service(self, config: Config) -> None:
"""Returns multi-host orphaned services with host list."""
state_file = config.get_state_path()
state_file.write_text("deployed:\n plex: nas01\n dozzle:\n - nas01\n - nas02\n")
result = get_orphaned_services(config)
assert result == {"dozzle": ["nas01", "nas02"]}
def test_empty_state(self, config: Config) -> None:
"""Returns empty dict when state is empty."""
result = get_orphaned_services(config)
assert result == {}
def test_all_orphaned(self, tmp_path: Path) -> None:
"""Returns all services when none are in config."""
config_path = tmp_path / "compose-farm.yaml"
config_path.write_text("")
cfg = Config(
compose_dir=tmp_path / "compose",
hosts={"nas01": Host(address="192.168.1.10")},
services={}, # No services in config
config_path=config_path,
)
state_file = cfg.get_state_path()
state_file.write_text("deployed:\n plex: nas01\n jellyfin: nas02\n")
result = get_orphaned_services(cfg)
assert result == {"plex": "nas01", "jellyfin": "nas02"}
class TestGetServicesNotInState:
"""Tests for get_services_not_in_state function."""
def test_all_in_state(self, config: Config) -> None:
"""Returns empty list when all services are in state."""
state_file = config.get_state_path()
state_file.write_text("deployed:\n plex: nas01\n")
result = get_services_not_in_state(config)
assert result == []
def test_finds_missing_service(self, tmp_path: Path) -> None:
"""Returns services in config but not in state."""
config_path = tmp_path / "compose-farm.yaml"
config_path.write_text("")
cfg = Config(
compose_dir=tmp_path / "compose",
hosts={"nas01": Host(address="192.168.1.10")},
services={"plex": "nas01", "jellyfin": "nas01"},
config_path=config_path,
)
state_file = cfg.get_state_path()
state_file.write_text("deployed:\n plex: nas01\n")
result = get_services_not_in_state(cfg)
assert result == ["jellyfin"]
def test_empty_state(self, tmp_path: Path) -> None:
"""Returns all services when state is empty."""
config_path = tmp_path / "compose-farm.yaml"
config_path.write_text("")
cfg = Config(
compose_dir=tmp_path / "compose",
hosts={"nas01": Host(address="192.168.1.10")},
services={"plex": "nas01", "jellyfin": "nas01"},
config_path=config_path,
)
result = get_services_not_in_state(cfg)
assert set(result) == {"plex", "jellyfin"}
def test_empty_config(self, config: Config) -> None:
"""Returns empty list when config has no services."""
# config fixture has plex: nas01, but we need empty config
config_path = config.config_path
config_path.write_text("")
cfg = Config(
compose_dir=config.compose_dir,
hosts={"nas01": Host(address="192.168.1.10")},
services={},
config_path=config_path,
)
result = get_services_not_in_state(cfg)
assert result == []