Compare commits

...

16 Commits

Author SHA1 Message Date
Bas Nijholt
b7315d255a refactor: Split CLI into modular subpackage (#11) 2025-12-16 13:08:08 -08:00
renovate[bot]
f003d2931f ⬆️ Update actions/checkout action to v6 (#5)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2025-12-16 12:19:45 -08:00
renovate[bot]
6f7c557065 ⬆️ Update actions/setup-python action to v6 (#6)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2025-12-16 12:18:34 -08:00
renovate[bot]
ecb6ee46b1 ⬆️ Update astral-sh/setup-uv action to v7 (#8)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2025-12-16 12:18:28 -08:00
renovate[bot]
354967010f ⬆️ Update redis Docker tag to v8 (#9)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2025-12-16 12:18:22 -08:00
github-actions[bot]
57122f31a3 Update README.md 2025-12-16 20:01:03 +00:00
Bas Nijholt
cbbcec0d14 Add config subcommand for managing configuration files (#10) 2025-12-16 12:00:44 -08:00
Bas Nijholt
de38c35b8a docs: Add one-liner showing manual equivalent 2025-12-16 11:19:56 -08:00
github-actions[bot]
def996ddf4 Update README.md 2025-12-16 19:14:07 +00:00
Bas Nijholt
790e32e96b Fix test_load_config_not_found for CF_CONFIG env var 2025-12-16 11:13:44 -08:00
Bas Nijholt
fd75c4d87f Add CLI --help output to README 2025-12-16 11:12:43 -08:00
Bas Nijholt
411a99cbc4 Wait for PyPI propagation before Docker build
Also add Python 3.14 to classifiers.
2025-12-16 11:04:35 -08:00
Bas Nijholt
d2c6ab72b2 Add CF_CONFIG env var for simpler Docker workflow
Config search order is now:
1. --config CLI option
2. CF_CONFIG environment variable
3. ./compose-farm.yaml
4. ~/.config/compose-farm/compose-farm.yaml

Docker workflow simplified: mount compose_dir once, set CF_CONFIG
to config file within it. No more symlink issues or multiple mounts.
2025-12-16 10:12:55 -08:00
Bas Nijholt
3656584eda Friendly error when config path is a directory
Docker creates empty directories for missing file mounts,
causing confusing IsADirectoryError tracebacks. Now shows
a clear message explaining the likely cause.
2025-12-16 09:49:40 -08:00
Bas Nijholt
8be370098d Use env vars for docker-compose.yml mounts
- CF_CONFIG_DIR: config directory (default: ~/.config/compose-farm)
- CF_COMPOSE_DIR: compose directory (default: /opt/compose)

Mounts preserve paths so compose_dir in config works correctly.
2025-12-16 09:49:34 -08:00
Bas Nijholt
45057cb6df feat: Add docker-compose.yml for easier Docker usage
Example compose file that mounts SSH agent and config.
Users uncomment the compose_dir mount for their setup.
2025-12-16 09:40:18 -08:00
22 changed files with 1585 additions and 762 deletions

View File

@@ -16,10 +16,10 @@ jobs:
python-version: ["3.11", "3.12", "3.13"]
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v6
- name: Install uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@v7
- name: Set up Python ${{ matrix.python-version }}
run: uv python install ${{ matrix.python-version }}
@@ -39,10 +39,10 @@ jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v6
- name: Install uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@v7
- name: Set up Python
run: uv python install 3.12

View File

@@ -25,7 +25,7 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
@@ -52,6 +52,22 @@ jobs:
fi
echo "version=$VERSION" >> $GITHUB_OUTPUT
- name: Wait for PyPI
if: steps.version.outputs.version != ''
run: |
VERSION="${{ steps.version.outputs.version }}"
echo "Waiting for compose-farm==$VERSION on PyPI..."
for i in {1..30}; do
if curl -sf "https://pypi.org/pypi/compose-farm/$VERSION/json" > /dev/null; then
echo "✓ Version $VERSION available on PyPI"
exit 0
fi
echo "Attempt $i: not yet available, waiting 10s..."
sleep 10
done
echo "✗ Timeout waiting for PyPI"
exit 1
- name: Extract metadata
id: meta
uses: docker/metadata-action@v5

View File

@@ -13,9 +13,9 @@ jobs:
permissions:
id-token: write
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v6
- name: Install uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@v7
- name: Build
run: uv build
- name: Publish package distributions to PyPI

View File

@@ -11,16 +11,16 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Check out repository
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
persist-credentials: false
fetch-depth: 0
- name: Set up Python
uses: actions/setup-python@v5
uses: actions/setup-python@v6
- name: Install uv
uses: astral-sh/setup-uv@v6
uses: astral-sh/setup-uv@v7
- name: Run markdown-code-runner
env:

View File

@@ -10,15 +10,21 @@
```
compose_farm/
├── cli.py # Typer commands (thin layer, delegates to operations)
├── config.py # Pydantic models, YAML loading
├── compose.py # Compose file parsing (.env, ports, volumes, networks)
├── console.py # Shared Rich console instances
├── executor.py # SSH/local command execution, streaming output
├── operations.py # Business logic (up, migrate, discover, preflight checks)
├── state.py # Deployment state tracking (which service on which host)
├── logs.py # Image digest snapshots (dockerfarm-log.toml)
── traefik.py # Traefik file-provider config generation from labels
├── cli/ # CLI subpackage
│ ├── __init__.py # Main Typer app, version callback
├── common.py # Shared helpers, options, progress bar utilities
├── config.py # Config subcommand (init, show, path, validate, edit)
│ ├── lifecycle.py # up, down, pull, restart, update commands
│ ├── management.py # sync, check, init-network, traefik-file commands
│ └── monitoring.py # logs, ps, stats commands
├── config.py # Pydantic models, YAML loading
── compose.py # Compose file parsing (.env, ports, volumes, networks)
├── console.py # Shared Rich console instances
├── executor.py # SSH/local command execution, streaming output
├── operations.py # Business logic (up, migrate, discover, preflight checks)
├── state.py # Deployment state tracking (which service on which host)
├── logs.py # Image digest snapshots (dockerfarm-log.toml)
└── traefik.py # Traefik file-provider config generation from labels
```
## Key Design Decisions
@@ -60,3 +66,4 @@ CLI available as `cf` or `compose-farm`.
| `check` | Validate config, traefik labels, mounts, networks; show host compatibility |
| `init-network` | Create Docker network on hosts with consistent subnet/gateway |
| `traefik-file` | Generate Traefik file-provider config from compose labels |
| `config` | Manage config files (init, show, path, validate, edit) |

View File

@@ -25,6 +25,7 @@ A minimal CLI tool to run Docker Compose commands across multiple hosts via SSH.
- [Installation](#installation)
- [Configuration](#configuration)
- [Multi-Host Services](#multi-host-services)
- [Config Command](#config-command)
- [Usage](#usage)
- [Auto-Migration](#auto-migration)
- [Traefik Multihost Ingress (File Provider)](#traefik-multihost-ingress-file-provider)
@@ -134,6 +135,12 @@ pip install compose-farm
<details><summary>🐳 Docker</summary>
Using the provided `docker-compose.yml`:
```bash
docker compose run --rm cf up --all
```
Or directly:
```bash
docker run --rm \
-v $SSH_AUTH_SOCK:/ssh-agent -e SSH_AUTH_SOCK=/ssh-agent \
@@ -141,11 +148,6 @@ docker run --rm \
ghcr.io/basnijholt/compose-farm up --all
```
Or create an alias:
```bash
alias cf='docker run --rm -v $SSH_AUTH_SOCK:/ssh-agent -e SSH_AUTH_SOCK=/ssh-agent -v ./compose-farm.yaml:/root/.config/compose-farm/compose-farm.yaml:ro ghcr.io/basnijholt/compose-farm'
```
</details>
## Configuration
@@ -206,6 +208,20 @@ When you run `cf up autokuma`, it starts the service on all hosts in parallel. M
- Show output with `[service@host]` prefix for each host
- Track all running hosts in state
### Config Command
Compose Farm includes a `config` subcommand to help manage configuration files:
```bash
cf config init # Create a new config file with documented example
cf config show # Display current config with syntax highlighting
cf config path # Print the config file path (useful for scripting)
cf config validate # Validate config syntax and schema
cf config edit # Open config in $EDITOR
```
Use `cf config init` to get started with a fully documented template.
## Usage
The CLI is available as both `compose-farm` and the shorter `cf` alias.
@@ -223,9 +239,12 @@ The CLI is available as both `compose-farm` and the shorter `cf` alias.
| `cf check` | Validate config, mounts, networks |
| `cf init-network` | Create Docker network on hosts |
| `cf traefik-file` | Generate Traefik file-provider config |
| `cf config <cmd>` | Manage config files (init, show, path, validate, edit) |
All commands support `--all` to operate on all services.
Each command replaces: look up host → SSH → find compose file → run `ssh host "cd /opt/compose/plex && docker compose up -d"`.
```bash
# Start services (auto-migrates if host changed in config)
cf up plex jellyfin
@@ -265,6 +284,60 @@ cf logs -f plex # follow
cf ps
```
<details>
<summary>See the output of <code>cf --help</code></summary>
<!-- CODE:BASH:START -->
<!-- echo '```yaml' -->
<!-- export NO_COLOR=1 -->
<!-- export TERM=dumb -->
<!-- export TERMINAL_WIDTH=90 -->
<!-- cf --help -->
<!-- echo '```' -->
<!-- CODE:END -->
<!-- OUTPUT:START -->
<!-- ⚠️ This content is auto-generated by `markdown-code-runner`. -->
```yaml
Usage: cf [OPTIONS] COMMAND [ARGS]...
Compose Farm - run docker compose commands across multiple hosts
╭─ Options ────────────────────────────────────────────────────────────────────╮
│ --version -v Show version and exit │
│ --install-completion Install completion for the current shell. │
│ --show-completion Show completion for the current shell, to │
│ copy it or customize the installation. │
│ --help -h Show this message and exit. │
╰──────────────────────────────────────────────────────────────────────────────╯
╭─ Lifecycle ──────────────────────────────────────────────────────────────────╮
│ up Start services (docker compose up -d). Auto-migrates if host │
│ changed. │
│ down Stop services (docker compose down). │
│ pull Pull latest images (docker compose pull). │
│ restart Restart services (down + up). │
│ update Update services (pull + down + up). │
╰──────────────────────────────────────────────────────────────────────────────╯
╭─ Configuration ──────────────────────────────────────────────────────────────╮
│ traefik-file Generate a Traefik file-provider fragment from compose │
│ Traefik labels. │
│ sync Sync local state with running services. │
│ check Validate configuration, traefik labels, mounts, and networks. │
│ init-network Create Docker network on hosts with consistent settings. │
│ config Manage compose-farm configuration files. │
╰──────────────────────────────────────────────────────────────────────────────╯
╭─ Monitoring ─────────────────────────────────────────────────────────────────╮
│ logs Show service logs. │
│ ps Show status of all services. │
│ stats Show overview statistics for hosts and services. │
╰──────────────────────────────────────────────────────────────────────────────╯
```
<!-- OUTPUT:END -->
</details>
### Auto-Migration
When you change a service's host assignment in config and run `up`, Compose Farm automatically:

11
docker-compose.yml Normal file
View File

@@ -0,0 +1,11 @@
services:
cf:
image: ghcr.io/basnijholt/compose-farm:latest
volumes:
- ${SSH_AUTH_SOCK}:/ssh-agent:ro
# Compose directory (contains compose files AND compose-farm.yaml config)
- ${CF_COMPOSE_DIR:-/opt/compose}:${CF_COMPOSE_DIR:-/opt/compose}
environment:
- SSH_AUTH_SOCK=/ssh-agent
# Config file path (state stored alongside it)
- CF_CONFIG=${CF_COMPOSE_DIR:-/opt/compose}/compose-farm.yaml

View File

@@ -11,7 +11,7 @@
name: paperless-ngx
services:
redis:
image: redis:7
image: redis:8
container_name: paperless-redis
restart: unless-stopped
networks:

View File

@@ -34,6 +34,7 @@ classifiers = [
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
"Topic :: System :: Systems Administration",
"Topic :: Utilities",
"Typing :: Typed",

View File

@@ -0,0 +1,19 @@
"""CLI interface using Typer."""
from __future__ import annotations
# Import command modules to trigger registration via @app.command() decorators
from compose_farm.cli import (
config, # noqa: F401
lifecycle, # noqa: F401
management, # noqa: F401
monitoring, # noqa: F401
)
# Import the shared app instance
from compose_farm.cli.app import app
__all__ = ["app"]
if __name__ == "__main__":
app()

View File

@@ -0,0 +1,42 @@
"""Shared Typer app instance."""
from __future__ import annotations
from typing import Annotated
import typer
from compose_farm import __version__
__all__ = ["app"]
def _version_callback(value: bool) -> None:
"""Print version and exit."""
if value:
typer.echo(f"compose-farm {__version__}")
raise typer.Exit
app = typer.Typer(
name="compose-farm",
help="Compose Farm - run docker compose commands across multiple hosts",
no_args_is_help=True,
context_settings={"help_option_names": ["-h", "--help"]},
)
@app.callback()
def main(
version: Annotated[
bool,
typer.Option(
"--version",
"-v",
help="Show version and exit",
callback=_version_callback,
is_eager=True,
),
] = False,
) -> None:
"""Compose Farm - run docker compose commands across multiple hosts."""

View File

@@ -0,0 +1,203 @@
"""Shared CLI helpers, options, and utilities."""
from __future__ import annotations
import asyncio
import contextlib
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, TypeVar
import typer
from rich.progress import (
BarColumn,
MofNCompleteColumn,
Progress,
SpinnerColumn,
TaskID,
TextColumn,
TimeElapsedColumn,
)
from compose_farm.config import Config, load_config
from compose_farm.console import console, err_console
from compose_farm.executor import CommandResult # noqa: TC001
from compose_farm.traefik import generate_traefik_config, render_traefik_config
if TYPE_CHECKING:
from collections.abc import Callable, Coroutine, Generator
_T = TypeVar("_T")
# --- Shared CLI Options ---
ServicesArg = Annotated[
list[str] | None,
typer.Argument(help="Services to operate on"),
]
AllOption = Annotated[
bool,
typer.Option("--all", "-a", help="Run on all services"),
]
ConfigOption = Annotated[
Path | None,
typer.Option("--config", "-c", help="Path to config file"),
]
LogPathOption = Annotated[
Path | None,
typer.Option("--log-path", "-l", help="Path to Dockerfarm TOML log"),
]
HostOption = Annotated[
str | None,
typer.Option("--host", "-H", help="Filter to services on this host"),
]
# --- Constants (internal) ---
_MISSING_PATH_PREVIEW_LIMIT = 2
_STATS_PREVIEW_LIMIT = 3 # Max number of pending migrations to show by name
@contextlib.contextmanager
def progress_bar(label: str, total: int) -> Generator[tuple[Progress, TaskID], None, None]:
"""Create a standardized progress bar with consistent styling.
Yields (progress, task_id). Use progress.update(task_id, advance=1, description=...)
to advance.
"""
with Progress(
SpinnerColumn(),
TextColumn(f"[bold blue]{label}[/]"),
BarColumn(),
MofNCompleteColumn(),
TextColumn(""),
TimeElapsedColumn(),
TextColumn(""),
TextColumn("[progress.description]{task.description}"),
console=console,
transient=True,
) as progress:
task_id = progress.add_task("", total=total)
yield progress, task_id
def load_config_or_exit(config_path: Path | None) -> Config:
"""Load config or exit with a friendly error message."""
try:
return load_config(config_path)
except FileNotFoundError as e:
err_console.print(f"[red]✗[/] {e}")
raise typer.Exit(1) from e
def get_services(
services: list[str],
all_services: bool,
config_path: Path | None,
) -> tuple[list[str], Config]:
"""Resolve service list and load config."""
config = load_config_or_exit(config_path)
if all_services:
return list(config.services.keys()), config
if not services:
err_console.print("[red]✗[/] Specify services or use --all")
raise typer.Exit(1)
return list(services), config
def run_async(coro: Coroutine[None, None, _T]) -> _T:
"""Run async coroutine."""
return asyncio.run(coro)
def report_results(results: list[CommandResult]) -> None:
"""Report command results and exit with appropriate code."""
succeeded = [r for r in results if r.success]
failed = [r for r in results if not r.success]
# Always print summary when there are multiple results
if len(results) > 1:
console.print() # Blank line before summary
if failed:
for r in failed:
err_console.print(
f"[red]✗[/] [cyan]{r.service}[/] failed with exit code {r.exit_code}"
)
console.print()
console.print(
f"[green]✓[/] {len(succeeded)}/{len(results)} services succeeded, "
f"[red]✗[/] {len(failed)} failed"
)
else:
console.print(f"[green]✓[/] All {len(results)} services succeeded")
elif failed:
# Single service failed
r = failed[0]
err_console.print(f"[red]✗[/] [cyan]{r.service}[/] failed with exit code {r.exit_code}")
if failed:
raise typer.Exit(1)
def maybe_regenerate_traefik(cfg: Config) -> None:
"""Regenerate traefik config if traefik_file is configured."""
if cfg.traefik_file is None:
return
try:
dynamic, warnings = generate_traefik_config(cfg, list(cfg.services.keys()))
new_content = render_traefik_config(dynamic)
# Check if content changed
old_content = ""
if cfg.traefik_file.exists():
old_content = cfg.traefik_file.read_text()
if new_content != old_content:
cfg.traefik_file.parent.mkdir(parents=True, exist_ok=True)
cfg.traefik_file.write_text(new_content)
console.print() # Ensure we're on a new line after streaming output
console.print(f"[green]✓[/] Traefik config updated: {cfg.traefik_file}")
for warning in warnings:
err_console.print(f"[yellow]![/] {warning}")
except (FileNotFoundError, ValueError) as exc:
err_console.print(f"[yellow]![/] Failed to update traefik config: {exc}")
def validate_host_for_service(cfg: Config, service: str, host: str) -> None:
"""Validate that a host is valid for a service."""
if host not in cfg.hosts:
err_console.print(f"[red]✗[/] Host '{host}' not found in config")
raise typer.Exit(1)
allowed_hosts = cfg.get_hosts(service)
if host not in allowed_hosts:
err_console.print(
f"[red]✗[/] Service '{service}' is not configured for host '{host}' "
f"(configured: {', '.join(allowed_hosts)})"
)
raise typer.Exit(1)
def run_host_operation(
cfg: Config,
svc_list: list[str],
host: str,
command: str,
action_verb: str,
state_callback: Callable[[Config, str, str], None],
) -> None:
"""Run an operation on a specific host for multiple services."""
from compose_farm.executor import run_compose_on_host # noqa: PLC0415
results: list[CommandResult] = []
for service in svc_list:
validate_host_for_service(cfg, service, host)
console.print(f"[cyan]\\[{service}][/] {action_verb} on [magenta]{host}[/]...")
result = run_async(run_compose_on_host(cfg, service, host, command, raw=True))
print() # Newline after raw output
results.append(result)
if result.success:
state_callback(cfg, service, host)
maybe_regenerate_traefik(cfg)
report_results(results)

View File

@@ -0,0 +1,265 @@
"""Configuration management commands for compose-farm."""
from __future__ import annotations
import os
import platform
import shlex
import shutil
import subprocess
from importlib import resources
from pathlib import Path
from typing import Annotated
import typer
from compose_farm.cli.app import app
from compose_farm.config import load_config, xdg_config_home
from compose_farm.console import console, err_console
config_app = typer.Typer(
name="config",
help="Manage compose-farm configuration files.",
no_args_is_help=True,
)
# Default config location (internal)
_USER_CONFIG_PATH = xdg_config_home() / "compose-farm" / "compose-farm.yaml"
# Search paths for existing config (internal)
_CONFIG_PATHS = [
Path("compose-farm.yaml"),
_USER_CONFIG_PATH,
]
# --- CLI Options (same pattern as cli.py) ---
_PathOption = Annotated[
Path | None,
typer.Option("--path", "-p", help="Path to config file. Uses auto-detection if not specified."),
]
_ForceOption = Annotated[
bool,
typer.Option("--force", "-f", help="Overwrite existing config without confirmation."),
]
_RawOption = Annotated[
bool,
typer.Option("--raw", "-r", help="Output raw file contents (for copy-paste)."),
]
def _get_editor() -> str:
"""Get the user's preferred editor.
Checks $EDITOR, then $VISUAL, then falls back to platform defaults.
"""
for env_var in ("EDITOR", "VISUAL"):
editor = os.environ.get(env_var)
if editor:
return editor
if platform.system() == "Windows":
return "notepad"
# Try common editors on Unix-like systems
for editor in ("nano", "vim", "vi"):
if shutil.which(editor):
return editor
return "vi"
def _generate_template() -> str:
"""Generate a config template with documented schema."""
try:
template_file = resources.files("compose_farm") / "example-config.yaml"
return template_file.read_text(encoding="utf-8")
except FileNotFoundError as e:
err_console.print("[red]Example config template is missing from the package.[/red]")
err_console.print("Reinstall compose-farm or report this issue.")
raise typer.Exit(1) from e
def _get_config_file(path: Path | None) -> Path | None:
"""Resolve config path, or auto-detect from standard locations."""
if path:
return path.expanduser().resolve()
# Check environment variable
if env_path := os.environ.get("CF_CONFIG"):
p = Path(env_path)
if p.exists():
return p.resolve()
# Check standard locations
for p in _CONFIG_PATHS:
if p.exists():
return p.resolve()
return None
@config_app.command("init")
def config_init(
path: _PathOption = None,
force: _ForceOption = False,
) -> None:
"""Create a new config file with documented example.
The generated config file serves as a template showing all available
options with explanatory comments.
"""
target_path = (path.expanduser().resolve() if path else None) or _USER_CONFIG_PATH
if target_path.exists() and not force:
console.print(
f"[bold yellow]Config file already exists at:[/bold yellow] [cyan]{target_path}[/cyan]",
)
if not typer.confirm("Overwrite existing config file?"):
console.print("[dim]Aborted.[/dim]")
raise typer.Exit(0)
# Create parent directories
target_path.parent.mkdir(parents=True, exist_ok=True)
# Generate and write template
template_content = _generate_template()
target_path.write_text(template_content, encoding="utf-8")
console.print(f"[green]✓[/] Config file created at: {target_path}")
console.print("\n[dim]Edit the file to customize your settings:[/dim]")
console.print(" [cyan]cf config edit[/cyan]")
@config_app.command("edit")
def config_edit(
path: _PathOption = None,
) -> None:
"""Open the config file in your default editor.
The editor is determined by: $EDITOR > $VISUAL > platform default.
"""
config_file = _get_config_file(path)
if config_file is None:
console.print("[yellow]No config file found.[/yellow]")
console.print("\nRun [bold cyan]cf config init[/bold cyan] to create one.")
console.print("\nSearched locations:")
for p in _CONFIG_PATHS:
console.print(f" - {p}")
raise typer.Exit(1)
if not config_file.exists():
console.print("[yellow]Config file not found.[/yellow]")
console.print(f"\nProvided path does not exist: [cyan]{config_file}[/cyan]")
console.print("\nRun [bold cyan]cf config init[/bold cyan] to create one.")
raise typer.Exit(1)
editor = _get_editor()
console.print(f"[dim]Opening {config_file} with {editor}...[/dim]")
try:
editor_cmd = shlex.split(editor, posix=os.name != "nt")
except ValueError as e:
err_console.print("[red]Invalid editor command. Check $EDITOR/$VISUAL.[/red]")
raise typer.Exit(1) from e
if not editor_cmd:
err_console.print("[red]Editor command is empty.[/red]")
raise typer.Exit(1)
try:
subprocess.run([*editor_cmd, str(config_file)], check=True)
except FileNotFoundError:
err_console.print(f"[red]Editor '{editor_cmd[0]}' not found.[/red]")
err_console.print("Set $EDITOR environment variable to your preferred editor.")
raise typer.Exit(1) from None
except subprocess.CalledProcessError as e:
err_console.print(f"[red]Editor exited with error code {e.returncode}[/red]")
raise typer.Exit(e.returncode) from None
@config_app.command("show")
def config_show(
path: _PathOption = None,
raw: _RawOption = False,
) -> None:
"""Display the config file location and contents."""
config_file = _get_config_file(path)
if config_file is None:
console.print("[yellow]No config file found.[/yellow]")
console.print("\nSearched locations:")
for p in _CONFIG_PATHS:
status = "[green]exists[/green]" if p.exists() else "[dim]not found[/dim]"
console.print(f" - {p} ({status})")
console.print("\nRun [bold cyan]cf config init[/bold cyan] to create one.")
raise typer.Exit(0)
if not config_file.exists():
console.print("[yellow]Config file not found.[/yellow]")
console.print(f"\nProvided path does not exist: [cyan]{config_file}[/cyan]")
console.print("\nRun [bold cyan]cf config init[/bold cyan] to create one.")
raise typer.Exit(1)
content = config_file.read_text(encoding="utf-8")
if raw:
print(content, end="")
return
from rich.syntax import Syntax # noqa: PLC0415
console.print(f"[bold green]Config file:[/bold green] [cyan]{config_file}[/cyan]")
console.print()
syntax = Syntax(content, "yaml", theme="monokai", line_numbers=True, word_wrap=True)
console.print(syntax)
console.print()
console.print("[dim]Tip: Use -r for copy-paste friendly output[/dim]")
@config_app.command("path")
def config_path(
path: _PathOption = None,
) -> None:
"""Print the config file path (useful for scripting)."""
config_file = _get_config_file(path)
if config_file is None:
console.print("[yellow]No config file found.[/yellow]")
console.print("\nSearched locations:")
for p in _CONFIG_PATHS:
status = "[green]exists[/green]" if p.exists() else "[dim]not found[/dim]"
console.print(f" - {p} ({status})")
raise typer.Exit(1)
# Just print the path for easy piping
print(config_file)
@config_app.command("validate")
def config_validate(
path: _PathOption = None,
) -> None:
"""Validate the config file syntax and schema."""
config_file = _get_config_file(path)
if config_file is None:
err_console.print("[red]✗[/] No config file found")
raise typer.Exit(1)
try:
cfg = load_config(config_file)
except FileNotFoundError as e:
err_console.print(f"[red]✗[/] {e}")
raise typer.Exit(1) from e
except Exception as e:
err_console.print(f"[red]✗[/] Invalid config: {e}")
raise typer.Exit(1) from e
console.print(f"[green]✓[/] Valid config: {config_file}")
console.print(f" Hosts: {len(cfg.hosts)}")
console.print(f" Services: {len(cfg.services)}")
# Register config subcommand on the shared app
app.add_typer(config_app, name="config", rich_help_panel="Configuration")

View File

@@ -0,0 +1,144 @@
"""Lifecycle commands: up, down, pull, restart, update."""
from __future__ import annotations
from typing import Annotated
import typer
from compose_farm.cli.app import app
from compose_farm.cli.common import (
AllOption,
ConfigOption,
HostOption,
ServicesArg,
get_services,
load_config_or_exit,
maybe_regenerate_traefik,
report_results,
run_async,
run_host_operation,
)
from compose_farm.console import console
from compose_farm.executor import run_on_services, run_sequential_on_services
from compose_farm.operations import up_services
from compose_farm.state import (
add_service_to_host,
get_services_needing_migration,
remove_service,
remove_service_from_host,
)
@app.command(rich_help_panel="Lifecycle")
def up(
services: ServicesArg = None,
all_services: AllOption = False,
migrate: Annotated[
bool, typer.Option("--migrate", "-m", help="Only services needing migration")
] = False,
host: HostOption = None,
config: ConfigOption = None,
) -> None:
"""Start services (docker compose up -d). Auto-migrates if host changed."""
from compose_farm.console import err_console # noqa: PLC0415
if migrate and host:
err_console.print("[red]✗[/] Cannot use --migrate and --host together")
raise typer.Exit(1)
if migrate:
cfg = load_config_or_exit(config)
svc_list = get_services_needing_migration(cfg)
if not svc_list:
console.print("[green]✓[/] No services need migration")
return
console.print(f"[cyan]Migrating {len(svc_list)} service(s):[/] {', '.join(svc_list)}")
else:
svc_list, cfg = get_services(services or [], all_services, config)
# Per-host operation: run on specific host only
if host:
run_host_operation(cfg, svc_list, host, "up -d", "Starting", add_service_to_host)
return
# Normal operation: use up_services with migration logic
results = run_async(up_services(cfg, svc_list, raw=True))
maybe_regenerate_traefik(cfg)
report_results(results)
@app.command(rich_help_panel="Lifecycle")
def down(
services: ServicesArg = None,
all_services: AllOption = False,
host: HostOption = None,
config: ConfigOption = None,
) -> None:
"""Stop services (docker compose down)."""
svc_list, cfg = get_services(services or [], all_services, config)
# Per-host operation: run on specific host only
if host:
run_host_operation(cfg, svc_list, host, "down", "Stopping", remove_service_from_host)
return
# Normal operation
raw = len(svc_list) == 1
results = run_async(run_on_services(cfg, svc_list, "down", raw=raw))
# Remove from state on success
# For multi-host services, result.service is "svc@host", extract base name
removed_services: set[str] = set()
for result in results:
if result.success:
base_service = result.service.split("@")[0]
if base_service not in removed_services:
remove_service(cfg, base_service)
removed_services.add(base_service)
maybe_regenerate_traefik(cfg)
report_results(results)
@app.command(rich_help_panel="Lifecycle")
def pull(
services: ServicesArg = None,
all_services: AllOption = False,
config: ConfigOption = None,
) -> None:
"""Pull latest images (docker compose pull)."""
svc_list, cfg = get_services(services or [], all_services, config)
raw = len(svc_list) == 1
results = run_async(run_on_services(cfg, svc_list, "pull", raw=raw))
report_results(results)
@app.command(rich_help_panel="Lifecycle")
def restart(
services: ServicesArg = None,
all_services: AllOption = False,
config: ConfigOption = None,
) -> None:
"""Restart services (down + up)."""
svc_list, cfg = get_services(services or [], all_services, config)
raw = len(svc_list) == 1
results = run_async(run_sequential_on_services(cfg, svc_list, ["down", "up -d"], raw=raw))
maybe_regenerate_traefik(cfg)
report_results(results)
@app.command(rich_help_panel="Lifecycle")
def update(
services: ServicesArg = None,
all_services: AllOption = False,
config: ConfigOption = None,
) -> None:
"""Update services (pull + down + up)."""
svc_list, cfg = get_services(services or [], all_services, config)
raw = len(svc_list) == 1
results = run_async(
run_sequential_on_services(cfg, svc_list, ["pull", "down", "up -d"], raw=raw)
)
maybe_regenerate_traefik(cfg)
report_results(results)

View File

@@ -1,41 +1,39 @@
"""CLI interface using Typer."""
"""Management commands: sync, check, init-network, traefik-file."""
from __future__ import annotations
import asyncio
import contextlib
from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, TypeVar
from pathlib import Path # noqa: TC003
from typing import Annotated
import typer
from rich.progress import (
BarColumn,
MofNCompleteColumn,
Progress,
SpinnerColumn,
TaskID,
TextColumn,
TimeElapsedColumn,
)
from rich.table import Table
from rich.progress import Progress, TaskID # noqa: TC002
from . import __version__
from .compose import parse_external_networks
from .config import Config, load_config
from .console import console, err_console
from .executor import (
from compose_farm.cli.app import app
from compose_farm.cli.common import (
_MISSING_PATH_PREVIEW_LIMIT,
AllOption,
ConfigOption,
LogPathOption,
ServicesArg,
get_services,
load_config_or_exit,
progress_bar,
run_async,
)
from compose_farm.compose import parse_external_networks
from compose_farm.config import Config # noqa: TC001
from compose_farm.console import console, err_console
from compose_farm.executor import (
CommandResult,
check_networks_exist,
check_paths_exist,
check_service_running,
is_local,
run_command,
run_compose_on_host,
run_on_services,
run_sequential_on_services,
)
from .logs import (
from compose_farm.logs import (
DEFAULT_LOG_PATH,
SnapshotEntry,
collect_service_entries,
@@ -44,575 +42,11 @@ from .logs import (
merge_entries,
write_toml,
)
from .operations import (
check_host_compatibility,
get_service_paths,
up_services,
)
from .state import (
add_service_to_host,
get_services_needing_migration,
load_state,
remove_service,
remove_service_from_host,
save_state,
)
from .traefik import generate_traefik_config, render_traefik_config
from compose_farm.operations import check_host_compatibility, get_service_paths
from compose_farm.state import load_state, save_state
from compose_farm.traefik import generate_traefik_config, render_traefik_config
if TYPE_CHECKING:
from collections.abc import Callable, Coroutine, Generator, Mapping
_T = TypeVar("_T")
@contextlib.contextmanager
def _progress_bar(label: str, total: int) -> Generator[tuple[Progress, TaskID], None, None]:
"""Create a standardized progress bar with consistent styling.
Yields (progress, task_id). Use progress.update(task_id, advance=1, description=...)
to advance.
"""
with Progress(
SpinnerColumn(),
TextColumn(f"[bold blue]{label}[/]"),
BarColumn(),
MofNCompleteColumn(),
TextColumn(""),
TimeElapsedColumn(),
TextColumn(""),
TextColumn("[progress.description]{task.description}"),
console=console,
transient=True,
) as progress:
task_id = progress.add_task("", total=total)
yield progress, task_id
def _load_config_or_exit(config_path: Path | None) -> Config:
"""Load config or exit with a friendly error message."""
try:
return load_config(config_path)
except FileNotFoundError as e:
err_console.print(f"[red]✗[/] {e}")
raise typer.Exit(1) from e
def _maybe_regenerate_traefik(cfg: Config) -> None:
"""Regenerate traefik config if traefik_file is configured."""
if cfg.traefik_file is None:
return
try:
dynamic, warnings = generate_traefik_config(cfg, list(cfg.services.keys()))
new_content = render_traefik_config(dynamic)
# Check if content changed
old_content = ""
if cfg.traefik_file.exists():
old_content = cfg.traefik_file.read_text()
if new_content != old_content:
cfg.traefik_file.parent.mkdir(parents=True, exist_ok=True)
cfg.traefik_file.write_text(new_content)
console.print() # Ensure we're on a new line after streaming output
console.print(f"[green]✓[/] Traefik config updated: {cfg.traefik_file}")
for warning in warnings:
err_console.print(f"[yellow]![/] {warning}")
except (FileNotFoundError, ValueError) as exc:
err_console.print(f"[yellow]![/] Failed to update traefik config: {exc}")
def _version_callback(value: bool) -> None:
"""Print version and exit."""
if value:
typer.echo(f"compose-farm {__version__}")
raise typer.Exit
app = typer.Typer(
name="compose-farm",
help="Compose Farm - run docker compose commands across multiple hosts",
no_args_is_help=True,
context_settings={"help_option_names": ["-h", "--help"]},
)
@app.callback()
def main(
version: Annotated[
bool,
typer.Option(
"--version",
"-v",
help="Show version and exit",
callback=_version_callback,
is_eager=True,
),
] = False,
) -> None:
"""Compose Farm - run docker compose commands across multiple hosts."""
def _get_services(
services: list[str],
all_services: bool,
config_path: Path | None,
) -> tuple[list[str], Config]:
"""Resolve service list and load config."""
config = _load_config_or_exit(config_path)
if all_services:
return list(config.services.keys()), config
if not services:
err_console.print("[red]✗[/] Specify services or use --all")
raise typer.Exit(1)
return list(services), config
def _run_async(coro: Coroutine[None, None, _T]) -> _T:
"""Run async coroutine."""
return asyncio.run(coro)
def _report_results(results: list[CommandResult]) -> None:
"""Report command results and exit with appropriate code."""
succeeded = [r for r in results if r.success]
failed = [r for r in results if not r.success]
# Always print summary when there are multiple results
if len(results) > 1:
console.print() # Blank line before summary
if failed:
for r in failed:
err_console.print(
f"[red]✗[/] [cyan]{r.service}[/] failed with exit code {r.exit_code}"
)
console.print()
console.print(
f"[green]✓[/] {len(succeeded)}/{len(results)} services succeeded, "
f"[red]✗[/] {len(failed)} failed"
)
else:
console.print(f"[green]✓[/] All {len(results)} services succeeded")
elif failed:
# Single service failed
r = failed[0]
err_console.print(f"[red]✗[/] [cyan]{r.service}[/] failed with exit code {r.exit_code}")
if failed:
raise typer.Exit(1)
def _run_host_operation(
cfg: Config,
svc_list: list[str],
host: str,
command: str,
action_verb: str,
state_callback: Callable[[Config, str, str], None],
) -> None:
"""Run an operation on a specific host for multiple services."""
results: list[CommandResult] = []
for service in svc_list:
_validate_host_for_service(cfg, service, host)
console.print(f"[cyan]\\[{service}][/] {action_verb} on [magenta]{host}[/]...")
result = _run_async(run_compose_on_host(cfg, service, host, command, raw=True))
print() # Newline after raw output
results.append(result)
if result.success:
state_callback(cfg, service, host)
_maybe_regenerate_traefik(cfg)
_report_results(results)
_ServicesArg = Annotated[
list[str] | None,
typer.Argument(help="Services to operate on"),
]
_AllOption = Annotated[
bool,
typer.Option("--all", "-a", help="Run on all services"),
]
_ConfigOption = Annotated[
Path | None,
typer.Option("--config", "-c", help="Path to config file"),
]
_LogPathOption = Annotated[
Path | None,
typer.Option("--log-path", "-l", help="Path to Dockerfarm TOML log"),
]
_HostOption = Annotated[
str | None,
typer.Option("--host", "-H", help="Filter to services on this host"),
]
_MISSING_PATH_PREVIEW_LIMIT = 2
def _validate_host_for_service(cfg: Config, service: str, host: str) -> None:
"""Validate that a host is valid for a service."""
if host not in cfg.hosts:
err_console.print(f"[red]✗[/] Host '{host}' not found in config")
raise typer.Exit(1)
allowed_hosts = cfg.get_hosts(service)
if host not in allowed_hosts:
err_console.print(
f"[red]✗[/] Service '{service}' is not configured for host '{host}' "
f"(configured: {', '.join(allowed_hosts)})"
)
raise typer.Exit(1)
@app.command(rich_help_panel="Lifecycle")
def up(
services: _ServicesArg = None,
all_services: _AllOption = False,
migrate: Annotated[
bool, typer.Option("--migrate", "-m", help="Only services needing migration")
] = False,
host: _HostOption = None,
config: _ConfigOption = None,
) -> None:
"""Start services (docker compose up -d). Auto-migrates if host changed."""
if migrate and host:
err_console.print("[red]✗[/] Cannot use --migrate and --host together")
raise typer.Exit(1)
if migrate:
cfg = _load_config_or_exit(config)
svc_list = get_services_needing_migration(cfg)
if not svc_list:
console.print("[green]✓[/] No services need migration")
return
console.print(f"[cyan]Migrating {len(svc_list)} service(s):[/] {', '.join(svc_list)}")
else:
svc_list, cfg = _get_services(services or [], all_services, config)
# Per-host operation: run on specific host only
if host:
_run_host_operation(cfg, svc_list, host, "up -d", "Starting", add_service_to_host)
return
# Normal operation: use up_services with migration logic
results = _run_async(up_services(cfg, svc_list, raw=True))
_maybe_regenerate_traefik(cfg)
_report_results(results)
@app.command(rich_help_panel="Lifecycle")
def down(
services: _ServicesArg = None,
all_services: _AllOption = False,
host: _HostOption = None,
config: _ConfigOption = None,
) -> None:
"""Stop services (docker compose down)."""
svc_list, cfg = _get_services(services or [], all_services, config)
# Per-host operation: run on specific host only
if host:
_run_host_operation(cfg, svc_list, host, "down", "Stopping", remove_service_from_host)
return
# Normal operation
raw = len(svc_list) == 1
results = _run_async(run_on_services(cfg, svc_list, "down", raw=raw))
# Remove from state on success
# For multi-host services, result.service is "svc@host", extract base name
removed_services: set[str] = set()
for result in results:
if result.success:
base_service = result.service.split("@")[0]
if base_service not in removed_services:
remove_service(cfg, base_service)
removed_services.add(base_service)
_maybe_regenerate_traefik(cfg)
_report_results(results)
@app.command(rich_help_panel="Lifecycle")
def pull(
services: _ServicesArg = None,
all_services: _AllOption = False,
config: _ConfigOption = None,
) -> None:
"""Pull latest images (docker compose pull)."""
svc_list, cfg = _get_services(services or [], all_services, config)
raw = len(svc_list) == 1
results = _run_async(run_on_services(cfg, svc_list, "pull", raw=raw))
_report_results(results)
@app.command(rich_help_panel="Lifecycle")
def restart(
services: _ServicesArg = None,
all_services: _AllOption = False,
config: _ConfigOption = None,
) -> None:
"""Restart services (down + up)."""
svc_list, cfg = _get_services(services or [], all_services, config)
raw = len(svc_list) == 1
results = _run_async(run_sequential_on_services(cfg, svc_list, ["down", "up -d"], raw=raw))
_maybe_regenerate_traefik(cfg)
_report_results(results)
@app.command(rich_help_panel="Lifecycle")
def update(
services: _ServicesArg = None,
all_services: _AllOption = False,
config: _ConfigOption = None,
) -> None:
"""Update services (pull + down + up)."""
svc_list, cfg = _get_services(services or [], all_services, config)
raw = len(svc_list) == 1
results = _run_async(
run_sequential_on_services(cfg, svc_list, ["pull", "down", "up -d"], raw=raw)
)
_maybe_regenerate_traefik(cfg)
_report_results(results)
@app.command(rich_help_panel="Monitoring")
def logs(
services: _ServicesArg = None,
all_services: _AllOption = False,
host: _HostOption = None,
follow: Annotated[bool, typer.Option("--follow", "-f", help="Follow logs")] = False,
tail: Annotated[
int | None,
typer.Option("--tail", "-n", help="Number of lines (default: 20 for --all, 100 otherwise)"),
] = None,
config: _ConfigOption = None,
) -> None:
"""Show service logs."""
if all_services and host is not None:
err_console.print("[red]✗[/] Cannot use --all and --host together")
raise typer.Exit(1)
cfg = _load_config_or_exit(config)
# Determine service list based on options
if host is not None:
if host not in cfg.hosts:
err_console.print(f"[red]✗[/] Host '{host}' not found in config")
raise typer.Exit(1)
# Include services where host is in the list of configured hosts
svc_list = [s for s in cfg.services if host in cfg.get_hosts(s)]
if not svc_list:
err_console.print(f"[yellow]![/] No services configured for host '{host}'")
return
else:
svc_list, cfg = _get_services(services or [], all_services, config)
# Default to fewer lines when showing multiple services
many_services = all_services or host is not None or len(svc_list) > 1
effective_tail = tail if tail is not None else (20 if many_services else 100)
cmd = f"logs --tail {effective_tail}"
if follow:
cmd += " -f"
results = _run_async(run_on_services(cfg, svc_list, cmd))
_report_results(results)
@app.command(rich_help_panel="Monitoring")
def ps(
config: _ConfigOption = None,
) -> None:
"""Show status of all services."""
cfg = _load_config_or_exit(config)
results = _run_async(run_on_services(cfg, list(cfg.services.keys()), "ps"))
_report_results(results)
_STATS_PREVIEW_LIMIT = 3 # Max number of pending migrations to show by name
def _group_services_by_host(
services: dict[str, str | list[str]],
hosts: Mapping[str, object],
all_hosts: list[str] | None = None,
) -> dict[str, list[str]]:
"""Group services by their assigned host(s).
For multi-host services (list or "all"), the service appears in multiple host lists.
"""
by_host: dict[str, list[str]] = {h: [] for h in hosts}
for service, host_value in services.items():
if isinstance(host_value, list):
# Explicit list of hosts
for host_name in host_value:
if host_name in by_host:
by_host[host_name].append(service)
elif host_value == "all" and all_hosts:
# "all" keyword - add to all hosts
for host_name in all_hosts:
if host_name in by_host:
by_host[host_name].append(service)
elif host_value in by_host:
# Single host
by_host[host_value].append(service)
return by_host
def _get_container_counts(cfg: Config) -> dict[str, int]:
"""Get container counts from all hosts with a progress bar."""
async def get_count(host_name: str) -> tuple[str, int]:
host = cfg.hosts[host_name]
result = await run_command(host, "docker ps -q | wc -l", host_name, stream=False)
count = 0
if result.success:
with contextlib.suppress(ValueError):
count = int(result.stdout.strip())
return host_name, count
async def gather_with_progress(progress: Progress, task_id: TaskID) -> dict[str, int]:
hosts = list(cfg.hosts.keys())
tasks = [asyncio.create_task(get_count(h)) for h in hosts]
results: dict[str, int] = {}
for coro in asyncio.as_completed(tasks):
host_name, count = await coro
results[host_name] = count
progress.update(task_id, advance=1, description=f"[cyan]{host_name}[/]")
return results
with _progress_bar("Querying hosts", len(cfg.hosts)) as (progress, task_id):
return asyncio.run(gather_with_progress(progress, task_id))
def _build_host_table(
cfg: Config,
services_by_host: dict[str, list[str]],
running_by_host: dict[str, list[str]],
container_counts: dict[str, int],
*,
show_containers: bool,
) -> Table:
"""Build the hosts table."""
table = Table(title="Hosts", show_header=True, header_style="bold cyan")
table.add_column("Host", style="magenta")
table.add_column("Address")
table.add_column("Configured", justify="right")
table.add_column("Running", justify="right")
if show_containers:
table.add_column("Containers", justify="right")
for host_name in sorted(cfg.hosts.keys()):
host = cfg.hosts[host_name]
configured = len(services_by_host[host_name])
running = len(running_by_host[host_name])
row = [
host_name,
host.address,
str(configured),
str(running) if running > 0 else "[dim]0[/]",
]
if show_containers:
count = container_counts.get(host_name, 0)
row.append(str(count) if count > 0 else "[dim]0[/]")
table.add_row(*row)
return table
def _build_summary_table(
cfg: Config, state: dict[str, str | list[str]], pending: list[str]
) -> Table:
"""Build the summary table."""
on_disk = cfg.discover_compose_dirs()
table = Table(title="Summary", show_header=False)
table.add_column("Label", style="dim")
table.add_column("Value", style="bold")
table.add_row("Total hosts", str(len(cfg.hosts)))
table.add_row("Services (configured)", str(len(cfg.services)))
table.add_row("Services (tracked)", str(len(state)))
table.add_row("Compose files on disk", str(len(on_disk)))
if pending:
preview = ", ".join(pending[:_STATS_PREVIEW_LIMIT])
suffix = "..." if len(pending) > _STATS_PREVIEW_LIMIT else ""
table.add_row("Pending migrations", f"[yellow]{len(pending)}[/] ({preview}{suffix})")
else:
table.add_row("Pending migrations", "[green]0[/]")
return table
@app.command(rich_help_panel="Monitoring")
def stats(
live: Annotated[
bool,
typer.Option("--live", "-l", help="Query Docker for live container stats"),
] = False,
config: _ConfigOption = None,
) -> None:
"""Show overview statistics for hosts and services.
Without --live: Shows config/state info (hosts, services, pending migrations).
With --live: Also queries Docker on each host for container counts.
"""
cfg = _load_config_or_exit(config)
state = load_state(cfg)
pending = get_services_needing_migration(cfg)
all_hosts = list(cfg.hosts.keys())
services_by_host = _group_services_by_host(cfg.services, cfg.hosts, all_hosts)
running_by_host = _group_services_by_host(state, cfg.hosts, all_hosts)
container_counts: dict[str, int] = {}
if live:
container_counts = _get_container_counts(cfg)
host_table = _build_host_table(
cfg, services_by_host, running_by_host, container_counts, show_containers=live
)
console.print(host_table)
console.print()
console.print(_build_summary_table(cfg, state, pending))
@app.command("traefik-file", rich_help_panel="Configuration")
def traefik_file(
services: _ServicesArg = None,
all_services: _AllOption = False,
output: Annotated[
Path | None,
typer.Option(
"--output",
"-o",
help="Write Traefik file-provider YAML to this path (stdout if omitted)",
),
] = None,
config: _ConfigOption = None,
) -> None:
"""Generate a Traefik file-provider fragment from compose Traefik labels."""
svc_list, cfg = _get_services(services or [], all_services, config)
try:
dynamic, warnings = generate_traefik_config(cfg, svc_list)
except (FileNotFoundError, ValueError) as exc:
err_console.print(f"[red]✗[/] {exc}")
raise typer.Exit(1) from exc
rendered = render_traefik_config(dynamic)
if output:
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(rendered)
console.print(f"[green]✓[/] Traefik config written to {output}")
else:
console.print(rendered)
for warning in warnings:
err_console.print(f"[yellow]![/] {warning}")
# --- Sync helpers ---
def _discover_services(cfg: Config) -> dict[str, str | list[str]]:
@@ -661,7 +95,7 @@ def _discover_services(cfg: Config) -> dict[str, str | list[str]]:
progress.update(task_id, advance=1, description=f"[cyan]{service}[/]")
return discovered
with _progress_bar("Discovering", len(cfg.services)) as (progress, task_id):
with progress_bar("Discovering", len(cfg.services)) as (progress, task_id):
return asyncio.run(gather_with_progress(progress, task_id))
@@ -699,7 +133,7 @@ def _snapshot_services(
now_dt = datetime.now(UTC)
now_iso = isoformat(now_dt)
with _progress_bar("Capturing", len(services)) as (progress, task_id):
with progress_bar("Capturing", len(services)) as (progress, task_id):
snapshot_entries = asyncio.run(gather_with_progress(progress, task_id, now_dt, services))
if not snapshot_entries:
@@ -713,6 +147,46 @@ def _snapshot_services(
return effective_log_path
def _format_host(host: str | list[str]) -> str:
"""Format a host value for display."""
if isinstance(host, list):
return ", ".join(host)
return host
def _report_sync_changes(
added: list[str],
removed: list[str],
changed: list[tuple[str, str | list[str], str | list[str]]],
discovered: dict[str, str | list[str]],
current_state: dict[str, str | list[str]],
) -> None:
"""Report sync changes to the user."""
if added:
console.print(f"\nNew services found ({len(added)}):")
for service in sorted(added):
host_str = _format_host(discovered[service])
console.print(f" [green]+[/] [cyan]{service}[/] on [magenta]{host_str}[/]")
if changed:
console.print(f"\nServices on different hosts ({len(changed)}):")
for service, old_host, new_host in sorted(changed):
old_str = _format_host(old_host)
new_str = _format_host(new_host)
console.print(
f" [yellow]~[/] [cyan]{service}[/]: [magenta]{old_str}[/] → [magenta]{new_str}[/]"
)
if removed:
console.print(f"\nServices no longer running ({len(removed)}):")
for service in sorted(removed):
host_str = _format_host(current_state[service])
console.print(f" [red]-[/] [cyan]{service}[/] (was on [magenta]{host_str}[/])")
# --- Check helpers ---
def _check_ssh_connectivity(cfg: Config) -> list[str]:
"""Check SSH connectivity to all hosts. Returns list of unreachable hosts."""
# Filter out local hosts - no SSH needed
@@ -738,7 +212,7 @@ def _check_ssh_connectivity(cfg: Config) -> list[str]:
progress.update(task_id, advance=1, description=f"[cyan]{host_name}[/]")
return unreachable
with _progress_bar("Checking SSH connectivity", len(remote_hosts)) as (progress, task_id):
with progress_bar("Checking SSH connectivity", len(remote_hosts)) as (progress, task_id):
return asyncio.run(gather_with_progress(progress, task_id))
@@ -794,101 +268,10 @@ def _check_mounts_and_networks(
return all_mount_errors, all_network_errors
with _progress_bar("Checking mounts/networks", len(services)) as (progress, task_id):
with progress_bar("Checking mounts/networks", len(services)) as (progress, task_id):
return asyncio.run(gather_with_progress(progress, task_id))
def _format_host(host: str | list[str]) -> str:
"""Format a host value for display."""
if isinstance(host, list):
return ", ".join(host)
return host
def _report_sync_changes(
added: list[str],
removed: list[str],
changed: list[tuple[str, str | list[str], str | list[str]]],
discovered: dict[str, str | list[str]],
current_state: dict[str, str | list[str]],
) -> None:
"""Report sync changes to the user."""
if added:
console.print(f"\nNew services found ({len(added)}):")
for service in sorted(added):
host_str = _format_host(discovered[service])
console.print(f" [green]+[/] [cyan]{service}[/] on [magenta]{host_str}[/]")
if changed:
console.print(f"\nServices on different hosts ({len(changed)}):")
for service, old_host, new_host in sorted(changed):
old_str = _format_host(old_host)
new_str = _format_host(new_host)
console.print(
f" [yellow]~[/] [cyan]{service}[/]: [magenta]{old_str}[/] → [magenta]{new_str}[/]"
)
if removed:
console.print(f"\nServices no longer running ({len(removed)}):")
for service in sorted(removed):
host_str = _format_host(current_state[service])
console.print(f" [red]-[/] [cyan]{service}[/] (was on [magenta]{host_str}[/])")
@app.command(rich_help_panel="Configuration")
def sync(
config: _ConfigOption = None,
log_path: _LogPathOption = None,
dry_run: Annotated[
bool,
typer.Option("--dry-run", "-n", help="Show what would be synced without writing"),
] = False,
) -> None:
"""Sync local state with running services.
Discovers which services are running on which hosts, updates the state
file, and captures image digests. Combines service discovery with
image snapshot into a single command.
"""
cfg = _load_config_or_exit(config)
current_state = load_state(cfg)
discovered = _discover_services(cfg)
# Calculate changes
added = [s for s in discovered if s not in current_state]
removed = [s for s in current_state if s not in discovered]
changed = [
(s, current_state[s], discovered[s])
for s in discovered
if s in current_state and current_state[s] != discovered[s]
]
# Report state changes
state_changed = bool(added or removed or changed)
if state_changed:
_report_sync_changes(added, removed, changed, discovered, current_state)
else:
console.print("[green]✓[/] State is already in sync.")
if dry_run:
console.print("\n[dim](dry-run: no changes made)[/]")
return
# Update state file
if state_changed:
save_state(cfg, discovered)
console.print(f"\n[green]✓[/] State updated: {len(discovered)} services tracked.")
# Capture image digests for running services
if discovered:
try:
path = _snapshot_services(cfg, list(discovered.keys()), log_path)
console.print(f"[green]✓[/] Digests written to {path}")
except RuntimeError as exc:
err_console.print(f"[yellow]![/] {exc}")
def _report_config_status(cfg: Config) -> bool:
"""Check and report config vs disk status. Returns True if errors found."""
configured = set(cfg.services.keys())
@@ -1036,21 +419,116 @@ def _run_remote_checks(cfg: Config, svc_list: list[str], *, show_host_compat: bo
if show_host_compat:
for service in svc_list:
console.print(f"\n[bold]Host compatibility for[/] [cyan]{service}[/]:")
compat = _run_async(check_host_compatibility(cfg, service))
compat = run_async(check_host_compatibility(cfg, service))
assigned_hosts = cfg.get_hosts(service)
_report_host_compatibility(compat, assigned_hosts)
return has_errors
# Default network settings for cross-host Docker networking
_DEFAULT_NETWORK_NAME = "mynetwork"
_DEFAULT_NETWORK_SUBNET = "172.20.0.0/16"
_DEFAULT_NETWORK_GATEWAY = "172.20.0.1"
@app.command("traefik-file", rich_help_panel="Configuration")
def traefik_file(
services: ServicesArg = None,
all_services: AllOption = False,
output: Annotated[
Path | None,
typer.Option(
"--output",
"-o",
help="Write Traefik file-provider YAML to this path (stdout if omitted)",
),
] = None,
config: ConfigOption = None,
) -> None:
"""Generate a Traefik file-provider fragment from compose Traefik labels."""
svc_list, cfg = get_services(services or [], all_services, config)
try:
dynamic, warnings = generate_traefik_config(cfg, svc_list)
except (FileNotFoundError, ValueError) as exc:
err_console.print(f"[red]✗[/] {exc}")
raise typer.Exit(1) from exc
rendered = render_traefik_config(dynamic)
if output:
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(rendered)
console.print(f"[green]✓[/] Traefik config written to {output}")
else:
console.print(rendered)
for warning in warnings:
err_console.print(f"[yellow]![/] {warning}")
@app.command(rich_help_panel="Configuration")
def sync(
config: ConfigOption = None,
log_path: LogPathOption = None,
dry_run: Annotated[
bool,
typer.Option("--dry-run", "-n", help="Show what would be synced without writing"),
] = False,
) -> None:
"""Sync local state with running services.
Discovers which services are running on which hosts, updates the state
file, and captures image digests. Combines service discovery with
image snapshot into a single command.
"""
cfg = load_config_or_exit(config)
current_state = load_state(cfg)
discovered = _discover_services(cfg)
# Calculate changes
added = [s for s in discovered if s not in current_state]
removed = [s for s in current_state if s not in discovered]
changed = [
(s, current_state[s], discovered[s])
for s in discovered
if s in current_state and current_state[s] != discovered[s]
]
# Report state changes
state_changed = bool(added or removed or changed)
if state_changed:
_report_sync_changes(added, removed, changed, discovered, current_state)
else:
console.print("[green]✓[/] State is already in sync.")
if dry_run:
console.print("\n[dim](dry-run: no changes made)[/]")
return
# Update state file
if state_changed:
save_state(cfg, discovered)
console.print(f"\n[green]✓[/] State updated: {len(discovered)} services tracked.")
# Capture image digests for running services
if discovered:
try:
path = _snapshot_services(cfg, list(discovered.keys()), log_path)
console.print(f"[green]✓[/] Digests written to {path}")
except RuntimeError as exc:
err_console.print(f"[yellow]![/] {exc}")
@app.command(rich_help_panel="Configuration")
def check(
services: _ServicesArg = None,
services: ServicesArg = None,
local: Annotated[
bool,
typer.Option("--local", help="Skip SSH-based checks (faster)"),
] = False,
config: _ConfigOption = None,
config: ConfigOption = None,
) -> None:
"""Validate configuration, traefik labels, mounts, and networks.
@@ -1059,7 +537,7 @@ def check(
Use --local to skip SSH-based checks for faster validation.
"""
cfg = _load_config_or_exit(config)
cfg = load_config_or_exit(config)
# Determine which services to check and whether to show host compatibility
if services:
@@ -1089,12 +567,6 @@ def check(
raise typer.Exit(1)
# Default network settings for cross-host Docker networking
_DEFAULT_NETWORK_NAME = "mynetwork"
_DEFAULT_NETWORK_SUBNET = "172.20.0.0/16"
_DEFAULT_NETWORK_GATEWAY = "172.20.0.1"
@app.command("init-network", rich_help_panel="Configuration")
def init_network(
hosts: Annotated[
@@ -1113,7 +585,7 @@ def init_network(
str,
typer.Option("--gateway", "-g", help="Network gateway"),
] = _DEFAULT_NETWORK_GATEWAY,
config: _ConfigOption = None,
config: ConfigOption = None,
) -> None:
"""Create Docker network on hosts with consistent settings.
@@ -1121,7 +593,7 @@ def init_network(
communication. Uses the same subnet/gateway on all hosts to ensure
consistent networking.
"""
cfg = _load_config_or_exit(config)
cfg = load_config_or_exit(config)
target_hosts = list(hosts) if hosts else list(cfg.hosts.keys())
invalid = [h for h in target_hosts if h not in cfg.hosts]
@@ -1163,11 +635,7 @@ def init_network(
async def run_all() -> list[CommandResult]:
return await asyncio.gather(*[create_network_on_host(h) for h in target_hosts])
results = _run_async(run_all())
results = run_async(run_all())
failed = [r for r in results if not r.success]
if failed:
raise typer.Exit(1)
if __name__ == "__main__":
app()

View File

@@ -0,0 +1,235 @@
"""Monitoring commands: logs, ps, stats."""
from __future__ import annotations
import asyncio
import contextlib
from typing import TYPE_CHECKING, Annotated
import typer
from rich.progress import Progress, TaskID # noqa: TC002
from rich.table import Table
from compose_farm.cli.app import app
from compose_farm.cli.common import (
_STATS_PREVIEW_LIMIT,
AllOption,
ConfigOption,
HostOption,
ServicesArg,
get_services,
load_config_or_exit,
progress_bar,
report_results,
run_async,
)
from compose_farm.config import Config # noqa: TC001
from compose_farm.console import console, err_console
from compose_farm.executor import run_command, run_on_services
from compose_farm.state import get_services_needing_migration, load_state
if TYPE_CHECKING:
from collections.abc import Mapping
def _group_services_by_host(
services: dict[str, str | list[str]],
hosts: Mapping[str, object],
all_hosts: list[str] | None = None,
) -> dict[str, list[str]]:
"""Group services by their assigned host(s).
For multi-host services (list or "all"), the service appears in multiple host lists.
"""
by_host: dict[str, list[str]] = {h: [] for h in hosts}
for service, host_value in services.items():
if isinstance(host_value, list):
# Explicit list of hosts
for host_name in host_value:
if host_name in by_host:
by_host[host_name].append(service)
elif host_value == "all" and all_hosts:
# "all" keyword - add to all hosts
for host_name in all_hosts:
if host_name in by_host:
by_host[host_name].append(service)
elif host_value in by_host:
# Single host
by_host[host_value].append(service)
return by_host
def _get_container_counts(cfg: Config) -> dict[str, int]:
"""Get container counts from all hosts with a progress bar."""
async def get_count(host_name: str) -> tuple[str, int]:
host = cfg.hosts[host_name]
result = await run_command(host, "docker ps -q | wc -l", host_name, stream=False)
count = 0
if result.success:
with contextlib.suppress(ValueError):
count = int(result.stdout.strip())
return host_name, count
async def gather_with_progress(progress: Progress, task_id: TaskID) -> dict[str, int]:
hosts = list(cfg.hosts.keys())
tasks = [asyncio.create_task(get_count(h)) for h in hosts]
results: dict[str, int] = {}
for coro in asyncio.as_completed(tasks):
host_name, count = await coro
results[host_name] = count
progress.update(task_id, advance=1, description=f"[cyan]{host_name}[/]")
return results
with progress_bar("Querying hosts", len(cfg.hosts)) as (progress, task_id):
return asyncio.run(gather_with_progress(progress, task_id))
def _build_host_table(
cfg: Config,
services_by_host: dict[str, list[str]],
running_by_host: dict[str, list[str]],
container_counts: dict[str, int],
*,
show_containers: bool,
) -> Table:
"""Build the hosts table."""
table = Table(title="Hosts", show_header=True, header_style="bold cyan")
table.add_column("Host", style="magenta")
table.add_column("Address")
table.add_column("Configured", justify="right")
table.add_column("Running", justify="right")
if show_containers:
table.add_column("Containers", justify="right")
for host_name in sorted(cfg.hosts.keys()):
host = cfg.hosts[host_name]
configured = len(services_by_host[host_name])
running = len(running_by_host[host_name])
row = [
host_name,
host.address,
str(configured),
str(running) if running > 0 else "[dim]0[/]",
]
if show_containers:
count = container_counts.get(host_name, 0)
row.append(str(count) if count > 0 else "[dim]0[/]")
table.add_row(*row)
return table
def _build_summary_table(
cfg: Config, state: dict[str, str | list[str]], pending: list[str]
) -> Table:
"""Build the summary table."""
on_disk = cfg.discover_compose_dirs()
table = Table(title="Summary", show_header=False)
table.add_column("Label", style="dim")
table.add_column("Value", style="bold")
table.add_row("Total hosts", str(len(cfg.hosts)))
table.add_row("Services (configured)", str(len(cfg.services)))
table.add_row("Services (tracked)", str(len(state)))
table.add_row("Compose files on disk", str(len(on_disk)))
if pending:
preview = ", ".join(pending[:_STATS_PREVIEW_LIMIT])
suffix = "..." if len(pending) > _STATS_PREVIEW_LIMIT else ""
table.add_row("Pending migrations", f"[yellow]{len(pending)}[/] ({preview}{suffix})")
else:
table.add_row("Pending migrations", "[green]0[/]")
return table
# --- Command functions ---
@app.command(rich_help_panel="Monitoring")
def logs(
services: ServicesArg = None,
all_services: AllOption = False,
host: HostOption = None,
follow: Annotated[bool, typer.Option("--follow", "-f", help="Follow logs")] = False,
tail: Annotated[
int | None,
typer.Option("--tail", "-n", help="Number of lines (default: 20 for --all, 100 otherwise)"),
] = None,
config: ConfigOption = None,
) -> None:
"""Show service logs."""
if all_services and host is not None:
err_console.print("[red]✗[/] Cannot use --all and --host together")
raise typer.Exit(1)
cfg = load_config_or_exit(config)
# Determine service list based on options
if host is not None:
if host not in cfg.hosts:
err_console.print(f"[red]✗[/] Host '{host}' not found in config")
raise typer.Exit(1)
# Include services where host is in the list of configured hosts
svc_list = [s for s in cfg.services if host in cfg.get_hosts(s)]
if not svc_list:
err_console.print(f"[yellow]![/] No services configured for host '{host}'")
return
else:
svc_list, cfg = get_services(services or [], all_services, config)
# Default to fewer lines when showing multiple services
many_services = all_services or host is not None or len(svc_list) > 1
effective_tail = tail if tail is not None else (20 if many_services else 100)
cmd = f"logs --tail {effective_tail}"
if follow:
cmd += " -f"
results = run_async(run_on_services(cfg, svc_list, cmd))
report_results(results)
@app.command(rich_help_panel="Monitoring")
def ps(
config: ConfigOption = None,
) -> None:
"""Show status of all services."""
cfg = load_config_or_exit(config)
results = run_async(run_on_services(cfg, list(cfg.services.keys()), "ps"))
report_results(results)
@app.command(rich_help_panel="Monitoring")
def stats(
live: Annotated[
bool,
typer.Option("--live", "-l", help="Query Docker for live container stats"),
] = False,
config: ConfigOption = None,
) -> None:
"""Show overview statistics for hosts and services.
Without --live: Shows config/state info (hosts, services, pending migrations).
With --live: Also queries Docker on each host for container counts.
"""
cfg = load_config_or_exit(config)
state = load_state(cfg)
pending = get_services_needing_migration(cfg)
all_hosts = list(cfg.hosts.keys())
services_by_host = _group_services_by_host(cfg.services, cfg.hosts, all_hosts)
running_by_host = _group_services_by_host(state, cfg.hosts, all_hosts)
container_counts: dict[str, int] = {}
if live:
container_counts = _get_container_counts(cfg)
host_table = _build_host_table(
cfg, services_by_host, running_by_host, container_counts, show_containers=live
)
console.print(host_table)
console.print()
console.print(_build_summary_table(cfg, state, pending))

View File

@@ -148,9 +148,10 @@ def load_config(path: Path | None = None) -> Config:
"""Load configuration from YAML file.
Search order:
1. Explicit path if provided
2. ./compose-farm.yaml
3. $XDG_CONFIG_HOME/compose-farm/compose-farm.yaml (defaults to ~/.config)
1. Explicit path if provided via --config
2. CF_CONFIG environment variable
3. ./compose-farm.yaml
4. $XDG_CONFIG_HOME/compose-farm/compose-farm.yaml (defaults to ~/.config)
"""
search_paths = [
Path("compose-farm.yaml"),
@@ -159,6 +160,8 @@ def load_config(path: Path | None = None) -> Config:
if path:
config_path = path
elif env_path := os.environ.get("CF_CONFIG"):
config_path = Path(env_path)
else:
config_path = None
for p in search_paths:
@@ -170,6 +173,13 @@ def load_config(path: Path | None = None) -> Config:
msg = f"Config file not found. Searched: {', '.join(str(p) for p in search_paths)}"
raise FileNotFoundError(msg)
if config_path.is_dir():
msg = (
f"Config path is a directory, not a file: {config_path}\n"
"This often happens when Docker creates an empty directory for a missing mount."
)
raise FileNotFoundError(msg)
with config_path.open() as f:
raw = yaml.safe_load(f)

View File

@@ -0,0 +1,89 @@
# Compose Farm configuration
# Documentation: https://github.com/basnijholt/compose-farm
#
# This file configures compose-farm to manage Docker Compose services
# across multiple hosts via SSH.
#
# Place this file at:
# - ./compose-farm.yaml (current directory)
# - ~/.config/compose-farm/compose-farm.yaml
# - Or specify with: cf --config /path/to/config.yaml
# - Or set CF_CONFIG environment variable
# ------------------------------------------------------------------------------
# compose_dir: Directory containing service subdirectories with compose files
# ------------------------------------------------------------------------------
# Each subdirectory should contain a compose.yaml (or docker-compose.yml).
# This path must be the same on all hosts (NFS mount recommended).
#
compose_dir: /opt/compose
# ------------------------------------------------------------------------------
# hosts: SSH connection details for each host
# ------------------------------------------------------------------------------
# Simple form:
# hostname: ip-or-fqdn
#
# Full form:
# hostname:
# address: ip-or-fqdn
# user: ssh-username # default: current user
# port: 22 # default: 22
#
# Note: "all" is a reserved keyword and cannot be used as a host name.
#
hosts:
# Example: simple form (uses current user, port 22)
server1: 192.168.1.10
# Example: full form with explicit user
server2:
address: 192.168.1.20
user: admin
# Example: full form with custom port
server3:
address: 192.168.1.30
user: root
port: 2222
# ------------------------------------------------------------------------------
# services: Map service names to their target host(s)
# ------------------------------------------------------------------------------
# Each service name must match a subdirectory in compose_dir.
#
# Single host:
# service-name: hostname
#
# Multiple hosts (explicit list):
# service-name: [host1, host2]
#
# All hosts:
# service-name: all
#
services:
# Example: service runs on a single host
nginx: server1
postgres: server2
# Example: service runs on multiple specific hosts
# prometheus: [server1, server2]
# Example: service runs on ALL hosts (e.g., monitoring agents)
# node-exporter: all
# ------------------------------------------------------------------------------
# traefik_file: (optional) Auto-generate Traefik file-provider config
# ------------------------------------------------------------------------------
# When set, compose-farm automatically regenerates this file after
# up/down/restart/update commands. Traefik watches this file for changes.
#
# traefik_file: /opt/compose/traefik/dynamic.d/compose-farm.yml
# ------------------------------------------------------------------------------
# traefik_service: (optional) Service name running Traefik
# ------------------------------------------------------------------------------
# When generating traefik_file, services on the same host as Traefik are
# skipped (they're handled by Traefik's Docker provider directly).
#
# traefik_service: traefik

View File

@@ -8,7 +8,7 @@ from unittest.mock import patch
import pytest
import typer
from compose_farm.cli import logs
from compose_farm.cli.monitoring import logs
from compose_farm.config import Config, Host
from compose_farm.executor import CommandResult
@@ -37,7 +37,7 @@ def _make_result(service: str) -> CommandResult:
def _mock_run_async_factory(
services: list[str],
) -> tuple[Any, list[CommandResult]]:
"""Create a mock _run_async that returns results for given services."""
"""Create a mock run_async that returns results for given services."""
results = [_make_result(s) for s in services]
def mock_run_async(_coro: Coroutine[Any, Any, Any]) -> list[CommandResult]:
@@ -55,9 +55,10 @@ class TestLogsContextualDefault:
mock_run_async, _ = _mock_run_async_factory(["svc1", "svc2", "svc3"])
with (
patch("compose_farm.cli._load_config_or_exit", return_value=cfg),
patch("compose_farm.cli._run_async", side_effect=mock_run_async),
patch("compose_farm.cli.run_on_services") as mock_run,
patch("compose_farm.cli.monitoring.load_config_or_exit", return_value=cfg),
patch("compose_farm.cli.common.load_config_or_exit", return_value=cfg),
patch("compose_farm.cli.monitoring.run_async", side_effect=mock_run_async),
patch("compose_farm.cli.monitoring.run_on_services") as mock_run,
):
mock_run.return_value = None
@@ -73,9 +74,10 @@ class TestLogsContextualDefault:
mock_run_async, _ = _mock_run_async_factory(["svc1"])
with (
patch("compose_farm.cli._load_config_or_exit", return_value=cfg),
patch("compose_farm.cli._run_async", side_effect=mock_run_async),
patch("compose_farm.cli.run_on_services") as mock_run,
patch("compose_farm.cli.monitoring.load_config_or_exit", return_value=cfg),
patch("compose_farm.cli.common.load_config_or_exit", return_value=cfg),
patch("compose_farm.cli.monitoring.run_async", side_effect=mock_run_async),
patch("compose_farm.cli.monitoring.run_on_services") as mock_run,
):
logs(
services=["svc1"],
@@ -96,9 +98,10 @@ class TestLogsContextualDefault:
mock_run_async, _ = _mock_run_async_factory(["svc1", "svc2", "svc3"])
with (
patch("compose_farm.cli._load_config_or_exit", return_value=cfg),
patch("compose_farm.cli._run_async", side_effect=mock_run_async),
patch("compose_farm.cli.run_on_services") as mock_run,
patch("compose_farm.cli.monitoring.load_config_or_exit", return_value=cfg),
patch("compose_farm.cli.common.load_config_or_exit", return_value=cfg),
patch("compose_farm.cli.monitoring.run_async", side_effect=mock_run_async),
patch("compose_farm.cli.monitoring.run_on_services") as mock_run,
):
logs(
services=None,
@@ -119,9 +122,10 @@ class TestLogsContextualDefault:
mock_run_async, _ = _mock_run_async_factory(["svc1"])
with (
patch("compose_farm.cli._load_config_or_exit", return_value=cfg),
patch("compose_farm.cli._run_async", side_effect=mock_run_async),
patch("compose_farm.cli.run_on_services") as mock_run,
patch("compose_farm.cli.monitoring.load_config_or_exit", return_value=cfg),
patch("compose_farm.cli.common.load_config_or_exit", return_value=cfg),
patch("compose_farm.cli.monitoring.run_async", side_effect=mock_run_async),
patch("compose_farm.cli.monitoring.run_on_services") as mock_run,
):
logs(
services=["svc1"],
@@ -146,9 +150,9 @@ class TestLogsHostFilter:
mock_run_async, _ = _mock_run_async_factory(["svc1", "svc2"])
with (
patch("compose_farm.cli._load_config_or_exit", return_value=cfg),
patch("compose_farm.cli._run_async", side_effect=mock_run_async),
patch("compose_farm.cli.run_on_services") as mock_run,
patch("compose_farm.cli.monitoring.load_config_or_exit", return_value=cfg),
patch("compose_farm.cli.monitoring.run_async", side_effect=mock_run_async),
patch("compose_farm.cli.monitoring.run_on_services") as mock_run,
):
logs(
services=None,
@@ -170,9 +174,9 @@ class TestLogsHostFilter:
mock_run_async, _ = _mock_run_async_factory(["svc1", "svc2"])
with (
patch("compose_farm.cli._load_config_or_exit", return_value=cfg),
patch("compose_farm.cli._run_async", side_effect=mock_run_async),
patch("compose_farm.cli.run_on_services") as mock_run,
patch("compose_farm.cli.monitoring.load_config_or_exit", return_value=cfg),
patch("compose_farm.cli.monitoring.run_async", side_effect=mock_run_async),
patch("compose_farm.cli.monitoring.run_on_services") as mock_run,
):
logs(
services=None,
@@ -187,14 +191,10 @@ class TestLogsHostFilter:
call_args = mock_run.call_args
assert call_args[0][2] == "logs --tail 20"
def test_logs_all_and_host_mutually_exclusive(self, tmp_path: Path) -> None:
def test_logs_all_and_host_mutually_exclusive(self) -> None:
"""Using --all and --host together should error."""
cfg = _make_config(tmp_path)
with (
patch("compose_farm.cli._load_config_or_exit", return_value=cfg),
pytest.raises(typer.Exit) as exc_info,
):
# No config mock needed - error is raised before config is loaded
with pytest.raises(typer.Exit) as exc_info:
logs(
services=None,
all_services=True,

View File

@@ -128,6 +128,8 @@ class TestLoadConfig:
def test_load_config_not_found(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.chdir(tmp_path)
monkeypatch.delenv("CF_CONFIG", raising=False)
monkeypatch.setenv("XDG_CONFIG_HOME", str(tmp_path / "empty_config"))
with pytest.raises(FileNotFoundError, match="Config file not found"):
load_config()

238
tests/test_config_cmd.py Normal file
View File

@@ -0,0 +1,238 @@
"""Tests for config command module."""
from pathlib import Path
from typing import Any
import pytest
import yaml
from typer.testing import CliRunner
import compose_farm.cli.config as config_cmd_module
from compose_farm.cli import app
from compose_farm.cli.config import (
_generate_template,
_get_config_file,
_get_editor,
)
@pytest.fixture
def runner() -> CliRunner:
return CliRunner()
@pytest.fixture
def valid_config_data() -> dict[str, Any]:
return {
"compose_dir": "/opt/compose",
"hosts": {"server1": "192.168.1.10"},
"services": {"nginx": "server1"},
}
class TestGetEditor:
"""Tests for _get_editor function."""
def test_uses_editor_env(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("EDITOR", "code")
monkeypatch.delenv("VISUAL", raising=False)
assert _get_editor() == "code"
def test_uses_visual_env(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("EDITOR", raising=False)
monkeypatch.setenv("VISUAL", "subl")
assert _get_editor() == "subl"
def test_editor_takes_precedence(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("EDITOR", "vim")
monkeypatch.setenv("VISUAL", "code")
assert _get_editor() == "vim"
class TestGetConfigFile:
"""Tests for _get_config_file function."""
def test_explicit_path(self, tmp_path: Path) -> None:
config_file = tmp_path / "my-config.yaml"
config_file.touch()
result = _get_config_file(config_file)
assert result == config_file.resolve()
def test_cf_config_env(self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
config_file = tmp_path / "env-config.yaml"
config_file.touch()
monkeypatch.setenv("CF_CONFIG", str(config_file))
result = _get_config_file(None)
assert result == config_file.resolve()
def test_returns_none_when_not_found(
self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.chdir(tmp_path)
monkeypatch.delenv("CF_CONFIG", raising=False)
monkeypatch.setenv("XDG_CONFIG_HOME", str(tmp_path / "nonexistent"))
# Monkeypatch _CONFIG_PATHS to avoid finding existing files
monkeypatch.setattr(
config_cmd_module,
"_CONFIG_PATHS",
[
tmp_path / "compose-farm.yaml",
tmp_path / "nonexistent" / "compose-farm" / "compose-farm.yaml",
],
)
result = _get_config_file(None)
assert result is None
class TestGenerateTemplate:
"""Tests for _generate_template function."""
def test_generates_valid_yaml(self) -> None:
template = _generate_template()
# Should be valid YAML
data = yaml.safe_load(template)
assert "compose_dir" in data
assert "hosts" in data
assert "services" in data
def test_has_documentation_comments(self) -> None:
template = _generate_template()
assert "# Compose Farm configuration" in template
assert "hosts:" in template
assert "services:" in template
class TestConfigInit:
"""Tests for cf config init command."""
def test_init_creates_file(
self, runner: CliRunner, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.delenv("CF_CONFIG", raising=False)
config_file = tmp_path / "new-config.yaml"
result = runner.invoke(app, ["config", "init", "-p", str(config_file)])
assert result.exit_code == 0
assert config_file.exists()
assert "Config file created" in result.stdout
def test_init_force_overwrites(
self, runner: CliRunner, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.delenv("CF_CONFIG", raising=False)
config_file = tmp_path / "existing.yaml"
config_file.write_text("old content")
result = runner.invoke(app, ["config", "init", "-p", str(config_file), "-f"])
assert result.exit_code == 0
content = config_file.read_text()
assert "old content" not in content
assert "compose_dir" in content
def test_init_prompts_on_existing(
self, runner: CliRunner, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.delenv("CF_CONFIG", raising=False)
config_file = tmp_path / "existing.yaml"
config_file.write_text("old content")
result = runner.invoke(app, ["config", "init", "-p", str(config_file)], input="n\n")
assert result.exit_code == 0
assert "Aborted" in result.stdout
assert config_file.read_text() == "old content"
class TestConfigPath:
"""Tests for cf config path command."""
def test_path_shows_config(
self,
runner: CliRunner,
tmp_path: Path,
valid_config_data: dict[str, Any],
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.chdir(tmp_path)
monkeypatch.delenv("CF_CONFIG", raising=False)
config_file = tmp_path / "compose-farm.yaml"
config_file.write_text(yaml.dump(valid_config_data))
result = runner.invoke(app, ["config", "path"])
assert result.exit_code == 0
assert str(config_file) in result.stdout
def test_path_with_explicit_path(self, runner: CliRunner, tmp_path: Path) -> None:
# When explicitly provided, path is returned even if file doesn't exist
nonexistent = tmp_path / "nonexistent.yaml"
result = runner.invoke(app, ["config", "path", "-p", str(nonexistent)])
assert result.exit_code == 0
assert str(nonexistent) in result.stdout
class TestConfigShow:
"""Tests for cf config show command."""
def test_show_displays_content(
self,
runner: CliRunner,
tmp_path: Path,
valid_config_data: dict[str, Any],
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.chdir(tmp_path)
monkeypatch.delenv("CF_CONFIG", raising=False)
config_file = tmp_path / "compose-farm.yaml"
config_file.write_text(yaml.dump(valid_config_data))
result = runner.invoke(app, ["config", "show"])
assert result.exit_code == 0
assert "Config file:" in result.stdout
def test_show_raw_output(
self,
runner: CliRunner,
tmp_path: Path,
valid_config_data: dict[str, Any],
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.chdir(tmp_path)
monkeypatch.delenv("CF_CONFIG", raising=False)
config_file = tmp_path / "compose-farm.yaml"
content = yaml.dump(valid_config_data)
config_file.write_text(content)
result = runner.invoke(app, ["config", "show", "-r"])
assert result.exit_code == 0
assert content in result.stdout
class TestConfigValidate:
"""Tests for cf config validate command."""
def test_validate_valid_config(
self,
runner: CliRunner,
tmp_path: Path,
valid_config_data: dict[str, Any],
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.chdir(tmp_path)
monkeypatch.delenv("CF_CONFIG", raising=False)
config_file = tmp_path / "compose-farm.yaml"
config_file.write_text(yaml.dump(valid_config_data))
result = runner.invoke(app, ["config", "validate"])
assert result.exit_code == 0
assert "Valid config" in result.stdout
assert "Hosts: 1" in result.stdout
assert "Services: 1" in result.stdout
def test_validate_invalid_config(self, runner: CliRunner, tmp_path: Path) -> None:
config_file = tmp_path / "invalid.yaml"
config_file.write_text("invalid: [yaml: content")
result = runner.invoke(app, ["config", "validate", "-p", str(config_file)])
assert result.exit_code == 1
# Error goes to stderr (captured in output when using CliRunner)
output = result.stdout + (result.stderr or "")
assert "Invalid config" in output or "" in output
def test_validate_missing_config(self, runner: CliRunner, tmp_path: Path) -> None:
nonexistent = tmp_path / "nonexistent.yaml"
result = runner.invoke(app, ["config", "validate", "-p", str(nonexistent)])
assert result.exit_code == 1
# Error goes to stderr
output = result.stdout + (result.stderr or "")
assert "Config file not found" in output or "not found" in output.lower()

View File

@@ -5,9 +5,9 @@ from unittest.mock import AsyncMock, patch
import pytest
from compose_farm import cli as cli_module
from compose_farm import executor as executor_module
from compose_farm import state as state_module
from compose_farm.cli import management as cli_management_module
from compose_farm.config import Config, Host
from compose_farm.executor import CommandResult, check_service_running
@@ -98,7 +98,7 @@ class TestReportSyncChanges:
def test_reports_added(self, capsys: pytest.CaptureFixture[str]) -> None:
"""Reports newly discovered services."""
cli_module._report_sync_changes(
cli_management_module._report_sync_changes(
added=["plex", "jellyfin"],
removed=[],
changed=[],
@@ -112,7 +112,7 @@ class TestReportSyncChanges:
def test_reports_removed(self, capsys: pytest.CaptureFixture[str]) -> None:
"""Reports services that are no longer running."""
cli_module._report_sync_changes(
cli_management_module._report_sync_changes(
added=[],
removed=["sonarr"],
changed=[],
@@ -125,7 +125,7 @@ class TestReportSyncChanges:
def test_reports_changed(self, capsys: pytest.CaptureFixture[str]) -> None:
"""Reports services that moved to a different host."""
cli_module._report_sync_changes(
cli_management_module._report_sync_changes(
added=[],
removed=[],
changed=[("plex", "nas01", "nas02")],