Compare commits

..

3 Commits

Author SHA1 Message Date
Bas Nijholt
2a923e6e81 fix: Include field name in config validation error messages (#131)
Previously, Pydantic validation errors like "Extra inputs are not
permitted" didn't show which field caused the error. Now the error
message includes the field location (e.g., "unknown_key: Extra inputs
are not permitted").
2025-12-22 22:35:19 -08:00
Bas Nijholt
5f2e081298 perf: Batch snapshot collection to 1 SSH call per host (#130)
## Summary

Optimize `cf refresh` SSH calls from O(stacks) to O(hosts):
- Discovery: 1 SSH call per host (unchanged)
- Snapshots: 1 SSH call per host (was 1 per stack)

For 50 stacks across 4 hosts: 54 → 8 SSH calls.

## Changes

**Performance:**
- Use `docker ps` + `docker image inspect` instead of `docker compose images` per stack
- Batch snapshot collection by host in `collect_stacks_entries_on_host()`

**Architecture:**
- Add `build_discovery_results()` to `operations.py` (business logic)
- Keep progress bar wrapper in `cli/management.py` (presentation)
- Remove dead code: `discover_all_stacks_on_all_hosts()`, `collect_all_stacks_entries()`
2025-12-22 22:19:32 -08:00
Bas Nijholt
6fbc7430cb perf: Optimize stray detection to use 1 SSH call per host (#129)
* perf: Optimize stray detection to use 1 SSH call per host

Previously, stray detection checked each stack on each host individually,
resulting in (stacks * hosts) SSH calls. For 50 stacks across 4 hosts,
this meant ~200 parallel SSH connections, causing "Connection lost" errors.

Now queries each host once for all running compose projects using:
  docker ps --format '{{.Label "com.docker.compose.project"}}' | sort -u

This reduces SSH calls from ~200 to just 4 (one per host).

Changes:
- Add get_running_stacks_on_host() in executor.py
- Add discover_all_stacks_on_all_hosts() in operations.py
- Update _discover_stacks_full() to use the batch approach

* Remove unused function and add tests

- Remove discover_stack_on_all_hosts() which is no longer used
- Add tests for get_running_stacks_on_host()
- Add tests for discover_all_stacks_on_all_hosts()
  - Verifies it returns correct StackDiscoveryResult
  - Verifies stray detection works
  - Verifies it makes only 1 call per host (not per stack)
2025-12-22 12:09:59 -08:00
37 changed files with 629 additions and 1059 deletions

View File

@@ -21,7 +21,7 @@ repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.14.9
hooks:
- id: ruff
- id: ruff-check
args: [--fix]
- id: ruff-format

View File

@@ -342,18 +342,14 @@ When you run `cf up autokuma`, it starts the stack on all hosts in parallel. Mul
Compose Farm includes a `config` subcommand to help manage configuration files:
```bash
cf config init # Create a new config file with documented example
cf config init --discover # Auto-detect compose files and interactively create config
cf config show # Display current config with syntax highlighting
cf config path # Print the config file path (useful for scripting)
cf config validate # Validate config syntax and schema
cf config edit # Open config in $EDITOR
cf config example --list # List available example templates
cf config example whoami # Generate sample stack files
cf config example full # Generate complete Traefik + whoami setup
cf config init # Create a new config file with documented example
cf config show # Display current config with syntax highlighting
cf config path # Print the config file path (useful for scripting)
cf config validate # Validate config syntax and schema
cf config edit # Open config in $EDITOR
```
Use `cf config init` to get started with a template, or `cf config init --discover` if you already have compose files.
Use `cf config init` to get started with a fully documented template.
## Usage
@@ -999,7 +995,6 @@ Full `--help` output for each command. See the [Usage](#usage) table above for a
│ validate Validate the config file syntax and schema. │
│ symlink Create a symlink from the default config location to a config │
│ file. │
│ example Generate example stack files from built-in templates. │
╰──────────────────────────────────────────────────────────────────────────────╯
```

View File

@@ -1,3 +0,0 @@
version https://git-lfs.github.com/spec/v1
oid sha256:611d6fef767a8e0755367bf0c008dad016f38fa8b3be2362825ef7ef6ec2ec1a
size 2444902

View File

@@ -1,3 +0,0 @@
version https://git-lfs.github.com/spec/v1
oid sha256:1e851879acc99234628abce0f8dadeeaf500effe4f78bebc63c4b17a0ae092f1
size 900800

View File

@@ -578,10 +578,6 @@ cf traefik-file plex jellyfin -o /opt/traefik/cf.yml
Manage configuration files.
<video autoplay loop muted playsinline>
<source src="/assets/config-example.webm" type="video/webm">
</video>
```bash
cf config COMMAND
```
@@ -596,19 +592,17 @@ cf config COMMAND
| `validate` | Validate syntax and schema |
| `edit` | Open in $EDITOR |
| `symlink` | Create symlink from default location |
| `example` | Generate example stack files |
**Options by subcommand:**
| Subcommand | Options |
|------------|---------|
| `init` | `--path/-p PATH`, `--force/-f`, `--discover/-d` |
| `init` | `--path/-p PATH`, `--force/-f` |
| `show` | `--path/-p PATH`, `--raw/-r` |
| `edit` | `--path/-p PATH` |
| `path` | `--path/-p PATH` |
| `validate` | `--path/-p PATH` |
| `symlink` | `--force/-f` |
| `example` | `--list/-l`, `--output/-o PATH`, `--force/-f` |
**Examples:**
@@ -616,9 +610,6 @@ cf config COMMAND
# Create config at default location
cf config init
# Auto-discover compose files and interactively create config
cf config init --discover
# Create config at custom path
cf config init --path /opt/compose-farm/config.yaml
@@ -642,18 +633,6 @@ cf config symlink
# Create symlink to specific file
cf config symlink /opt/compose-farm/config.yaml
# List available example templates
cf config example --list
# Generate a sample stack (whoami, nginx, postgres)
cf config example whoami
# Generate complete Traefik + whoami setup
cf config example full
# Generate example in specific directory
cf config example nginx --output /opt/compose
```
---

View File

@@ -27,7 +27,6 @@ python docs/demos/cli/record.py quickstart migration
| `update.tape` | `cf update` |
| `migration.tape` | Service migration |
| `apply.tape` | `cf apply` |
| `config-example.tape` | `cf config example` - generate example stacks |
## Output

View File

@@ -1,110 +0,0 @@
# Config Example Demo
# Shows cf config example command
Output docs/assets/config-example.gif
Output docs/assets/config-example.webm
Set Shell "bash"
Set FontSize 14
Set Width 900
Set Height 600
Set Theme "Catppuccin Mocha"
Set FontFamily "FiraCode Nerd Font"
Set TypingSpeed 50ms
Env BAT_PAGING "always"
Type "# Generate example stacks with cf config example"
Enter
Sleep 500ms
Type "# List available templates"
Enter
Sleep 500ms
Type "cf config example --list"
Enter
Wait+Screen /Usage:/
Sleep 2s
Type "# Create a directory for our stacks"
Enter
Sleep 500ms
Type "mkdir -p ~/compose && cd ~/compose"
Enter
Wait
Sleep 500ms
Type "# Generate the full Traefik + whoami setup"
Enter
Sleep 500ms
Type "cf config example full"
Enter
Wait
Sleep 2s
Type "# See what was created"
Enter
Sleep 500ms
Type "tree ."
Enter
Wait
Sleep 2s
Type "# View the generated config"
Enter
Sleep 500ms
Type "bat compose-farm.yaml"
Enter
Sleep 3s
Type "q"
Sleep 500ms
Type "# View the traefik compose file"
Enter
Sleep 500ms
Type "bat traefik/compose.yaml"
Enter
Sleep 3s
Type "q"
Sleep 500ms
Type "# Validate the config"
Enter
Sleep 500ms
Type "cf check --local"
Enter
Wait
Sleep 2s
Type "# Create the Docker network"
Enter
Sleep 500ms
Type "cf init-network"
Enter
Wait
Sleep 1s
Type "# Deploy traefik and whoami"
Enter
Sleep 500ms
Type "cf up traefik whoami"
Enter
Wait
Sleep 3s
Type "# Verify it's running"
Enter
Sleep 500ms
Type "cf ps"
Enter
Wait
Sleep 2s

View File

@@ -149,24 +149,6 @@ cd /opt/stacks
cf config init
```
**Already have compose files?** Use `--discover` to auto-detect them and interactively build your config:
```bash
cf config init --discover
```
This scans for directories containing compose files, lets you select which stacks to include, and generates a ready-to-use config.
**Starting fresh?** Generate example stacks to learn from:
```bash
# List available examples
cf config example --list
# Generate a complete Traefik + whoami setup
cf config example full
```
Alternatively, use `~/.config/compose-farm/compose-farm.yaml` for a global config. You can also symlink a working directory config to the global location:
```bash

View File

@@ -37,12 +37,6 @@ _RawOption = Annotated[
bool,
typer.Option("--raw", "-r", help="Output raw file contents (for copy-paste)."),
]
_DiscoverOption = Annotated[
bool,
typer.Option(
"--discover", "-d", help="Auto-detect compose files and interactively select stacks."
),
]
def _get_editor() -> str:
@@ -74,117 +68,6 @@ def _get_config_file(path: Path | None) -> Path | None:
return config_path.resolve() if config_path else None
def _generate_discovered_config(
compose_dir: Path,
hostname: str,
host_address: str,
selected_stacks: list[str],
) -> str:
"""Generate config YAML from discovered stacks."""
import yaml # noqa: PLC0415
config_data = {
"compose_dir": str(compose_dir),
"hosts": {hostname: host_address},
"stacks": dict.fromkeys(selected_stacks, hostname),
}
header = """\
# Compose Farm configuration
# Documentation: https://github.com/basnijholt/compose-farm
#
# Generated by: cf config init --discover
"""
return header + yaml.dump(config_data, default_flow_style=False, sort_keys=False)
def _interactive_stack_selection(stacks: list[str]) -> list[str]:
"""Interactively select stacks to include."""
from rich.prompt import Confirm, Prompt # noqa: PLC0415
console.print("\n[bold]Found stacks:[/bold]")
for stack in stacks:
console.print(f" [cyan]{stack}[/cyan]")
console.print()
# Fast path: include all
if Confirm.ask(f"Include all {len(stacks)} stacks?", default=True):
return stacks
# Let user specify which to exclude
console.print(
"\n[dim]Enter stack names to exclude (comma-separated), or press Enter to select individually:[/dim]"
)
exclude_input = Prompt.ask("Exclude", default="")
if exclude_input.strip():
exclude = {s.strip() for s in exclude_input.split(",")}
return [s for s in stacks if s not in exclude]
# Fall back to individual selection
console.print()
return [
stack for stack in stacks if Confirm.ask(f" Include [cyan]{stack}[/cyan]?", default=True)
]
def _run_discovery_flow() -> str | None:
"""Run the interactive discovery flow and return generated config content."""
import socket # noqa: PLC0415
from rich.prompt import Prompt # noqa: PLC0415
console.print("[bold]Compose Farm Config Discovery[/bold]")
console.print("[dim]This will scan for compose files and generate a config.[/dim]\n")
# Step 1: Get compose directory
default_dir = Path.cwd()
compose_dir_str = Prompt.ask(
"Compose directory",
default=str(default_dir),
)
compose_dir = Path(compose_dir_str).expanduser().resolve()
if not compose_dir.exists():
print_error(f"Directory does not exist: {compose_dir}")
return None
if not compose_dir.is_dir():
print_error(f"Path is not a directory: {compose_dir}")
return None
# Step 2: Discover stacks
from compose_farm.config import discover_compose_dirs # noqa: PLC0415
console.print(f"\n[dim]Scanning {compose_dir}...[/dim]")
stacks = discover_compose_dirs(compose_dir)
if not stacks:
print_error(f"No compose files found in {compose_dir}")
console.print("[dim]Each stack should be in a subdirectory with a compose.yaml file.[/dim]")
return None
console.print(f"[green]Found {len(stacks)} stack(s)[/green]")
# Step 3: Interactive selection
selected_stacks = _interactive_stack_selection(stacks)
if not selected_stacks:
console.print("\n[yellow]No stacks selected.[/yellow]")
return None
# Step 4: Get hostname and address
default_hostname = socket.gethostname()
hostname = Prompt.ask("\nHost name", default=default_hostname)
host_address = Prompt.ask("Host address", default="localhost")
# Step 5: Generate config
console.print(f"\n[dim]Generating config with {len(selected_stacks)} stack(s)...[/dim]")
return _generate_discovered_config(compose_dir, hostname, host_address, selected_stacks)
def _report_missing_config(explicit_path: Path | None = None) -> None:
"""Report that a config file was not found."""
console.print("[yellow]Config file not found.[/yellow]")
@@ -202,15 +85,11 @@ def _report_missing_config(explicit_path: Path | None = None) -> None:
def config_init(
path: _PathOption = None,
force: _ForceOption = False,
discover: _DiscoverOption = False,
) -> None:
"""Create a new config file with documented example.
The generated config file serves as a template showing all available
options with explanatory comments.
Use --discover to auto-detect compose files and interactively select
which stacks to include.
"""
target_path = (path.expanduser().resolve() if path else None) or default_config_path()
@@ -222,17 +101,11 @@ def config_init(
console.print("[dim]Aborted.[/dim]")
raise typer.Exit(0)
if discover:
template_content = _run_discovery_flow()
if template_content is None:
raise typer.Exit(0)
else:
template_content = _generate_template()
# Create parent directories
target_path.parent.mkdir(parents=True, exist_ok=True)
# Write config file
# Generate and write template
template_content = _generate_template()
target_path.write_text(template_content, encoding="utf-8")
print_success(f"Config file created at: {target_path}")
@@ -420,115 +293,5 @@ def config_symlink(
console.print(f" -> {target_path}")
_ListOption = Annotated[
bool,
typer.Option("--list", "-l", help="List available example templates."),
]
@config_app.command("example")
def config_example(
name: Annotated[
str | None,
typer.Argument(help="Example template name (e.g., whoami, full)"),
] = None,
output_dir: Annotated[
Path | None,
typer.Option("--output", "-o", help="Output directory. Defaults to current directory."),
] = None,
list_examples: _ListOption = False,
force: _ForceOption = False,
) -> None:
"""Generate example stack files from built-in templates.
Examples:
cf config example --list # List available examples
cf config example whoami # Generate whoami stack in ./whoami/
cf config example full # Generate complete Traefik + whoami setup
cf config example nginx -o /opt/compose # Generate in specific directory
"""
from compose_farm.examples import ( # noqa: PLC0415
EXAMPLES,
SINGLE_STACK_EXAMPLES,
list_example_files,
)
# List mode
if list_examples:
console.print("[bold]Available example templates:[/bold]\n")
console.print("[dim]Single stack examples:[/dim]")
for example_name, description in SINGLE_STACK_EXAMPLES.items():
console.print(f" [cyan]{example_name}[/cyan] - {description}")
console.print()
console.print("[dim]Complete setup:[/dim]")
console.print(f" [cyan]full[/cyan] - {EXAMPLES['full']}")
console.print()
console.print("[dim]Usage: cf config example <name>[/dim]")
return
# Interactive selection if no name provided
if name is None:
from rich.prompt import Prompt # noqa: PLC0415
console.print("[bold]Available example templates:[/bold]\n")
example_names = list(EXAMPLES.keys())
for i, (example_name, description) in enumerate(EXAMPLES.items(), 1):
console.print(f" [{i}] [cyan]{example_name}[/cyan] - {description}")
console.print()
choice = Prompt.ask(
"Select example",
choices=[str(i) for i in range(1, len(example_names) + 1)] + example_names,
default="1",
)
# Handle numeric or name input
name = example_names[int(choice) - 1] if choice.isdigit() else choice
# Validate example name
if name not in EXAMPLES:
print_error(f"Unknown example: {name}")
console.print(f"Available examples: {', '.join(EXAMPLES.keys())}")
raise typer.Exit(1)
# Determine output directory
base_dir = (output_dir or Path.cwd()).expanduser().resolve()
# For 'full' example, use current dir; for single stacks, create subdir
target_dir = base_dir if name == "full" else base_dir / name
# Check for existing files
files = list_example_files(name)
existing_files = [f for f, _ in files if (target_dir / f).exists()]
if existing_files and not force:
console.print(f"[yellow]Files already exist in:[/yellow] {target_dir}")
console.print(f"[dim] {len(existing_files)} file(s) would be overwritten[/dim]")
if not typer.confirm("Overwrite existing files?"):
console.print("[dim]Aborted.[/dim]")
raise typer.Exit(0)
# Create directories and copy files
for rel_path, content in files:
file_path = target_dir / rel_path
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(content, encoding="utf-8")
console.print(f" [green]Created[/green] {file_path}")
print_success(f"Example '{name}' created at: {target_dir}")
# Show appropriate next steps
if name == "full":
console.print("\n[dim]Next steps:[/dim]")
console.print(f" 1. Edit [cyan]{target_dir}/compose-farm.yaml[/cyan] with your host IP")
console.print(" 2. Edit [cyan].env[/cyan] files with your domain")
console.print(" 3. Create Docker network: [cyan]docker network create mynetwork[/cyan]")
console.print(" 4. Deploy: [cyan]cf up traefik whoami[/cyan]")
else:
console.print("\n[dim]Next steps:[/dim]")
console.print(f" 1. Edit [cyan]{target_dir}/.env[/cyan] with your settings")
console.print(f" 2. Add to compose-farm.yaml: [cyan]{name}: <hostname>[/cyan]")
console.print(f" 3. Deploy with: [cyan]cf up {name}[/cyan]")
# Register config subcommand on the shared app
app.add_typer(config_app, name="config", rich_help_panel="Configuration")

View File

@@ -37,24 +37,23 @@ from compose_farm.console import (
)
from compose_farm.executor import (
CommandResult,
get_running_stacks_on_host,
is_local,
run_command,
)
from compose_farm.logs import (
DEFAULT_LOG_PATH,
SnapshotEntry,
collect_stack_entries,
collect_stacks_entries_on_host,
isoformat,
load_existing_entries,
merge_entries,
write_toml,
)
from compose_farm.operations import (
StackDiscoveryResult,
build_discovery_results,
check_host_compatibility,
check_stack_requirements,
discover_stack_host,
discover_stack_on_all_hosts,
)
from compose_farm.state import get_orphaned_stacks, load_state, save_state
from compose_farm.traefik import generate_traefik_config, render_traefik_config
@@ -62,38 +61,39 @@ from compose_farm.traefik import generate_traefik_config, render_traefik_config
# --- Sync helpers ---
def _discover_stacks(cfg: Config, stacks: list[str] | None = None) -> dict[str, str | list[str]]:
"""Discover running stacks with a progress bar."""
stack_list = stacks if stacks is not None else list(cfg.stacks)
results = run_parallel_with_progress(
"Discovering",
stack_list,
lambda s: discover_stack_host(cfg, s),
)
return {svc: host for svc, host in results if host is not None}
def _snapshot_stacks(
cfg: Config,
stacks: list[str],
discovered: dict[str, str | list[str]],
log_path: Path | None,
) -> Path:
"""Capture image digests with a progress bar."""
"""Capture image digests using batched SSH calls (1 per host).
Args:
cfg: Configuration
discovered: Dict mapping stack -> host(s) where it's running
log_path: Optional path to write the log file
Returns:
Path to the written log file.
"""
effective_log_path = log_path or DEFAULT_LOG_PATH
now_dt = datetime.now(UTC)
now_iso = isoformat(now_dt)
async def collect_stack(stack: str) -> tuple[str, list[SnapshotEntry]]:
try:
return stack, await collect_stack_entries(cfg, stack, now=now_dt)
except RuntimeError:
return stack, []
# Group stacks by host for batched SSH calls
stacks_by_host: dict[str, set[str]] = {}
for stack, hosts in discovered.items():
# Use first host for multi-host stacks (they use the same images)
host = hosts[0] if isinstance(hosts, list) else hosts
stacks_by_host.setdefault(host, set()).add(stack)
results = run_parallel_with_progress(
"Capturing",
stacks,
collect_stack,
)
# Collect entries with 1 SSH call per host (with progress bar)
async def collect_on_host(host: str) -> tuple[str, list[SnapshotEntry]]:
entries = await collect_stacks_entries_on_host(cfg, host, stacks_by_host[host], now=now_dt)
return host, entries
results = run_parallel_with_progress("Capturing", list(stacks_by_host.keys()), collect_on_host)
snapshot_entries = [entry for _, entries in results for entry in entries]
if not snapshot_entries:
@@ -155,39 +155,20 @@ def _discover_stacks_full(
) -> tuple[dict[str, str | list[str]], dict[str, list[str]], dict[str, list[str]]]:
"""Discover running stacks with full host scanning for stray detection.
Returns:
Tuple of (discovered, strays, duplicates):
- discovered: stack -> host(s) where running correctly
- strays: stack -> list of unauthorized hosts
- duplicates: stack -> list of all hosts (for single-host stacks on multiple)
Queries each host once for all running stacks (with progress bar),
then delegates to build_discovery_results for categorization.
"""
stack_list = stacks if stacks is not None else list(cfg.stacks)
results: list[StackDiscoveryResult] = run_parallel_with_progress(
"Discovering",
stack_list,
lambda s: discover_stack_on_all_hosts(cfg, s),
)
all_hosts = list(cfg.hosts.keys())
discovered: dict[str, str | list[str]] = {}
strays: dict[str, list[str]] = {}
duplicates: dict[str, list[str]] = {}
# Query each host for running stacks (with progress bar)
async def get_stacks_on_host(host: str) -> tuple[str, set[str]]:
running = await get_running_stacks_on_host(cfg, host)
return host, running
for result in results:
correct_hosts = [h for h in result.running_hosts if h in result.configured_hosts]
if correct_hosts:
if result.is_multi_host:
discovered[result.stack] = correct_hosts
else:
discovered[result.stack] = correct_hosts[0]
host_results = run_parallel_with_progress("Discovering", all_hosts, get_stacks_on_host)
running_on_host: dict[str, set[str]] = dict(host_results)
if result.is_stray:
strays[result.stack] = result.stray_hosts
if result.is_duplicate:
duplicates[result.stack] = result.running_hosts
return discovered, strays, duplicates
return build_discovery_results(cfg, running_on_host, stacks)
def _report_stray_stacks(
@@ -554,10 +535,10 @@ def refresh(
save_state(cfg, new_state)
print_success(f"State updated: {len(new_state)} stacks tracked.")
# Capture image digests for running stacks
# Capture image digests for running stacks (1 SSH call per host)
if discovered:
try:
path = _snapshot_stacks(cfg, list(discovered.keys()), log_path)
path = _snapshot_stacks(cfg, discovered, log_path)
print_success(f"Digests written to {path}")
except RuntimeError as exc:
print_warning(str(exc))

View File

@@ -15,17 +15,6 @@ from .paths import config_search_paths, find_config_path
COMPOSE_FILENAMES = ("compose.yaml", "compose.yml", "docker-compose.yml", "docker-compose.yaml")
def discover_compose_dirs(compose_dir: Path) -> list[str]:
"""Find all directories in compose_dir that contain a compose file."""
if not compose_dir.exists():
return []
return sorted(
subdir.name
for subdir in compose_dir.iterdir()
if subdir.is_dir() and any((subdir / f).exists() for f in COMPOSE_FILENAMES)
)
class Host(BaseModel, extra="forbid"):
"""SSH host configuration."""
@@ -116,7 +105,13 @@ class Config(BaseModel, extra="forbid"):
def discover_compose_dirs(self) -> set[str]:
"""Find all directories in compose_dir that contain a compose file."""
return set(discover_compose_dirs(self.compose_dir))
found: set[str] = set()
if not self.compose_dir.exists():
return found
for subdir in self.compose_dir.iterdir():
if subdir.is_dir() and any((subdir / f).exists() for f in COMPOSE_FILENAMES):
found.add(subdir.name)
return found
def _parse_hosts(raw_hosts: dict[str, Any]) -> dict[str, Host]:

View File

@@ -1,41 +0,0 @@
"""Example stack templates for compose-farm."""
from __future__ import annotations
from importlib import resources
from pathlib import Path
# All available examples: name -> description
# "full" is special: multi-stack setup with config file
EXAMPLES = {
"whoami": "Simple HTTP service that returns hostname (great for testing Traefik)",
"nginx": "Basic nginx web server with static files",
"postgres": "PostgreSQL database with persistent volume",
"full": "Complete setup with Traefik + whoami (includes compose-farm.yaml)",
}
# Examples that are single stacks (everything except "full")
SINGLE_STACK_EXAMPLES = {k: v for k, v in EXAMPLES.items() if k != "full"}
def list_example_files(name: str) -> list[tuple[str, str]]:
"""List files in an example template, returning (relative_path, content) tuples."""
if name not in EXAMPLES:
msg = f"Unknown example: {name}. Available: {', '.join(EXAMPLES.keys())}"
raise ValueError(msg)
example_dir = resources.files("compose_farm.examples") / name
example_path = Path(str(example_dir))
files: list[tuple[str, str]] = []
def walk_dir(directory: Path, prefix: str = "") -> None:
for item in sorted(directory.iterdir()):
rel_path = f"{prefix}{item.name}" if prefix else item.name
if item.is_file():
content = item.read_text(encoding="utf-8")
files.append((rel_path, content))
elif item.is_dir() and not item.name.startswith("__"):
walk_dir(item, f"{rel_path}/")
walk_dir(example_path)
return files

View File

@@ -1,41 +0,0 @@
# Compose Farm Full Example
A complete starter setup with Traefik reverse proxy and a test service.
## Quick Start
1. **Create the Docker network** (once per host):
```bash
docker network create --subnet=172.20.0.0/16 --gateway=172.20.0.1 mynetwork
```
2. **Create data directory for Traefik**:
```bash
mkdir -p /mnt/data/traefik
```
3. **Edit configuration**:
- Update `compose-farm.yaml` with your host IP
- Update `.env` files with your domain
4. **Start the stacks**:
```bash
cf up traefik whoami
```
5. **Test**:
- Dashboard: http://localhost:8080
- Whoami: Add `whoami.example.com` to /etc/hosts pointing to your host
## Files
```
full/
├── compose-farm.yaml # Compose Farm config
├── traefik/
│ ├── compose.yaml # Traefik reverse proxy
│ └── .env
└── whoami/
├── compose.yaml # Test HTTP service
└── .env
```

View File

@@ -1,17 +0,0 @@
# Compose Farm configuration
# Edit the host address to match your setup
compose_dir: .
hosts:
local: localhost # For remote hosts, use: myhost: 192.168.1.100
stacks:
traefik: local
whoami: local
nginx: local
postgres: local
# Traefik file-provider integration (optional)
# traefik_file: ./traefik/dynamic.d/compose-farm.yml
traefik_stack: traefik

View File

@@ -1,2 +0,0 @@
# Environment variables for nginx stack
DOMAIN=example.com

View File

@@ -1,26 +0,0 @@
# Nginx - Basic web server
services:
nginx:
image: nginx:alpine
container_name: cf-nginx
user: "1000:1000"
networks:
- mynetwork
volumes:
- /mnt/data/nginx/html:/usr/share/nginx/html:ro
ports:
- "9082:80" # Use 80:80 or 8080:80 in production
restart: unless-stopped
labels:
- traefik.enable=true
- traefik.http.routers.nginx.rule=Host(`nginx.${DOMAIN}`)
- traefik.http.routers.nginx.entrypoints=websecure
- traefik.http.routers.nginx-local.rule=Host(`nginx.local`)
- traefik.http.routers.nginx-local.entrypoints=web
- traefik.http.services.nginx.loadbalancer.server.port=80
- kuma.nginx.http.name=Nginx
- kuma.nginx.http.url=https://nginx.${DOMAIN}
networks:
mynetwork:
external: true

View File

@@ -1,5 +0,0 @@
# Environment variables for postgres stack
# IMPORTANT: Change these values before deploying!
POSTGRES_USER=postgres
POSTGRES_PASSWORD=changeme
POSTGRES_DB=myapp

View File

@@ -1,26 +0,0 @@
# PostgreSQL - Database with persistent storage
services:
postgres:
image: postgres:16-alpine
container_name: cf-postgres
networks:
- mynetwork
environment:
- POSTGRES_USER=${POSTGRES_USER:-postgres}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD:?POSTGRES_PASSWORD is required}
- POSTGRES_DB=${POSTGRES_DB:-postgres}
- PGDATA=/var/lib/postgresql/data/pgdata
volumes:
- /mnt/data/postgres:/var/lib/postgresql/data
ports:
- "5433:5432" # Use 5432:5432 in production
restart: unless-stopped
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-postgres}"]
interval: 30s
timeout: 10s
retries: 3
networks:
mynetwork:
external: true

View File

@@ -1 +0,0 @@
DOMAIN=example.com

View File

@@ -1,37 +0,0 @@
# Traefik - Reverse proxy and load balancer
services:
traefik:
image: traefik:v2.11
container_name: cf-traefik
networks:
- mynetwork
ports:
- "9080:80" # HTTP (use 80:80 in production)
- "9443:443" # HTTPS (use 443:443 in production)
- "9081:8080" # Dashboard - remove in production
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
- /mnt/data/traefik:/etc/traefik
command:
- --api.dashboard=true
- --api.insecure=true
- --providers.docker=true
- --providers.docker.exposedbydefault=false
- --providers.docker.network=mynetwork
- --entrypoints.web.address=:80
- --entrypoints.websecure.address=:443
- --log.level=INFO
labels:
- traefik.enable=true
- traefik.http.routers.traefik.rule=Host(`traefik.${DOMAIN}`)
- traefik.http.routers.traefik.entrypoints=websecure
- traefik.http.routers.traefik-local.rule=Host(`traefik.local`)
- traefik.http.routers.traefik-local.entrypoints=web
- traefik.http.services.traefik.loadbalancer.server.port=8080
- kuma.traefik.http.name=Traefik
- kuma.traefik.http.url=https://traefik.${DOMAIN}
restart: unless-stopped
networks:
mynetwork:
external: true

View File

@@ -1 +0,0 @@
DOMAIN=example.com

View File

@@ -1,23 +0,0 @@
# Whoami - Test service for Traefik routing
services:
whoami:
image: traefik/whoami:latest
container_name: cf-whoami
networks:
- mynetwork
ports:
- "9000:80"
restart: unless-stopped
labels:
- traefik.enable=true
- traefik.http.routers.whoami.rule=Host(`whoami.${DOMAIN}`)
- traefik.http.routers.whoami.entrypoints=websecure
- traefik.http.routers.whoami-local.rule=Host(`whoami.local`)
- traefik.http.routers.whoami-local.entrypoints=web
- traefik.http.services.whoami.loadbalancer.server.port=80
- kuma.whoami.http.name=Whoami
- kuma.whoami.http.url=https://whoami.${DOMAIN}
networks:
mynetwork:
external: true

View File

@@ -1,2 +0,0 @@
# Environment variables for nginx stack
DOMAIN=example.com

View File

@@ -1,26 +0,0 @@
# Nginx - Basic web server
services:
nginx:
image: nginx:alpine
container_name: cf-nginx
user: "1000:1000"
networks:
- mynetwork
volumes:
- /mnt/data/nginx/html:/usr/share/nginx/html:ro
ports:
- "9082:80"
restart: unless-stopped
labels:
- traefik.enable=true
- traefik.http.routers.nginx.rule=Host(`nginx.${DOMAIN}`)
- traefik.http.routers.nginx.entrypoints=websecure
- traefik.http.routers.nginx-local.rule=Host(`nginx.local`)
- traefik.http.routers.nginx-local.entrypoints=web
- traefik.http.services.nginx.loadbalancer.server.port=80
- kuma.nginx.http.name=Nginx
- kuma.nginx.http.url=https://nginx.${DOMAIN}
networks:
mynetwork:
external: true

View File

@@ -1,5 +0,0 @@
# Environment variables for postgres stack
# IMPORTANT: Change these values before deploying!
POSTGRES_USER=postgres
POSTGRES_PASSWORD=changeme
POSTGRES_DB=myapp

View File

@@ -1,26 +0,0 @@
# PostgreSQL - Database with persistent storage
services:
postgres:
image: postgres:16-alpine
container_name: cf-postgres
networks:
- mynetwork
environment:
- POSTGRES_USER=${POSTGRES_USER:-postgres}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD:?POSTGRES_PASSWORD is required}
- POSTGRES_DB=${POSTGRES_DB:-postgres}
- PGDATA=/var/lib/postgresql/data/pgdata
volumes:
- /mnt/data/postgres:/var/lib/postgresql/data
ports:
- "5433:5432"
restart: unless-stopped
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER:-postgres}"]
interval: 30s
timeout: 10s
retries: 3
networks:
mynetwork:
external: true

View File

@@ -1,2 +0,0 @@
# Environment variables for whoami stack
DOMAIN=example.com

View File

@@ -1,24 +0,0 @@
# Whoami - Simple HTTP service for testing
# Returns the container hostname - useful for testing load balancers and Traefik
services:
whoami:
image: traefik/whoami:latest
container_name: cf-whoami
networks:
- mynetwork
ports:
- "9000:80"
restart: unless-stopped
labels:
- traefik.enable=true
- traefik.http.routers.whoami.rule=Host(`whoami.${DOMAIN}`)
- traefik.http.routers.whoami.entrypoints=websecure
- traefik.http.routers.whoami-local.rule=Host(`whoami.local`)
- traefik.http.routers.whoami-local.entrypoints=web
- traefik.http.services.whoami.loadbalancer.server.port=80
- kuma.whoami.http.name=Whoami
- kuma.whoami.http.url=https://whoami.${DOMAIN}
networks:
mynetwork:
external: true

View File

@@ -497,6 +497,28 @@ async def check_stack_running(
return result.success and bool(result.stdout.strip())
async def get_running_stacks_on_host(
config: Config,
host_name: str,
) -> set[str]:
"""Get all running compose stacks on a host in a single SSH call.
Uses docker ps with the compose.project label to identify running stacks.
Much more efficient than checking each stack individually.
"""
host = config.hosts[host_name]
# Get unique project names from running containers
command = "docker ps --format '{{.Label \"com.docker.compose.project\"}}' | sort -u"
result = await run_command(host, command, stack=host_name, stream=False, prefix="")
if not result.success:
return set()
# Filter out empty lines and return as set
return {line.strip() for line in result.stdout.splitlines() if line.strip()}
async def _batch_check_existence(
config: Config,
host_name: str,

View File

@@ -6,21 +6,22 @@ import json
import tomllib
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING
from .executor import run_compose
from .executor import run_command
from .paths import xdg_config_home
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Iterable
from collections.abc import Iterable
from pathlib import Path
from .config import Config
from .executor import CommandResult
# Separator used to split output sections
_SECTION_SEPARATOR = "---CF-SEP---"
DEFAULT_LOG_PATH = xdg_config_home() / "compose-farm" / "dockerfarm-log.toml"
_DIGEST_HEX_LENGTH = 64
@dataclass(frozen=True)
@@ -56,87 +57,97 @@ def _escape(value: str) -> str:
return value.replace("\\", "\\\\").replace('"', '\\"')
def _parse_images_output(raw: str) -> list[dict[str, Any]]:
"""Parse `docker compose images --format json` output.
Handles both a JSON array and newline-separated JSON objects for robustness.
"""
raw = raw.strip()
if not raw:
return []
def _parse_image_digests(image_json: str) -> dict[str, str]:
"""Parse docker image inspect JSON to build image tag -> digest map."""
if not image_json:
return {}
try:
parsed = json.loads(raw)
image_data = json.loads(image_json)
except json.JSONDecodeError:
objects = []
for line in raw.splitlines():
if not line.strip():
continue
objects.append(json.loads(line))
return objects
return {}
if isinstance(parsed, list):
return parsed
if isinstance(parsed, dict):
return [parsed]
return []
image_digests: dict[str, str] = {}
for img in image_data:
tags = img.get("RepoTags") or []
digests = img.get("RepoDigests") or []
digest = digests[0].split("@")[-1] if digests else img.get("Id", "")
for tag in tags:
image_digests[tag] = digest
if img.get("Id"):
image_digests[img["Id"]] = digest
return image_digests
def _extract_image_fields(record: dict[str, Any]) -> tuple[str, str]:
"""Extract image name and digest with fallbacks."""
image = record.get("Image") or record.get("Repository") or record.get("Name") or ""
tag = record.get("Tag") or record.get("Version")
if tag and ":" not in image.rsplit("/", 1)[-1]:
image = f"{image}:{tag}"
digest = (
record.get("Digest")
or record.get("Image ID")
or record.get("ImageID")
or record.get("ID")
or ""
)
if digest and not digest.startswith("sha256:") and len(digest) == _DIGEST_HEX_LENGTH:
digest = f"sha256:{digest}"
return image, digest
async def collect_stack_entries(
async def collect_stacks_entries_on_host(
config: Config,
stack: str,
host_name: str,
stacks: set[str],
*,
now: datetime,
run_compose_fn: Callable[..., Awaitable[CommandResult]] = run_compose,
) -> list[SnapshotEntry]:
"""Run `docker compose images` for a stack and normalize results."""
result = await run_compose_fn(config, stack, "images --format json", stream=False)
"""Collect image entries for stacks on one host using 2 docker commands.
Uses `docker ps` to get running containers + their compose project labels,
then `docker image inspect` to get digests for all unique images.
Much faster than running N `docker compose images` commands.
"""
if not stacks:
return []
host = config.hosts[host_name]
# Single SSH call with 2 docker commands:
# 1. Get project|image pairs from running containers
# 2. Get image info (including digests) for all unique images
command = (
f"docker ps --format '{{{{.Label \"com.docker.compose.project\"}}}}|{{{{.Image}}}}' && "
f"echo '{_SECTION_SEPARATOR}' && "
"docker image inspect $(docker ps --format '{{.Image}}' | sort -u) 2>/dev/null || true"
)
result = await run_command(host, command, host_name, stream=False, prefix="")
if not result.success:
msg = result.stderr or f"compose images exited with {result.exit_code}"
error = f"[{stack}] Unable to read images: {msg}"
raise RuntimeError(error)
return []
records = _parse_images_output(result.stdout)
# Use first host for snapshots (multi-host stacks use same images on all hosts)
host_name = config.get_hosts(stack)[0]
compose_path = config.get_compose_path(stack)
# Split output into two sections
parts = result.stdout.split(_SECTION_SEPARATOR)
if len(parts) != 2: # noqa: PLR2004
return []
entries: list[SnapshotEntry] = []
for record in records:
image, digest = _extract_image_fields(record)
if not digest:
container_lines, image_json = parts[0].strip(), parts[1].strip()
# Parse project|image pairs, filtering to only stacks we care about
stack_images: dict[str, set[str]] = {}
for line in container_lines.splitlines():
if "|" not in line:
continue
entries.append(
SnapshotEntry(
stack=stack,
host=host_name,
compose_file=compose_path,
image=image,
digest=digest,
captured_at=now,
)
)
project, image = line.split("|", 1)
if project in stacks:
stack_images.setdefault(project, set()).add(image)
if not stack_images:
return []
# Parse image inspect JSON to build image -> digest map
image_digests = _parse_image_digests(image_json)
# Build entries
entries: list[SnapshotEntry] = []
for stack, images in stack_images.items():
for image in images:
digest = image_digests.get(image, "")
if digest:
entries.append(
SnapshotEntry(
stack=stack,
host=host_name,
compose_file=config.get_compose_path(stack),
image=image,
digest=digest,
captured_at=now,
)
)
return entries

View File

@@ -76,31 +76,6 @@ def get_stack_paths(cfg: Config, stack: str) -> list[str]:
return paths
async def discover_stack_host(cfg: Config, stack: str) -> tuple[str, str | list[str] | None]:
"""Discover where a stack is running.
For multi-host stacks, checks all assigned hosts in parallel.
For single-host, checks assigned host first, then others.
Returns (stack_name, host_or_hosts_or_none).
"""
assigned_hosts = cfg.get_hosts(stack)
if cfg.is_multi_host(stack):
# Check all assigned hosts in parallel
checks = await asyncio.gather(*[check_stack_running(cfg, stack, h) for h in assigned_hosts])
running = [h for h, is_running in zip(assigned_hosts, checks, strict=True) if is_running]
return stack, running if running else None
# Single-host: check assigned host first, then others
if await check_stack_running(cfg, stack, assigned_hosts[0]):
return stack, assigned_hosts[0]
for host in cfg.hosts:
if host != assigned_hosts[0] and await check_stack_running(cfg, stack, host):
return stack, host
return stack, None
class StackDiscoveryResult(NamedTuple):
"""Result of discovering where a stack is running across all hosts."""
@@ -134,25 +109,6 @@ class StackDiscoveryResult(NamedTuple):
return not self.is_multi_host and len(self.running_hosts) > 1
async def discover_stack_on_all_hosts(cfg: Config, stack: str) -> StackDiscoveryResult:
"""Discover where a stack is running across ALL hosts.
Unlike discover_stack_host(), this checks every host in parallel
to detect strays and duplicates.
"""
configured_hosts = cfg.get_hosts(stack)
all_hosts = list(cfg.hosts.keys())
checks = await asyncio.gather(*[check_stack_running(cfg, stack, h) for h in all_hosts])
running_hosts = [h for h, is_running in zip(all_hosts, checks, strict=True) if is_running]
return StackDiscoveryResult(
stack=stack,
configured_hosts=configured_hosts,
running_hosts=running_hosts,
)
async def check_stack_requirements(
cfg: Config,
stack: str,
@@ -518,3 +474,60 @@ async def stop_stray_stacks(
"""
return await _stop_stacks_on_hosts(cfg, strays, label="stray")
def build_discovery_results(
cfg: Config,
running_on_host: dict[str, set[str]],
stacks: list[str] | None = None,
) -> tuple[dict[str, str | list[str]], dict[str, list[str]], dict[str, list[str]]]:
"""Build discovery results from per-host running stacks.
Takes the raw data of which stacks are running on which hosts and
categorizes them into discovered (running correctly), strays (wrong host),
and duplicates (single-host stack on multiple hosts).
Args:
cfg: Config object.
running_on_host: Dict mapping host -> set of running stack names.
stacks: Optional list of stacks to check. Defaults to all configured stacks.
Returns:
Tuple of (discovered, strays, duplicates):
- discovered: stack -> host(s) where running correctly
- strays: stack -> list of unauthorized hosts
- duplicates: stack -> list of all hosts (for single-host stacks on multiple)
"""
stack_list = stacks if stacks is not None else list(cfg.stacks)
all_hosts = list(running_on_host.keys())
# Build StackDiscoveryResult for each stack
results: list[StackDiscoveryResult] = [
StackDiscoveryResult(
stack=stack,
configured_hosts=cfg.get_hosts(stack),
running_hosts=[h for h in all_hosts if stack in running_on_host[h]],
)
for stack in stack_list
]
discovered: dict[str, str | list[str]] = {}
strays: dict[str, list[str]] = {}
duplicates: dict[str, list[str]] = {}
for result in results:
correct_hosts = [h for h in result.running_hosts if h in result.configured_hosts]
if correct_hosts:
if result.is_multi_host:
discovered[result.stack] = correct_hosts
else:
discovered[result.stack] = correct_hosts[0]
if result.is_stray:
strays[result.stack] = result.stray_hosts
if result.is_duplicate:
duplicates[result.stack] = result.running_hosts
return discovered, strays, duplicates

View File

@@ -38,7 +38,17 @@ def get_templates() -> Jinja2Templates:
def extract_config_error(exc: Exception) -> str:
"""Extract a user-friendly error message from a config exception."""
if isinstance(exc, ValidationError):
return "; ".join(err.get("msg", str(err)) for err in exc.errors())
parts = []
for err in exc.errors():
msg = err.get("msg", str(err))
loc = err.get("loc", ())
if loc:
# Format location as dot-separated path (e.g., "hosts.nas.port")
loc_str = ".".join(str(part) for part in loc)
parts.append(f"{loc_str}: {msg}")
else:
parts.append(msg)
return "; ".join(parts)
return str(exc)

View File

@@ -228,99 +228,3 @@ class TestConfigValidate:
# Error goes to stderr
output = result.stdout + (result.stderr or "")
assert "Config file not found" in output or "not found" in output.lower()
class TestConfigExample:
"""Tests for cf config example command."""
def test_example_list(self, runner: CliRunner) -> None:
result = runner.invoke(app, ["config", "example", "--list"])
assert result.exit_code == 0
assert "whoami" in result.stdout
assert "nginx" in result.stdout
assert "postgres" in result.stdout
assert "full" in result.stdout
def test_example_whoami(self, runner: CliRunner, tmp_path: Path) -> None:
result = runner.invoke(app, ["config", "example", "whoami", "-o", str(tmp_path)])
assert result.exit_code == 0
assert "Example 'whoami' created" in result.stdout
assert (tmp_path / "whoami" / "compose.yaml").exists()
assert (tmp_path / "whoami" / ".env").exists()
def test_example_full(self, runner: CliRunner, tmp_path: Path) -> None:
result = runner.invoke(app, ["config", "example", "full", "-o", str(tmp_path)])
assert result.exit_code == 0
assert "Example 'full' created" in result.stdout
assert (tmp_path / "compose-farm.yaml").exists()
assert (tmp_path / "traefik" / "compose.yaml").exists()
assert (tmp_path / "whoami" / "compose.yaml").exists()
assert (tmp_path / "nginx" / "compose.yaml").exists()
assert (tmp_path / "postgres" / "compose.yaml").exists()
def test_example_unknown(self, runner: CliRunner, tmp_path: Path) -> None:
result = runner.invoke(app, ["config", "example", "unknown", "-o", str(tmp_path)])
assert result.exit_code == 1
output = result.stdout + (result.stderr or "")
assert "Unknown example" in output
def test_example_force_overwrites(self, runner: CliRunner, tmp_path: Path) -> None:
# Create first time
runner.invoke(app, ["config", "example", "whoami", "-o", str(tmp_path)])
# Overwrite with force
result = runner.invoke(app, ["config", "example", "whoami", "-o", str(tmp_path), "-f"])
assert result.exit_code == 0
def test_example_prompts_on_existing(self, runner: CliRunner, tmp_path: Path) -> None:
# Create first time
runner.invoke(app, ["config", "example", "whoami", "-o", str(tmp_path)])
# Try again without force, decline
result = runner.invoke(
app, ["config", "example", "whoami", "-o", str(tmp_path)], input="n\n"
)
assert result.exit_code == 0
assert "Aborted" in result.stdout
class TestExamplesModule:
"""Tests for the examples module."""
def test_list_example_files_whoami(self) -> None:
from compose_farm.examples import list_example_files
files = list_example_files("whoami")
file_names = [f for f, _ in files]
assert ".env" in file_names
assert "compose.yaml" in file_names
def test_list_example_files_full(self) -> None:
from compose_farm.examples import list_example_files
files = list_example_files("full")
file_names = [f for f, _ in files]
assert "compose-farm.yaml" in file_names
assert "traefik/compose.yaml" in file_names
assert "whoami/compose.yaml" in file_names
def test_list_example_files_unknown(self) -> None:
from compose_farm.examples import list_example_files
with pytest.raises(ValueError, match="Unknown example"):
list_example_files("unknown")
def test_examples_dict(self) -> None:
from compose_farm.examples import EXAMPLES, SINGLE_STACK_EXAMPLES
assert "whoami" in EXAMPLES
assert "full" in EXAMPLES
assert "full" not in SINGLE_STACK_EXAMPLES
assert "whoami" in SINGLE_STACK_EXAMPLES
class TestConfigInitDiscover:
"""Tests for cf config init --discover."""
def test_discover_option_exists(self, runner: CliRunner) -> None:
result = runner.invoke(app, ["config", "init", "--help"])
assert "--discover" in result.stdout
assert "-d" in result.stdout

View File

@@ -11,6 +11,7 @@ from compose_farm.executor import (
_run_local_command,
check_networks_exist,
check_paths_exist,
get_running_stacks_on_host,
is_local,
run_command,
run_compose,
@@ -239,3 +240,31 @@ class TestCheckNetworksExist:
result = await check_networks_exist(config, "local", [])
assert result == {}
@linux_only
class TestGetRunningStacksOnHost:
"""Tests for get_running_stacks_on_host function (requires Docker)."""
async def test_returns_set_of_stacks(self, tmp_path: Path) -> None:
"""Function returns a set of stack names."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
stacks={},
)
result = await get_running_stacks_on_host(config, "local")
assert isinstance(result, set)
async def test_filters_empty_lines(self, tmp_path: Path) -> None:
"""Empty project names are filtered out."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
stacks={},
)
# Result should not contain empty strings
result = await get_running_stacks_on_host(config, "local")
assert "" not in result

View File

@@ -10,8 +10,8 @@ import pytest
from compose_farm.config import Config, Host
from compose_farm.executor import CommandResult
from compose_farm.logs import (
_parse_images_output,
collect_stack_entries,
_SECTION_SEPARATOR,
collect_stacks_entries_on_host,
isoformat,
load_existing_entries,
merge_entries,
@@ -19,74 +19,252 @@ from compose_farm.logs import (
)
def test_parse_images_output_handles_list_and_lines() -> None:
data = [
{"Service": "svc", "Image": "redis", "Digest": "sha256:abc"},
{"Service": "svc", "Image": "db", "Digest": "sha256:def"},
def _make_mock_output(
project_images: dict[str, list[str]], image_info: list[dict[str, object]]
) -> str:
"""Build mock output matching the 2-docker-command format."""
# Section 1: project|image pairs from docker ps
ps_lines = [
f"{project}|{image}" for project, images in project_images.items() for image in images
]
as_array = _parse_images_output(json.dumps(data))
assert len(as_array) == 2
as_lines = _parse_images_output("\n".join(json.dumps(item) for item in data))
assert len(as_lines) == 2
# Section 2: JSON array from docker image inspect
image_json = json.dumps(image_info)
return f"{chr(10).join(ps_lines)}\n{_SECTION_SEPARATOR}\n{image_json}"
@pytest.mark.asyncio
async def test_snapshot_preserves_first_seen(tmp_path: Path) -> None:
compose_dir = tmp_path / "compose"
compose_dir.mkdir()
stack_dir = compose_dir / "svc"
stack_dir.mkdir()
(stack_dir / "docker-compose.yml").write_text("services: {}\n")
class TestCollectStacksEntriesOnHost:
"""Tests for collect_stacks_entries_on_host (2 docker commands per host)."""
config = Config(
compose_dir=compose_dir,
hosts={"local": Host(address="localhost")},
stacks={"svc": "local"},
)
@pytest.fixture
def config_with_stacks(self, tmp_path: Path) -> Config:
"""Create a config with multiple stacks."""
compose_dir = tmp_path / "compose"
compose_dir.mkdir()
for stack in ["plex", "jellyfin", "sonarr"]:
stack_dir = compose_dir / stack
stack_dir.mkdir()
(stack_dir / "docker-compose.yml").write_text("services: {}\n")
sample_output = json.dumps([{"Service": "svc", "Image": "redis", "Digest": "sha256:abc"}])
async def fake_run_compose(
_cfg: Config, stack: str, compose_cmd: str, *, stream: bool = True
) -> CommandResult:
assert compose_cmd == "images --format json"
assert stream is False or stream is True
return CommandResult(
stack=stack,
exit_code=0,
success=True,
stdout=sample_output,
stderr="",
return Config(
compose_dir=compose_dir,
hosts={"host1": Host(address="localhost"), "host2": Host(address="localhost")},
stacks={"plex": "host1", "jellyfin": "host1", "sonarr": "host2"},
)
log_path = tmp_path / "dockerfarm-log.toml"
@pytest.mark.asyncio
async def test_single_ssh_call(
self, config_with_stacks: Config, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Verify only 1 SSH call is made regardless of stack count."""
call_count = {"count": 0}
# First snapshot
first_time = datetime(2025, 1, 1, tzinfo=UTC)
first_entries = await collect_stack_entries(
config, "svc", now=first_time, run_compose_fn=fake_run_compose
)
first_iso = isoformat(first_time)
merged = merge_entries([], first_entries, now_iso=first_iso)
meta = {"generated_at": first_iso, "compose_dir": str(config.compose_dir)}
write_toml(log_path, meta=meta, entries=merged)
async def mock_run_command(
host: Host, command: str, stack: str, *, stream: bool, prefix: str
) -> CommandResult:
call_count["count"] += 1
output = _make_mock_output(
{"plex": ["plex:latest"], "jellyfin": ["jellyfin:latest"]},
[
{
"RepoTags": ["plex:latest"],
"Id": "sha256:aaa",
"RepoDigests": ["plex@sha256:aaa"],
},
{
"RepoTags": ["jellyfin:latest"],
"Id": "sha256:bbb",
"RepoDigests": ["jellyfin@sha256:bbb"],
},
],
)
return CommandResult(stack=stack, exit_code=0, success=True, stdout=output)
after_first = tomllib.loads(log_path.read_text())
first_seen = after_first["entries"][0]["first_seen"]
monkeypatch.setattr("compose_farm.logs.run_command", mock_run_command)
# Second snapshot
second_time = datetime(2025, 2, 1, tzinfo=UTC)
second_entries = await collect_stack_entries(
config, "svc", now=second_time, run_compose_fn=fake_run_compose
)
second_iso = isoformat(second_time)
existing = load_existing_entries(log_path)
merged = merge_entries(existing, second_entries, now_iso=second_iso)
meta = {"generated_at": second_iso, "compose_dir": str(config.compose_dir)}
write_toml(log_path, meta=meta, entries=merged)
now = datetime(2025, 1, 1, tzinfo=UTC)
entries = await collect_stacks_entries_on_host(
config_with_stacks, "host1", {"plex", "jellyfin"}, now=now
)
after_second = tomllib.loads(log_path.read_text())
entry = after_second["entries"][0]
assert entry["first_seen"] == first_seen
assert entry["last_seen"].startswith("2025-02-01")
assert call_count["count"] == 1
assert len(entries) == 2
@pytest.mark.asyncio
async def test_filters_to_requested_stacks(
self, config_with_stacks: Config, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Only return entries for stacks we asked for, even if others are running."""
async def mock_run_command(
host: Host, command: str, stack: str, *, stream: bool, prefix: str
) -> CommandResult:
# Docker ps shows 3 stacks, but we only want plex
output = _make_mock_output(
{
"plex": ["plex:latest"],
"jellyfin": ["jellyfin:latest"],
"other": ["other:latest"],
},
[
{
"RepoTags": ["plex:latest"],
"Id": "sha256:aaa",
"RepoDigests": ["plex@sha256:aaa"],
},
{
"RepoTags": ["jellyfin:latest"],
"Id": "sha256:bbb",
"RepoDigests": ["j@sha256:bbb"],
},
{
"RepoTags": ["other:latest"],
"Id": "sha256:ccc",
"RepoDigests": ["o@sha256:ccc"],
},
],
)
return CommandResult(stack=stack, exit_code=0, success=True, stdout=output)
monkeypatch.setattr("compose_farm.logs.run_command", mock_run_command)
now = datetime(2025, 1, 1, tzinfo=UTC)
entries = await collect_stacks_entries_on_host(
config_with_stacks, "host1", {"plex"}, now=now
)
assert len(entries) == 1
assert entries[0].stack == "plex"
@pytest.mark.asyncio
async def test_multiple_images_per_stack(
self, config_with_stacks: Config, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Stack with multiple containers/images returns multiple entries."""
async def mock_run_command(
host: Host, command: str, stack: str, *, stream: bool, prefix: str
) -> CommandResult:
output = _make_mock_output(
{"plex": ["plex:latest", "redis:7"]},
[
{
"RepoTags": ["plex:latest"],
"Id": "sha256:aaa",
"RepoDigests": ["p@sha256:aaa"],
},
{"RepoTags": ["redis:7"], "Id": "sha256:bbb", "RepoDigests": ["r@sha256:bbb"]},
],
)
return CommandResult(stack=stack, exit_code=0, success=True, stdout=output)
monkeypatch.setattr("compose_farm.logs.run_command", mock_run_command)
now = datetime(2025, 1, 1, tzinfo=UTC)
entries = await collect_stacks_entries_on_host(
config_with_stacks, "host1", {"plex"}, now=now
)
assert len(entries) == 2
images = {e.image for e in entries}
assert images == {"plex:latest", "redis:7"}
@pytest.mark.asyncio
async def test_empty_stacks_returns_empty(self, config_with_stacks: Config) -> None:
"""Empty stack set returns empty entries without making SSH call."""
now = datetime(2025, 1, 1, tzinfo=UTC)
entries = await collect_stacks_entries_on_host(config_with_stacks, "host1", set(), now=now)
assert entries == []
@pytest.mark.asyncio
async def test_ssh_failure_returns_empty(
self, config_with_stacks: Config, monkeypatch: pytest.MonkeyPatch
) -> None:
"""SSH failure returns empty list instead of raising."""
async def mock_run_command(
host: Host, command: str, stack: str, *, stream: bool, prefix: str
) -> CommandResult:
return CommandResult(stack=stack, exit_code=1, success=False, stdout="", stderr="error")
monkeypatch.setattr("compose_farm.logs.run_command", mock_run_command)
now = datetime(2025, 1, 1, tzinfo=UTC)
entries = await collect_stacks_entries_on_host(
config_with_stacks, "host1", {"plex"}, now=now
)
assert entries == []
class TestSnapshotMerging:
"""Tests for merge_entries preserving first_seen."""
@pytest.fixture
def config(self, tmp_path: Path) -> Config:
compose_dir = tmp_path / "compose"
compose_dir.mkdir()
stack_dir = compose_dir / "svc"
stack_dir.mkdir()
(stack_dir / "docker-compose.yml").write_text("services: {}\n")
return Config(
compose_dir=compose_dir,
hosts={"local": Host(address="localhost")},
stacks={"svc": "local"},
)
@pytest.mark.asyncio
async def test_preserves_first_seen(
self, tmp_path: Path, config: Config, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Repeated snapshots preserve first_seen timestamp."""
async def mock_run_command(
host: Host, command: str, stack: str, *, stream: bool, prefix: str
) -> CommandResult:
output = _make_mock_output(
{"svc": ["redis:latest"]},
[
{
"RepoTags": ["redis:latest"],
"Id": "sha256:abc",
"RepoDigests": ["r@sha256:abc"],
}
],
)
return CommandResult(stack=stack, exit_code=0, success=True, stdout=output)
monkeypatch.setattr("compose_farm.logs.run_command", mock_run_command)
log_path = tmp_path / "dockerfarm-log.toml"
# First snapshot
first_time = datetime(2025, 1, 1, tzinfo=UTC)
first_entries = await collect_stacks_entries_on_host(
config, "local", {"svc"}, now=first_time
)
first_iso = isoformat(first_time)
merged = merge_entries([], first_entries, now_iso=first_iso)
meta = {"generated_at": first_iso, "compose_dir": str(config.compose_dir)}
write_toml(log_path, meta=meta, entries=merged)
after_first = tomllib.loads(log_path.read_text())
first_seen = after_first["entries"][0]["first_seen"]
# Second snapshot
second_time = datetime(2025, 2, 1, tzinfo=UTC)
second_entries = await collect_stacks_entries_on_host(
config, "local", {"svc"}, now=second_time
)
second_iso = isoformat(second_time)
existing = load_existing_entries(log_path)
merged = merge_entries(existing, second_entries, now_iso=second_iso)
meta = {"generated_at": second_iso, "compose_dir": str(config.compose_dir)}
write_toml(log_path, meta=meta, entries=merged)
after_second = tomllib.loads(log_path.read_text())
entry = after_second["entries"][0]
assert entry["first_seen"] == first_seen
assert entry["last_seen"].startswith("2025-02-01")

View File

@@ -11,7 +11,10 @@ import pytest
from compose_farm.cli import lifecycle
from compose_farm.config import Config, Host
from compose_farm.executor import CommandResult
from compose_farm.operations import _migrate_stack
from compose_farm.operations import (
_migrate_stack,
build_discovery_results,
)
@pytest.fixture
@@ -109,3 +112,83 @@ class TestUpdateCommandSequence:
# Verify the sequence is pull, build, down, up
assert "down" in source
assert "up -d" in source
class TestBuildDiscoveryResults:
"""Tests for build_discovery_results function."""
@pytest.fixture
def config(self, tmp_path: Path) -> Config:
"""Create a test config with multiple stacks."""
compose_dir = tmp_path / "compose"
for stack in ["plex", "jellyfin", "sonarr"]:
(compose_dir / stack).mkdir(parents=True)
(compose_dir / stack / "docker-compose.yml").write_text("services: {}")
return Config(
compose_dir=compose_dir,
hosts={
"host1": Host(address="localhost"),
"host2": Host(address="localhost"),
},
stacks={"plex": "host1", "jellyfin": "host1", "sonarr": "host2"},
)
def test_discovers_correctly_running_stacks(self, config: Config) -> None:
"""Stacks running on correct hosts are discovered."""
running_on_host = {
"host1": {"plex", "jellyfin"},
"host2": {"sonarr"},
}
discovered, strays, duplicates = build_discovery_results(config, running_on_host)
assert discovered == {"plex": "host1", "jellyfin": "host1", "sonarr": "host2"}
assert strays == {}
assert duplicates == {}
def test_detects_stray_stacks(self, config: Config) -> None:
"""Stacks running on wrong hosts are marked as strays."""
running_on_host = {
"host1": set(),
"host2": {"plex"}, # plex should be on host1
}
discovered, strays, _duplicates = build_discovery_results(config, running_on_host)
assert "plex" not in discovered
assert strays == {"plex": ["host2"]}
def test_detects_duplicates(self, config: Config) -> None:
"""Single-host stacks running on multiple hosts are duplicates."""
running_on_host = {
"host1": {"plex"},
"host2": {"plex"}, # plex running on both hosts
}
discovered, strays, duplicates = build_discovery_results(
config, running_on_host, stacks=["plex"]
)
# plex is correctly running on host1
assert discovered == {"plex": "host1"}
# plex is also a stray on host2
assert strays == {"plex": ["host2"]}
# plex is a duplicate (single-host stack on multiple hosts)
assert duplicates == {"plex": ["host1", "host2"]}
def test_filters_to_requested_stacks(self, config: Config) -> None:
"""Only returns results for requested stacks."""
running_on_host = {
"host1": {"plex", "jellyfin"},
"host2": {"sonarr"},
}
discovered, _strays, _duplicates = build_discovery_results(
config, running_on_host, stacks=["plex"]
)
# Only plex should be in results
assert discovered == {"plex": "host1"}
assert "jellyfin" not in discovered
assert "sonarr" not in discovered

View File

@@ -7,11 +7,58 @@ from typing import TYPE_CHECKING
import pytest
from fastapi import HTTPException
from pydantic import ValidationError
if TYPE_CHECKING:
from compose_farm.config import Config
class TestExtractConfigError:
"""Tests for extract_config_error helper."""
def test_validation_error_with_location(self) -> None:
from compose_farm.config import Config, Host
from compose_farm.web.deps import extract_config_error
# Trigger a validation error with an extra field
with pytest.raises(ValidationError) as exc_info:
Config(
hosts={"server": Host(address="192.168.1.1")},
stacks={"app": "server"},
unknown_field="bad", # type: ignore[call-arg]
)
msg = extract_config_error(exc_info.value)
assert "unknown_field" in msg
assert "Extra inputs are not permitted" in msg
def test_validation_error_nested_location(self) -> None:
from compose_farm.config import Host
from compose_farm.web.deps import extract_config_error
# Trigger a validation error with a nested extra field
with pytest.raises(ValidationError) as exc_info:
Host(address="192.168.1.1", bad_key="value") # type: ignore[call-arg]
msg = extract_config_error(exc_info.value)
assert "bad_key" in msg
assert "Extra inputs are not permitted" in msg
def test_regular_exception(self) -> None:
from compose_farm.web.deps import extract_config_error
exc = ValueError("Something went wrong")
msg = extract_config_error(exc)
assert msg == "Something went wrong"
def test_file_not_found_exception(self) -> None:
from compose_farm.web.deps import extract_config_error
exc = FileNotFoundError("Config file not found")
msg = extract_config_error(exc)
assert msg == "Config file not found"
class TestValidateYaml:
"""Tests for _validate_yaml helper."""