Compare commits

...

4 Commits

Author SHA1 Message Date
Bas Nijholt
2a923e6e81 fix: Include field name in config validation error messages (#131)
Previously, Pydantic validation errors like "Extra inputs are not
permitted" didn't show which field caused the error. Now the error
message includes the field location (e.g., "unknown_key: Extra inputs
are not permitted").
2025-12-22 22:35:19 -08:00
Bas Nijholt
5f2e081298 perf: Batch snapshot collection to 1 SSH call per host (#130)
## Summary

Optimize `cf refresh` SSH calls from O(stacks) to O(hosts):
- Discovery: 1 SSH call per host (unchanged)
- Snapshots: 1 SSH call per host (was 1 per stack)

For 50 stacks across 4 hosts: 54 → 8 SSH calls.
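
The arithmetic behind those numbers, spelled out:

```python
hosts, stacks = 4, 50
before = hosts + stacks  # 1 discovery call/host + 1 snapshot call/stack = 54
after = hosts + hosts    # 1 discovery call/host + 1 snapshot call/host  = 8
```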

## Changes

**Performance:**
- Use `docker ps` + `docker image inspect` instead of `docker compose images` per stack
- Batch snapshot collection by host in `collect_stacks_entries_on_host()`

**Architecture:**
- Add `build_discovery_results()` to `operations.py` (business logic)
- Keep progress bar wrapper in `cli/management.py` (presentation)
- Remove dead code: `discover_all_stacks_on_all_hosts()`, `collect_all_stacks_entries()`
2025-12-22 22:19:32 -08:00
Bas Nijholt
6fbc7430cb perf: Optimize stray detection to use 1 SSH call per host (#129)
* perf: Optimize stray detection to use 1 SSH call per host

Previously, stray detection checked each stack on each host individually,
resulting in (stacks * hosts) SSH calls. For 50 stacks across 4 hosts,
this meant ~200 parallel SSH connections, causing "Connection lost" errors.

Now queries each host once for all running compose projects using:
  docker ps --format '{{.Label "com.docker.compose.project"}}' | sort -u

This reduces SSH calls from ~200 to just 4 (one per host).
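
A sketch of how that output becomes a set of project names (the same parse `get_running_stacks_on_host` does; the sample stdout is made up):

```python
stdout = "jellyfin\nplex\n\nsonarr\n"  # example `docker ps ... | sort -u` output
running = {line.strip() for line in stdout.splitlines() if line.strip()}
assert running == {"jellyfin", "plex", "sonarr"}  # blank labels are dropped
```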

Changes:
- Add get_running_stacks_on_host() in executor.py
- Add discover_all_stacks_on_all_hosts() in operations.py
- Update _discover_stacks_full() to use the batch approach

* Remove unused function and add tests

- Remove discover_stack_on_all_hosts() which is no longer used
- Add tests for get_running_stacks_on_host()
- Add tests for discover_all_stacks_on_all_hosts()
  - Verifies it returns correct StackDiscoveryResult
  - Verifies stray detection works
  - Verifies it makes only 1 call per host (not per stack)
2025-12-22 12:09:59 -08:00
Bas Nijholt
6fdb43e1e9 Add self-healing: detect and stop stray containers (#128)
* Add self-healing: detect and stop rogue containers

Adds the ability to detect and stop "rogue" containers: stacks running
on hosts they shouldn't be on, according to the config.

Changes:
- `cf refresh`: Now scans ALL hosts and warns about rogues/duplicates
- `cf apply`: Stops rogue containers before migrations (new phase)
- New `--no-rogues` flag to skip rogue detection

Implementation:
- Add StackDiscoveryResult for full host scanning results
- Add discover_stack_on_all_hosts() to check all hosts in parallel
- Add stop_rogue_stacks() to stop containers on unauthorized hosts
- Update tests to include new no_rogues parameter
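
A toy version of the discovery result added here (field and property names follow `StackDiscoveryResult` as it appears in the diff below, post-rename; the values are hypothetical):

```python
from typing import NamedTuple

class StackDiscoveryResult(NamedTuple):
    stack: str
    configured_hosts: list[str]  # where it SHOULD run (from config)
    running_hosts: list[str]     # where it IS running (from scanning)

    @property
    def stray_hosts(self) -> list[str]:
        # Running somewhere the config doesn't allow
        return [h for h in self.running_hosts if h not in self.configured_hosts]

r = StackDiscoveryResult("plex", configured_hosts=["host1"], running_hosts=["host2"])
assert r.stray_hosts == ["host2"]  # plex is running where it shouldn't be
```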

* Update README.md

* fix: Update refresh tests for _discover_stacks_full return type

The function now returns a tuple (discovered, rogues, duplicates)
for rogue/duplicate detection. Update test mocks accordingly.

* Rename "rogue" terminology to "stray" for consistency

Terminology update across the codebase:
- rogue_hosts -> stray_hosts
- is_rogue -> is_stray
- stop_rogue_stacks -> stop_stray_stacks
- _discover_rogues -> _discover_strays
- --no-rogues -> --no-strays
- _report_rogue_stacks -> _report_stray_stacks

"Stray" better complements "orphaned" (both evoke lost things)
while clearly indicating the stack is running somewhere it
shouldn't be.

* Update README.md

* Move asyncio import to top level

* Fix remaining rogue -> stray in docstrings and README

* Refactor: Extract shared helpers to reduce duplication

1. Extract _stop_stacks_on_hosts helper in operations.py
   - Shared by stop_orphaned_stacks and stop_stray_stacks
   - Reduces ~50 lines of duplicated code (see the sketch below)

2. Refactor _discover_strays to reuse _discover_stacks_full
   - Removes duplicate discovery logic from lifecycle.py
   - Calls management._discover_stacks_full and merges duplicates
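
The resulting shape, sketched with execution stubbed out (the real helper runs `docker compose down` over SSH and takes slightly different arguments):

```python
import asyncio

async def _stop_stacks_on_hosts(cfg, stacks_to_hosts: dict[str, list[str]],
                                label: str = "") -> list[str]:
    # Shared helper: one "compose down" per stack@host (stubbed as a message here).
    suffix = f" ({label})" if label else ""
    return [f"{stack}@{host}: stopped{suffix}"
            for stack, hosts in stacks_to_hosts.items() for host in hosts]

async def stop_orphaned_stacks(cfg, orphaned: dict) -> list[str]:
    # State may track one host (str) or several (list); normalize to lists.
    normalized = {s: h if isinstance(h, list) else [h] for s, h in orphaned.items()}
    return await _stop_stacks_on_hosts(cfg, normalized)

async def stop_stray_stacks(cfg, strays: dict[str, list[str]]) -> list[str]:
    return await _stop_stacks_on_hosts(cfg, strays, label="stray")

print(asyncio.run(stop_stray_stacks(None, {"plex": ["host2"]})))
# -> ['plex@host2: stopped (stray)']
```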

* Add PR review prompt

* Fix typos in PR review prompt

* Move import to top level (no in-function imports)

* Update README.md

* Remove obvious comments
2025-12-22 10:22:09 -08:00
15 changed files with 898 additions and 263 deletions

View File

@@ -21,7 +21,7 @@ repos:
   - repo: https://github.com/astral-sh/ruff-pre-commit
     rev: v0.14.9
     hooks:
-      - id: ruff
+      - id: ruff-check
        args: [--fix]
      - id: ruff-format

.prompts/pr-review.md Normal file (+15)
View File

@@ -0,0 +1,15 @@
Review the pull request for:
- **Code cleanliness**: Is the implementation clean and well-structured?
- **DRY principle**: Does it avoid duplication?
- **Code reuse**: Are there parts that should be reused from other places?
- **Organization**: Is everything in the right place?
- **Consistency**: Is it in the same style as other parts of the codebase?
- **Simplicity**: Is it not over-engineered? Remember KISS and YAGNI. No dead code paths and NO defensive programming.
- **User experience**: Does it provide a good user experience?
- **PR**: Is the PR description and title clear and informative?
- **Tests**: Are there tests, and do they cover the changes adequately? Are they testing something meaningful or are they just trivial?
- **Live tests**: Test the changes in a REAL live environment to ensure they work as expected, use the config in `/opt/stacks/compose-farm.yaml`.
- **Rules**: Does the code follow the project's coding standards and guidelines as laid out in @CLAUDE.md?
Look at `git diff origin/main..HEAD` for the changes made in this pull request.

View File

@@ -449,6 +449,15 @@ Full `--help` output for each command. See the [Usage](#usage) table above for a
 │ copy it or customize the installation. │
 │ --help -h Show this message and exit. │
 ╰──────────────────────────────────────────────────────────────────────────────╯
+╭─ Configuration ──────────────────────────────────────────────────────────────╮
+│ traefik-file Generate a Traefik file-provider fragment from compose │
+│ Traefik labels. │
+│ refresh Update local state from running stacks. │
+│ check Validate configuration, traefik labels, mounts, and networks. │
+│ init-network Create Docker network on hosts with consistent settings. │
+│ config Manage compose-farm configuration files. │
+│ ssh Manage SSH keys for passwordless authentication. │
+╰──────────────────────────────────────────────────────────────────────────────╯
 ╭─ Lifecycle ──────────────────────────────────────────────────────────────────╮
 │ up Start stacks (docker compose up -d). Auto-migrates if host │
 │ changed. │
@@ -460,18 +469,10 @@ Full `--help` output for each command. See the [Usage](#usage) table above for a
 │ that service. │
 │ update Update stacks (pull + build + down + up). With --service, │
 │ updates just that service. │
-│ apply Make reality match config (start, migrate, stop as needed). │
+│ apply Make reality match config (start, migrate, stop │
+│ strays/orphans as needed). │
 │ compose Run any docker compose command on a stack. │
 ╰──────────────────────────────────────────────────────────────────────────────╯
-╭─ Configuration ──────────────────────────────────────────────────────────────╮
-│ traefik-file Generate a Traefik file-provider fragment from compose │
-│ Traefik labels. │
-│ refresh Update local state from running stacks. │
-│ check Validate configuration, traefik labels, mounts, and networks. │
-│ init-network Create Docker network on hosts with consistent settings. │
-│ config Manage compose-farm configuration files. │
-│ ssh Manage SSH keys for passwordless authentication. │
-╰──────────────────────────────────────────────────────────────────────────────╯
 ╭─ Monitoring ─────────────────────────────────────────────────────────────────╮
 │ logs Show stack logs. With --service, shows logs for just that │
 │ service. │
@@ -721,22 +722,25 @@ Full `--help` output for each command. See the [Usage](#usage) table above for a
 Usage: cf apply [OPTIONS]
-Make reality match config (start, migrate, stop as needed).
+Make reality match config (start, migrate, stop strays/orphans as needed).
 This is the "reconcile" command that ensures running stacks match your
 config file. It will:
 1. Stop orphaned stacks (in state but removed from config)
-2. Migrate stacks on wrong host (host in state ≠ host in config)
-3. Start missing stacks (in config but not in state)
+2. Stop stray stacks (running on unauthorized hosts)
+3. Migrate stacks on wrong host (host in state ≠ host in config)
+4. Start missing stacks (in config but not in state)
 Use --dry-run to preview changes before applying.
-Use --no-orphans to only migrate/start without stopping orphaned stacks.
+Use --no-orphans to skip stopping orphaned stacks.
+Use --no-strays to skip stopping stray stacks.
 Use --full to also run 'up' on all stacks (picks up compose/env changes).
 ╭─ Options ────────────────────────────────────────────────────────────────────╮
 │ --dry-run -n Show what would change without executing │
 │ --no-orphans Only migrate, don't stop orphaned stacks │
+│ --no-strays Don't stop stray stacks (running on wrong host) │
 │ --full -f Also run up on all stacks to apply config │
 │ changes │
 │ --config -c PATH Path to config file │

View File

@@ -3,10 +3,13 @@
 from __future__ import annotations

 from pathlib import Path
-from typing import Annotated
+from typing import TYPE_CHECKING, Annotated

 import typer

+if TYPE_CHECKING:
+    from compose_farm.config import Config
+
 from compose_farm.cli.app import app
 from compose_farm.cli.common import (
     AllOption,
@@ -23,9 +26,14 @@ from compose_farm.cli.common import (
     validate_host_for_stack,
     validate_stacks,
 )
+from compose_farm.cli.management import _discover_stacks_full
 from compose_farm.console import MSG_DRY_RUN, console, print_error, print_success
 from compose_farm.executor import run_compose_on_host, run_on_stacks, run_sequential_on_stacks
-from compose_farm.operations import stop_orphaned_stacks, up_stacks
+from compose_farm.operations import (
+    stop_orphaned_stacks,
+    stop_stray_stacks,
+    up_stacks,
+)
 from compose_farm.state import (
     get_orphaned_stacks,
     get_stack_host,
@@ -208,8 +216,23 @@ def update(
     report_results(results)


+def _discover_strays(cfg: Config) -> dict[str, list[str]]:
+    """Discover stacks running on unauthorized hosts by scanning all hosts."""
+    _, strays, duplicates = _discover_stacks_full(cfg)
+    # Merge duplicates into strays (for single-host stacks on multiple hosts,
+    # keep correct host and stop others)
+    for stack, running_hosts in duplicates.items():
+        configured = cfg.get_hosts(stack)[0]
+        stray_hosts = [h for h in running_hosts if h != configured]
+        if stray_hosts:
+            strays[stack] = stray_hosts
+    return strays
+
+
 @app.command(rich_help_panel="Lifecycle")
-def apply(  # noqa: PLR0912 (multi-phase reconciliation needs these branches)
+def apply(  # noqa: C901, PLR0912, PLR0915 (multi-phase reconciliation needs these branches)
     dry_run: Annotated[
         bool,
         typer.Option("--dry-run", "-n", help="Show what would change without executing"),
@@ -218,23 +241,29 @@ def apply(  # noqa: PLR0912 (multi-phase reconciliation needs these branches)
         bool,
         typer.Option("--no-orphans", help="Only migrate, don't stop orphaned stacks"),
     ] = False,
+    no_strays: Annotated[
+        bool,
+        typer.Option("--no-strays", help="Don't stop stray stacks (running on wrong host)"),
+    ] = False,
     full: Annotated[
         bool,
         typer.Option("--full", "-f", help="Also run up on all stacks to apply config changes"),
     ] = False,
     config: ConfigOption = None,
 ) -> None:
-    """Make reality match config (start, migrate, stop as needed).
+    """Make reality match config (start, migrate, stop strays/orphans as needed).

     This is the "reconcile" command that ensures running stacks match your
     config file. It will:

     1. Stop orphaned stacks (in state but removed from config)
-    2. Migrate stacks on wrong host (host in state ≠ host in config)
-    3. Start missing stacks (in config but not in state)
+    2. Stop stray stacks (running on unauthorized hosts)
+    3. Migrate stacks on wrong host (host in state ≠ host in config)
+    4. Start missing stacks (in config but not in state)

     Use --dry-run to preview changes before applying.
-    Use --no-orphans to only migrate/start without stopping orphaned stacks.
+    Use --no-orphans to skip stopping orphaned stacks.
+    Use --no-strays to skip stopping stray stacks.
     Use --full to also run 'up' on all stacks (picks up compose/env changes).
     """
     cfg = load_config_or_exit(config)
@@ -242,16 +271,28 @@ def apply(  # noqa: C901, PLR0912, PLR0915 (multi-phase reconciliation needs these branches)
     migrations = get_stacks_needing_migration(cfg)
     missing = get_stacks_not_in_state(cfg)

+    strays: dict[str, list[str]] = {}
+    if not no_strays:
+        console.print("[dim]Scanning hosts for stray containers...[/]")
+        strays = _discover_strays(cfg)
+
     # For --full: refresh all stacks not already being started/migrated
     handled = set(migrations) | set(missing)
     to_refresh = [stack for stack in cfg.stacks if stack not in handled] if full else []

     has_orphans = bool(orphaned) and not no_orphans
+    has_strays = bool(strays)
     has_migrations = bool(migrations)
     has_missing = bool(missing)
     has_refresh = bool(to_refresh)

-    if not has_orphans and not has_migrations and not has_missing and not has_refresh:
+    if (
+        not has_orphans
+        and not has_strays
+        and not has_migrations
+        and not has_missing
+        and not has_refresh
+    ):
         print_success("Nothing to apply - reality matches config")
         return
@@ -260,6 +301,14 @@ def apply(  # noqa: C901, PLR0912, PLR0915 (multi-phase reconciliation needs these branches)
         console.print(f"[yellow]Orphaned stacks to stop ({len(orphaned)}):[/]")
         for svc, hosts in orphaned.items():
             console.print(f"  [cyan]{svc}[/] on [magenta]{format_host(hosts)}[/]")
+    if has_strays:
+        console.print(f"[red]Stray stacks to stop ({len(strays)}):[/]")
+        for stack, hosts in strays.items():
+            configured = cfg.get_hosts(stack)
+            console.print(
+                f"  [cyan]{stack}[/] on [magenta]{', '.join(hosts)}[/] "
+                f"[dim](should be on {', '.join(configured)})[/]"
+            )
     if has_migrations:
         console.print(f"[cyan]Stacks to migrate ({len(migrations)}):[/]")
         for stack in migrations:
@@ -288,21 +337,26 @@ def apply(  # noqa: C901, PLR0912, PLR0915 (multi-phase reconciliation needs these branches)
         console.print("[yellow]Stopping orphaned stacks...[/]")
         all_results.extend(run_async(stop_orphaned_stacks(cfg)))

-    # 2. Migrate stacks on wrong host
+    # 2. Stop stray stacks (running on unauthorized hosts)
+    if has_strays:
+        console.print("[red]Stopping stray stacks...[/]")
+        all_results.extend(run_async(stop_stray_stacks(cfg, strays)))
+
+    # 3. Migrate stacks on wrong host
     if has_migrations:
         console.print("[cyan]Migrating stacks...[/]")
         migrate_results = run_async(up_stacks(cfg, migrations, raw=True))
         all_results.extend(migrate_results)
         maybe_regenerate_traefik(cfg, migrate_results)

-    # 3. Start missing stacks (reuse up_stacks which handles state updates)
+    # 4. Start missing stacks (reuse up_stacks which handles state updates)
     if has_missing:
         console.print("[green]Starting missing stacks...[/]")
         start_results = run_async(up_stacks(cfg, missing, raw=True))
         all_results.extend(start_results)
         maybe_regenerate_traefik(cfg, start_results)

-    # 4. Refresh remaining stacks (--full: run up to apply config changes)
+    # 5. Refresh remaining stacks (--full: run up to apply config changes)
     if has_refresh:
         console.print("[blue]Refreshing stacks...[/]")
         refresh_results = run_async(up_stacks(cfg, to_refresh, raw=True))

View File

@@ -37,22 +37,23 @@ from compose_farm.console import (
)
from compose_farm.executor import (
CommandResult,
get_running_stacks_on_host,
is_local,
run_command,
)
from compose_farm.logs import (
DEFAULT_LOG_PATH,
SnapshotEntry,
collect_stack_entries,
collect_stacks_entries_on_host,
isoformat,
load_existing_entries,
merge_entries,
write_toml,
)
from compose_farm.operations import (
build_discovery_results,
check_host_compatibility,
check_stack_requirements,
discover_stack_host,
)
from compose_farm.state import get_orphaned_stacks, load_state, save_state
from compose_farm.traefik import generate_traefik_config, render_traefik_config
@@ -60,38 +61,39 @@ from compose_farm.traefik import generate_traefik_config, render_traefik_config
# --- Sync helpers ---
def _discover_stacks(cfg: Config, stacks: list[str] | None = None) -> dict[str, str | list[str]]:
"""Discover running stacks with a progress bar."""
stack_list = stacks if stacks is not None else list(cfg.stacks)
results = run_parallel_with_progress(
"Discovering",
stack_list,
lambda s: discover_stack_host(cfg, s),
)
return {svc: host for svc, host in results if host is not None}
def _snapshot_stacks(
cfg: Config,
stacks: list[str],
discovered: dict[str, str | list[str]],
log_path: Path | None,
) -> Path:
"""Capture image digests with a progress bar."""
"""Capture image digests using batched SSH calls (1 per host).
Args:
cfg: Configuration
discovered: Dict mapping stack -> host(s) where it's running
log_path: Optional path to write the log file
Returns:
Path to the written log file.
"""
effective_log_path = log_path or DEFAULT_LOG_PATH
now_dt = datetime.now(UTC)
now_iso = isoformat(now_dt)
async def collect_stack(stack: str) -> tuple[str, list[SnapshotEntry]]:
try:
return stack, await collect_stack_entries(cfg, stack, now=now_dt)
except RuntimeError:
return stack, []
# Group stacks by host for batched SSH calls
stacks_by_host: dict[str, set[str]] = {}
for stack, hosts in discovered.items():
# Use first host for multi-host stacks (they use the same images)
host = hosts[0] if isinstance(hosts, list) else hosts
stacks_by_host.setdefault(host, set()).add(stack)
results = run_parallel_with_progress(
"Capturing",
stacks,
collect_stack,
)
# Collect entries with 1 SSH call per host (with progress bar)
async def collect_on_host(host: str) -> tuple[str, list[SnapshotEntry]]:
entries = await collect_stacks_entries_on_host(cfg, host, stacks_by_host[host], now=now_dt)
return host, entries
results = run_parallel_with_progress("Capturing", list(stacks_by_host.keys()), collect_on_host)
snapshot_entries = [entry for _, entries in results for entry in entries]
if not snapshot_entries:
@@ -147,6 +149,61 @@ def _report_sync_changes(
console.print(f" [red]-[/] [cyan]{stack}[/] (was on [magenta]{host_str}[/])")
def _discover_stacks_full(
cfg: Config,
stacks: list[str] | None = None,
) -> tuple[dict[str, str | list[str]], dict[str, list[str]], dict[str, list[str]]]:
"""Discover running stacks with full host scanning for stray detection.
Queries each host once for all running stacks (with progress bar),
then delegates to build_discovery_results for categorization.
"""
all_hosts = list(cfg.hosts.keys())
# Query each host for running stacks (with progress bar)
async def get_stacks_on_host(host: str) -> tuple[str, set[str]]:
running = await get_running_stacks_on_host(cfg, host)
return host, running
host_results = run_parallel_with_progress("Discovering", all_hosts, get_stacks_on_host)
running_on_host: dict[str, set[str]] = dict(host_results)
return build_discovery_results(cfg, running_on_host, stacks)
def _report_stray_stacks(
strays: dict[str, list[str]],
cfg: Config,
) -> None:
"""Report stacks running on unauthorized hosts."""
if strays:
console.print(f"\n[red]Stray stacks[/] (running on wrong host, {len(strays)}):")
console.print("[dim]Run [bold]cf apply[/bold] to stop them.[/]")
for stack in sorted(strays):
stray_hosts = strays[stack]
configured = cfg.get_hosts(stack)
console.print(
f" [red]![/] [cyan]{stack}[/] on [magenta]{', '.join(stray_hosts)}[/] "
f"[dim](should be on {', '.join(configured)})[/]"
)
def _report_duplicate_stacks(duplicates: dict[str, list[str]], cfg: Config) -> None:
"""Report single-host stacks running on multiple hosts."""
if duplicates:
console.print(
f"\n[yellow]Duplicate stacks[/] (running on multiple hosts, {len(duplicates)}):"
)
console.print("[dim]Run [bold]cf apply[/bold] to stop extras.[/]")
for stack in sorted(duplicates):
hosts = duplicates[stack]
configured = cfg.get_hosts(stack)[0]
console.print(
f" [yellow]![/] [cyan]{stack}[/] on [magenta]{', '.join(hosts)}[/] "
f"[dim](should only be on {configured})[/]"
)
# --- Check helpers ---
@@ -440,7 +497,7 @@ def refresh(
current_state = load_state(cfg)
discovered = _discover_stacks(cfg, stack_list)
discovered, strays, duplicates = _discover_stacks_full(cfg, stack_list)
# Calculate changes (only for the stacks we're refreshing)
added = [s for s in discovered if s not in current_state]
@@ -463,6 +520,9 @@ def refresh(
else:
print_success("State is already in sync.")
_report_stray_stacks(strays, cfg)
_report_duplicate_stacks(duplicates, cfg)
if dry_run:
console.print(f"\n{MSG_DRY_RUN}")
return
@@ -475,10 +535,10 @@ def refresh(
save_state(cfg, new_state)
print_success(f"State updated: {len(new_state)} stacks tracked.")
# Capture image digests for running stacks
# Capture image digests for running stacks (1 SSH call per host)
if discovered:
try:
path = _snapshot_stacks(cfg, list(discovered.keys()), log_path)
path = _snapshot_stacks(cfg, discovered, log_path)
print_success(f"Digests written to {path}")
except RuntimeError as exc:
print_warning(str(exc))

View File

@@ -497,6 +497,28 @@ async def check_stack_running(
     return result.success and bool(result.stdout.strip())


+async def get_running_stacks_on_host(
+    config: Config,
+    host_name: str,
+) -> set[str]:
+    """Get all running compose stacks on a host in a single SSH call.
+
+    Uses docker ps with the compose.project label to identify running stacks.
+    Much more efficient than checking each stack individually.
+    """
+    host = config.hosts[host_name]
+    # Get unique project names from running containers
+    command = "docker ps --format '{{.Label \"com.docker.compose.project\"}}' | sort -u"
+    result = await run_command(host, command, stack=host_name, stream=False, prefix="")
+    if not result.success:
+        return set()
+    # Filter out empty lines and return as set
+    return {line.strip() for line in result.stdout.splitlines() if line.strip()}
+
+
 async def _batch_check_existence(
     config: Config,
     host_name: str,

View File

@@ -6,21 +6,22 @@ import json
import tomllib
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING
from .executor import run_compose
from .executor import run_command
from .paths import xdg_config_home
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Iterable
from collections.abc import Iterable
from pathlib import Path
from .config import Config
from .executor import CommandResult
# Separator used to split output sections
_SECTION_SEPARATOR = "---CF-SEP---"
DEFAULT_LOG_PATH = xdg_config_home() / "compose-farm" / "dockerfarm-log.toml"
_DIGEST_HEX_LENGTH = 64
@dataclass(frozen=True)
@@ -56,87 +57,97 @@ def _escape(value: str) -> str:
return value.replace("\\", "\\\\").replace('"', '\\"')
def _parse_images_output(raw: str) -> list[dict[str, Any]]:
"""Parse `docker compose images --format json` output.
Handles both a JSON array and newline-separated JSON objects for robustness.
"""
raw = raw.strip()
if not raw:
return []
def _parse_image_digests(image_json: str) -> dict[str, str]:
"""Parse docker image inspect JSON to build image tag -> digest map."""
if not image_json:
return {}
try:
parsed = json.loads(raw)
image_data = json.loads(image_json)
except json.JSONDecodeError:
objects = []
for line in raw.splitlines():
if not line.strip():
continue
objects.append(json.loads(line))
return objects
return {}
if isinstance(parsed, list):
return parsed
if isinstance(parsed, dict):
return [parsed]
return []
image_digests: dict[str, str] = {}
for img in image_data:
tags = img.get("RepoTags") or []
digests = img.get("RepoDigests") or []
digest = digests[0].split("@")[-1] if digests else img.get("Id", "")
for tag in tags:
image_digests[tag] = digest
if img.get("Id"):
image_digests[img["Id"]] = digest
return image_digests
def _extract_image_fields(record: dict[str, Any]) -> tuple[str, str]:
"""Extract image name and digest with fallbacks."""
image = record.get("Image") or record.get("Repository") or record.get("Name") or ""
tag = record.get("Tag") or record.get("Version")
if tag and ":" not in image.rsplit("/", 1)[-1]:
image = f"{image}:{tag}"
digest = (
record.get("Digest")
or record.get("Image ID")
or record.get("ImageID")
or record.get("ID")
or ""
)
if digest and not digest.startswith("sha256:") and len(digest) == _DIGEST_HEX_LENGTH:
digest = f"sha256:{digest}"
return image, digest
async def collect_stack_entries(
async def collect_stacks_entries_on_host(
config: Config,
stack: str,
host_name: str,
stacks: set[str],
*,
now: datetime,
run_compose_fn: Callable[..., Awaitable[CommandResult]] = run_compose,
) -> list[SnapshotEntry]:
"""Run `docker compose images` for a stack and normalize results."""
result = await run_compose_fn(config, stack, "images --format json", stream=False)
"""Collect image entries for stacks on one host using 2 docker commands.
Uses `docker ps` to get running containers + their compose project labels,
then `docker image inspect` to get digests for all unique images.
Much faster than running N `docker compose images` commands.
"""
if not stacks:
return []
host = config.hosts[host_name]
# Single SSH call with 2 docker commands:
# 1. Get project|image pairs from running containers
# 2. Get image info (including digests) for all unique images
command = (
f"docker ps --format '{{{{.Label \"com.docker.compose.project\"}}}}|{{{{.Image}}}}' && "
f"echo '{_SECTION_SEPARATOR}' && "
"docker image inspect $(docker ps --format '{{.Image}}' | sort -u) 2>/dev/null || true"
)
result = await run_command(host, command, host_name, stream=False, prefix="")
if not result.success:
msg = result.stderr or f"compose images exited with {result.exit_code}"
error = f"[{stack}] Unable to read images: {msg}"
raise RuntimeError(error)
return []
records = _parse_images_output(result.stdout)
# Use first host for snapshots (multi-host stacks use same images on all hosts)
host_name = config.get_hosts(stack)[0]
compose_path = config.get_compose_path(stack)
# Split output into two sections
parts = result.stdout.split(_SECTION_SEPARATOR)
if len(parts) != 2: # noqa: PLR2004
return []
entries: list[SnapshotEntry] = []
for record in records:
image, digest = _extract_image_fields(record)
if not digest:
container_lines, image_json = parts[0].strip(), parts[1].strip()
# Parse project|image pairs, filtering to only stacks we care about
stack_images: dict[str, set[str]] = {}
for line in container_lines.splitlines():
if "|" not in line:
continue
entries.append(
SnapshotEntry(
stack=stack,
host=host_name,
compose_file=compose_path,
image=image,
digest=digest,
captured_at=now,
)
)
project, image = line.split("|", 1)
if project in stacks:
stack_images.setdefault(project, set()).add(image)
if not stack_images:
return []
# Parse image inspect JSON to build image -> digest map
image_digests = _parse_image_digests(image_json)
# Build entries
entries: list[SnapshotEntry] = []
for stack, images in stack_images.items():
for image in images:
digest = image_digests.get(image, "")
if digest:
entries.append(
SnapshotEntry(
stack=stack,
host=host_name,
compose_file=config.get_compose_path(stack),
image=image,
digest=digest,
captured_at=now,
)
)
return entries
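
For reference, a minimal sketch of the combined stdout format and its parse (sample values are invented; the tests below build the same shape with `_make_mock_output`):

```python
import json

raw = (
    "plex|plex:latest\n"
    "plex|redis:7\n"
    "---CF-SEP---\n"
    '[{"RepoTags": ["redis:7"], "Id": "sha256:bbb", "RepoDigests": ["redis@sha256:bbb"]}]'
)
containers, image_json = (part.strip() for part in raw.split("---CF-SEP---"))

# Section 1: project|image pairs from `docker ps`
stack_images: dict[str, set[str]] = {}
for line in containers.splitlines():
    if "|" in line:
        project, image = line.split("|", 1)
        stack_images.setdefault(project, set()).add(image)

# Section 2: tag -> digest map from `docker image inspect`
digests = {tag: img["RepoDigests"][0].split("@")[-1]
           for img in json.loads(image_json) for tag in img["RepoTags"]}

assert stack_images == {"plex": {"plex:latest", "redis:7"}}
assert digests == {"redis:7": "sha256:bbb"}
```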

View File

@@ -76,29 +76,37 @@ def get_stack_paths(cfg: Config, stack: str) -> list[str]:
return paths
async def discover_stack_host(cfg: Config, stack: str) -> tuple[str, str | list[str] | None]:
"""Discover where a stack is running.
class StackDiscoveryResult(NamedTuple):
"""Result of discovering where a stack is running across all hosts."""
For multi-host stacks, checks all assigned hosts in parallel.
For single-host, checks assigned host first, then others.
stack: str
configured_hosts: list[str] # From config (where it SHOULD run)
running_hosts: list[str] # From reality (where it IS running)
Returns (stack_name, host_or_hosts_or_none).
"""
assigned_hosts = cfg.get_hosts(stack)
@property
def is_multi_host(self) -> bool:
"""Check if this is a multi-host stack."""
return len(self.configured_hosts) > 1
if cfg.is_multi_host(stack):
# Check all assigned hosts in parallel
checks = await asyncio.gather(*[check_stack_running(cfg, stack, h) for h in assigned_hosts])
running = [h for h, is_running in zip(assigned_hosts, checks, strict=True) if is_running]
return stack, running if running else None
@property
def stray_hosts(self) -> list[str]:
"""Hosts where stack is running but shouldn't be."""
return [h for h in self.running_hosts if h not in self.configured_hosts]
# Single-host: check assigned host first, then others
if await check_stack_running(cfg, stack, assigned_hosts[0]):
return stack, assigned_hosts[0]
for host in cfg.hosts:
if host != assigned_hosts[0] and await check_stack_running(cfg, stack, host):
return stack, host
return stack, None
@property
def missing_hosts(self) -> list[str]:
"""Hosts where stack should be running but isn't."""
return [h for h in self.configured_hosts if h not in self.running_hosts]
@property
def is_stray(self) -> bool:
"""Stack is running on unauthorized host(s)."""
return len(self.stray_hosts) > 0
@property
def is_duplicate(self) -> bool:
"""Single-host stack running on multiple hosts."""
return not self.is_multi_host and len(self.running_hosts) > 1
async def check_stack_requirements(
@@ -359,26 +367,33 @@ async def check_host_compatibility(
return results
async def stop_orphaned_stacks(cfg: Config) -> list[CommandResult]:
"""Stop orphaned stacks (in state but not in config).
async def _stop_stacks_on_hosts(
cfg: Config,
stacks_to_hosts: dict[str, list[str]],
label: str = "",
) -> list[CommandResult]:
"""Stop stacks on specific hosts.
Runs docker compose down on each stack on its tracked host(s).
Only removes from state on successful stop.
Shared helper for stop_orphaned_stacks and stop_stray_stacks.
Args:
cfg: Config object.
stacks_to_hosts: Dict mapping stack name to list of hosts to stop on.
label: Optional label for success message (e.g., "stray", "orphaned").
Returns:
List of CommandResults for each stack@host.
Returns list of CommandResults for each stack@host.
"""
orphaned = get_orphaned_stacks(cfg)
if not orphaned:
if not stacks_to_hosts:
return []
results: list[CommandResult] = []
tasks: list[tuple[str, str, asyncio.Task[CommandResult]]] = []
suffix = f" ({label})" if label else ""
# Build list of (stack, host, task) for all orphaned stacks
for stack, hosts in orphaned.items():
host_list = hosts if isinstance(hosts, list) else [hosts]
for host in host_list:
# Skip hosts no longer in config
for stack, hosts in stacks_to_hosts.items():
for host in hosts:
if host not in cfg.hosts:
print_warning(f"{stack}@{host}: host no longer in config, skipping")
results.append(
@@ -393,30 +408,48 @@ async def stop_orphaned_stacks(cfg: Config) -> list[CommandResult]:
coro = run_compose_on_host(cfg, stack, host, "down")
tasks.append((stack, host, asyncio.create_task(coro)))
# Run all down commands in parallel
if tasks:
for stack, host, task in tasks:
try:
result = await task
results.append(result)
if result.success:
print_success(f"{stack}@{host}: stopped")
else:
print_error(f"{stack}@{host}: {result.stderr or 'failed'}")
except Exception as e:
print_error(f"{stack}@{host}: {e}")
results.append(
CommandResult(
stack=f"{stack}@{host}",
exit_code=1,
success=False,
stderr=str(e),
)
for stack, host, task in tasks:
try:
result = await task
results.append(result)
if result.success:
print_success(f"{stack}@{host}: stopped{suffix}")
else:
print_error(f"{stack}@{host}: {result.stderr or 'failed'}")
except Exception as e:
print_error(f"{stack}@{host}: {e}")
results.append(
CommandResult(
stack=f"{stack}@{host}",
exit_code=1,
success=False,
stderr=str(e),
)
)
return results
async def stop_orphaned_stacks(cfg: Config) -> list[CommandResult]:
"""Stop orphaned stacks (in state but not in config).
Runs docker compose down on each stack on its tracked host(s).
Only removes from state on successful stop.
Returns list of CommandResults for each stack@host.
"""
orphaned = get_orphaned_stacks(cfg)
if not orphaned:
return []
normalized: dict[str, list[str]] = {
stack: (hosts if isinstance(hosts, list) else [hosts]) for stack, hosts in orphaned.items()
}
results = await _stop_stacks_on_hosts(cfg, normalized)
# Remove from state only for stacks where ALL hosts succeeded
for stack, hosts in orphaned.items():
host_list = hosts if isinstance(hosts, list) else [hosts]
for stack in normalized:
all_succeeded = all(
r.success for r in results if r.stack.startswith(f"{stack}@") or r.stack == stack
)
@@ -424,3 +457,77 @@ async def stop_orphaned_stacks(cfg: Config) -> list[CommandResult]:
remove_stack(cfg, stack)
return results
async def stop_stray_stacks(
cfg: Config,
strays: dict[str, list[str]],
) -> list[CommandResult]:
"""Stop stacks running on unauthorized hosts.
Args:
cfg: Config object.
strays: Dict mapping stack name to list of stray hosts.
Returns:
List of CommandResults for each stack@host stopped.
"""
return await _stop_stacks_on_hosts(cfg, strays, label="stray")
def build_discovery_results(
cfg: Config,
running_on_host: dict[str, set[str]],
stacks: list[str] | None = None,
) -> tuple[dict[str, str | list[str]], dict[str, list[str]], dict[str, list[str]]]:
"""Build discovery results from per-host running stacks.
Takes the raw data of which stacks are running on which hosts and
categorizes them into discovered (running correctly), strays (wrong host),
and duplicates (single-host stack on multiple hosts).
Args:
cfg: Config object.
running_on_host: Dict mapping host -> set of running stack names.
stacks: Optional list of stacks to check. Defaults to all configured stacks.
Returns:
Tuple of (discovered, strays, duplicates):
- discovered: stack -> host(s) where running correctly
- strays: stack -> list of unauthorized hosts
- duplicates: stack -> list of all hosts (for single-host stacks on multiple)
"""
stack_list = stacks if stacks is not None else list(cfg.stacks)
all_hosts = list(running_on_host.keys())
# Build StackDiscoveryResult for each stack
results: list[StackDiscoveryResult] = [
StackDiscoveryResult(
stack=stack,
configured_hosts=cfg.get_hosts(stack),
running_hosts=[h for h in all_hosts if stack in running_on_host[h]],
)
for stack in stack_list
]
discovered: dict[str, str | list[str]] = {}
strays: dict[str, list[str]] = {}
duplicates: dict[str, list[str]] = {}
for result in results:
correct_hosts = [h for h in result.running_hosts if h in result.configured_hosts]
if correct_hosts:
if result.is_multi_host:
discovered[result.stack] = correct_hosts
else:
discovered[result.stack] = correct_hosts[0]
if result.is_stray:
strays[result.stack] = result.stray_hosts
if result.is_duplicate:
duplicates[result.stack] = result.running_hosts
return discovered, strays, duplicates
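
A usage sketch assuming the `Config`/`Host` constructors shown in the tests below (the hosts and stacks mirror the test fixture, not real infrastructure):

```python
import tempfile
from pathlib import Path

from compose_farm.config import Config, Host
from compose_farm.operations import build_discovery_results

# Build the same fixture the tests use: plex/jellyfin on host1, sonarr on host2.
compose_dir = Path(tempfile.mkdtemp())
for stack in ["plex", "jellyfin", "sonarr"]:
    (compose_dir / stack).mkdir(parents=True)
    (compose_dir / stack / "docker-compose.yml").write_text("services: {}")

cfg = Config(
    compose_dir=compose_dir,
    hosts={"host1": Host(address="localhost"), "host2": Host(address="localhost")},
    stacks={"plex": "host1", "jellyfin": "host1", "sonarr": "host2"},
)
running_on_host = {"host1": {"plex", "jellyfin"}, "host2": {"sonarr", "plex"}}
discovered, strays, duplicates = build_discovery_results(cfg, running_on_host)
assert discovered == {"plex": "host1", "jellyfin": "host1", "sonarr": "host2"}
assert strays == {"plex": ["host2"]}               # plex also running on host2
assert duplicates == {"plex": ["host1", "host2"]}  # single-host stack on two hosts
```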

View File

@@ -38,7 +38,17 @@ def get_templates() -> Jinja2Templates:
 def extract_config_error(exc: Exception) -> str:
     """Extract a user-friendly error message from a config exception."""
     if isinstance(exc, ValidationError):
-        return "; ".join(err.get("msg", str(err)) for err in exc.errors())
+        parts = []
+        for err in exc.errors():
+            msg = err.get("msg", str(err))
+            loc = err.get("loc", ())
+            if loc:
+                # Format location as dot-separated path (e.g., "hosts.nas.port")
+                loc_str = ".".join(str(part) for part in loc)
+                parts.append(f"{loc_str}: {msg}")
+            else:
+                parts.append(msg)
+        return "; ".join(parts)
     return str(exc)

View File

@@ -58,8 +58,9 @@ class TestApplyCommand:
patch("compose_farm.cli.lifecycle.get_orphaned_stacks", return_value={}),
patch("compose_farm.cli.lifecycle.get_stacks_needing_migration", return_value=[]),
patch("compose_farm.cli.lifecycle.get_stacks_not_in_state", return_value=[]),
patch("compose_farm.cli.lifecycle._discover_strays", return_value={}),
):
apply(dry_run=False, no_orphans=False, full=False, config=None)
apply(dry_run=False, no_orphans=False, no_strays=False, full=False, config=None)
captured = capsys.readouterr()
assert "Nothing to apply" in captured.out
@@ -82,10 +83,11 @@ class TestApplyCommand:
),
patch("compose_farm.cli.lifecycle.get_stacks_not_in_state", return_value=[]),
patch("compose_farm.cli.lifecycle.get_stack_host", return_value="host1"),
patch("compose_farm.cli.lifecycle._discover_strays", return_value={}),
patch("compose_farm.cli.lifecycle.stop_orphaned_stacks") as mock_stop,
patch("compose_farm.cli.lifecycle.up_stacks") as mock_up,
):
apply(dry_run=True, no_orphans=False, full=False, config=None)
apply(dry_run=True, no_orphans=False, no_strays=False, full=False, config=None)
captured = capsys.readouterr()
assert "Stacks to migrate" in captured.out
@@ -112,6 +114,7 @@ class TestApplyCommand:
),
patch("compose_farm.cli.lifecycle.get_stacks_not_in_state", return_value=[]),
patch("compose_farm.cli.lifecycle.get_stack_host", return_value="host1"),
patch("compose_farm.cli.lifecycle._discover_strays", return_value={}),
patch(
"compose_farm.cli.lifecycle.run_async",
return_value=mock_results,
@@ -120,7 +123,7 @@ class TestApplyCommand:
patch("compose_farm.cli.lifecycle.maybe_regenerate_traefik"),
patch("compose_farm.cli.lifecycle.report_results"),
):
apply(dry_run=False, no_orphans=False, full=False, config=None)
apply(dry_run=False, no_orphans=False, no_strays=False, full=False, config=None)
mock_up.assert_called_once()
call_args = mock_up.call_args
@@ -139,6 +142,7 @@ class TestApplyCommand:
),
patch("compose_farm.cli.lifecycle.get_stacks_needing_migration", return_value=[]),
patch("compose_farm.cli.lifecycle.get_stacks_not_in_state", return_value=[]),
patch("compose_farm.cli.lifecycle._discover_strays", return_value={}),
patch(
"compose_farm.cli.lifecycle.run_async",
return_value=mock_results,
@@ -146,7 +150,7 @@ class TestApplyCommand:
patch("compose_farm.cli.lifecycle.stop_orphaned_stacks") as mock_stop,
patch("compose_farm.cli.lifecycle.report_results"),
):
apply(dry_run=False, no_orphans=False, full=False, config=None)
apply(dry_run=False, no_orphans=False, no_strays=False, full=False, config=None)
mock_stop.assert_called_once_with(cfg)
@@ -169,6 +173,7 @@ class TestApplyCommand:
),
patch("compose_farm.cli.lifecycle.get_stacks_not_in_state", return_value=[]),
patch("compose_farm.cli.lifecycle.get_stack_host", return_value="host1"),
patch("compose_farm.cli.lifecycle._discover_strays", return_value={}),
patch(
"compose_farm.cli.lifecycle.run_async",
return_value=mock_results,
@@ -178,7 +183,7 @@ class TestApplyCommand:
patch("compose_farm.cli.lifecycle.maybe_regenerate_traefik"),
patch("compose_farm.cli.lifecycle.report_results"),
):
apply(dry_run=False, no_orphans=True, full=False, config=None)
apply(dry_run=False, no_orphans=True, no_strays=False, full=False, config=None)
# Should run migrations but not orphan cleanup
mock_up.assert_called_once()
@@ -202,8 +207,9 @@ class TestApplyCommand:
),
patch("compose_farm.cli.lifecycle.get_stacks_needing_migration", return_value=[]),
patch("compose_farm.cli.lifecycle.get_stacks_not_in_state", return_value=[]),
patch("compose_farm.cli.lifecycle._discover_strays", return_value={}),
):
apply(dry_run=False, no_orphans=True, full=False, config=None)
apply(dry_run=False, no_orphans=True, no_strays=False, full=False, config=None)
captured = capsys.readouterr()
assert "Nothing to apply" in captured.out
@@ -221,6 +227,7 @@ class TestApplyCommand:
"compose_farm.cli.lifecycle.get_stacks_not_in_state",
return_value=["svc1"],
),
patch("compose_farm.cli.lifecycle._discover_strays", return_value={}),
patch(
"compose_farm.cli.lifecycle.run_async",
return_value=mock_results,
@@ -229,7 +236,7 @@ class TestApplyCommand:
patch("compose_farm.cli.lifecycle.maybe_regenerate_traefik"),
patch("compose_farm.cli.lifecycle.report_results"),
):
apply(dry_run=False, no_orphans=False, full=False, config=None)
apply(dry_run=False, no_orphans=False, no_strays=False, full=False, config=None)
mock_up.assert_called_once()
call_args = mock_up.call_args
@@ -249,8 +256,9 @@ class TestApplyCommand:
"compose_farm.cli.lifecycle.get_stacks_not_in_state",
return_value=["svc1"],
),
patch("compose_farm.cli.lifecycle._discover_strays", return_value={}),
):
apply(dry_run=True, no_orphans=False, full=False, config=None)
apply(dry_run=True, no_orphans=False, no_strays=False, full=False, config=None)
captured = capsys.readouterr()
assert "Stacks to start" in captured.out
@@ -267,6 +275,7 @@ class TestApplyCommand:
patch("compose_farm.cli.lifecycle.get_orphaned_stacks", return_value={}),
patch("compose_farm.cli.lifecycle.get_stacks_needing_migration", return_value=[]),
patch("compose_farm.cli.lifecycle.get_stacks_not_in_state", return_value=[]),
patch("compose_farm.cli.lifecycle._discover_strays", return_value={}),
patch(
"compose_farm.cli.lifecycle.run_async",
return_value=mock_results,
@@ -275,7 +284,7 @@ class TestApplyCommand:
patch("compose_farm.cli.lifecycle.maybe_regenerate_traefik"),
patch("compose_farm.cli.lifecycle.report_results"),
):
apply(dry_run=False, no_orphans=False, full=True, config=None)
apply(dry_run=False, no_orphans=False, no_strays=False, full=True, config=None)
mock_up.assert_called_once()
call_args = mock_up.call_args
@@ -293,8 +302,9 @@ class TestApplyCommand:
patch("compose_farm.cli.lifecycle.get_orphaned_stacks", return_value={}),
patch("compose_farm.cli.lifecycle.get_stacks_needing_migration", return_value=[]),
patch("compose_farm.cli.lifecycle.get_stacks_not_in_state", return_value=[]),
patch("compose_farm.cli.lifecycle._discover_strays", return_value={}),
):
apply(dry_run=True, no_orphans=False, full=True, config=None)
apply(dry_run=True, no_orphans=False, no_strays=False, full=True, config=None)
captured = capsys.readouterr()
assert "Stacks to refresh" in captured.out
@@ -319,6 +329,7 @@ class TestApplyCommand:
return_value=["svc2"],
),
patch("compose_farm.cli.lifecycle.get_stack_host", return_value="host2"),
patch("compose_farm.cli.lifecycle._discover_strays", return_value={}),
patch(
"compose_farm.cli.lifecycle.run_async",
return_value=mock_results,
@@ -327,7 +338,7 @@ class TestApplyCommand:
patch("compose_farm.cli.lifecycle.maybe_regenerate_traefik"),
patch("compose_farm.cli.lifecycle.report_results"),
):
apply(dry_run=False, no_orphans=False, full=True, config=None)
apply(dry_run=False, no_orphans=False, no_strays=False, full=True, config=None)
# up_stacks should be called 3 times: migrate, start, refresh
assert mock_up.call_count == 3

View File

@@ -11,6 +11,7 @@ from compose_farm.executor import (
_run_local_command,
check_networks_exist,
check_paths_exist,
get_running_stacks_on_host,
is_local,
run_command,
run_compose,
@@ -239,3 +240,31 @@ class TestCheckNetworksExist:
result = await check_networks_exist(config, "local", [])
assert result == {}
@linux_only
class TestGetRunningStacksOnHost:
"""Tests for get_running_stacks_on_host function (requires Docker)."""
async def test_returns_set_of_stacks(self, tmp_path: Path) -> None:
"""Function returns a set of stack names."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
stacks={},
)
result = await get_running_stacks_on_host(config, "local")
assert isinstance(result, set)
async def test_filters_empty_lines(self, tmp_path: Path) -> None:
"""Empty project names are filtered out."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
stacks={},
)
# Result should not contain empty strings
result = await get_running_stacks_on_host(config, "local")
assert "" not in result

View File

@@ -10,8 +10,8 @@ import pytest
from compose_farm.config import Config, Host
from compose_farm.executor import CommandResult
from compose_farm.logs import (
_parse_images_output,
collect_stack_entries,
_SECTION_SEPARATOR,
collect_stacks_entries_on_host,
isoformat,
load_existing_entries,
merge_entries,
@@ -19,74 +19,252 @@ from compose_farm.logs import (
)
def test_parse_images_output_handles_list_and_lines() -> None:
data = [
{"Service": "svc", "Image": "redis", "Digest": "sha256:abc"},
{"Service": "svc", "Image": "db", "Digest": "sha256:def"},
def _make_mock_output(
project_images: dict[str, list[str]], image_info: list[dict[str, object]]
) -> str:
"""Build mock output matching the 2-docker-command format."""
# Section 1: project|image pairs from docker ps
ps_lines = [
f"{project}|{image}" for project, images in project_images.items() for image in images
]
as_array = _parse_images_output(json.dumps(data))
assert len(as_array) == 2
as_lines = _parse_images_output("\n".join(json.dumps(item) for item in data))
assert len(as_lines) == 2
# Section 2: JSON array from docker image inspect
image_json = json.dumps(image_info)
return f"{chr(10).join(ps_lines)}\n{_SECTION_SEPARATOR}\n{image_json}"
@pytest.mark.asyncio
async def test_snapshot_preserves_first_seen(tmp_path: Path) -> None:
compose_dir = tmp_path / "compose"
compose_dir.mkdir()
stack_dir = compose_dir / "svc"
stack_dir.mkdir()
(stack_dir / "docker-compose.yml").write_text("services: {}\n")
class TestCollectStacksEntriesOnHost:
"""Tests for collect_stacks_entries_on_host (2 docker commands per host)."""
config = Config(
compose_dir=compose_dir,
hosts={"local": Host(address="localhost")},
stacks={"svc": "local"},
)
@pytest.fixture
def config_with_stacks(self, tmp_path: Path) -> Config:
"""Create a config with multiple stacks."""
compose_dir = tmp_path / "compose"
compose_dir.mkdir()
for stack in ["plex", "jellyfin", "sonarr"]:
stack_dir = compose_dir / stack
stack_dir.mkdir()
(stack_dir / "docker-compose.yml").write_text("services: {}\n")
sample_output = json.dumps([{"Service": "svc", "Image": "redis", "Digest": "sha256:abc"}])
async def fake_run_compose(
_cfg: Config, stack: str, compose_cmd: str, *, stream: bool = True
) -> CommandResult:
assert compose_cmd == "images --format json"
assert stream is False or stream is True
return CommandResult(
stack=stack,
exit_code=0,
success=True,
stdout=sample_output,
stderr="",
return Config(
compose_dir=compose_dir,
hosts={"host1": Host(address="localhost"), "host2": Host(address="localhost")},
stacks={"plex": "host1", "jellyfin": "host1", "sonarr": "host2"},
)
log_path = tmp_path / "dockerfarm-log.toml"
@pytest.mark.asyncio
async def test_single_ssh_call(
self, config_with_stacks: Config, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Verify only 1 SSH call is made regardless of stack count."""
call_count = {"count": 0}
# First snapshot
first_time = datetime(2025, 1, 1, tzinfo=UTC)
first_entries = await collect_stack_entries(
config, "svc", now=first_time, run_compose_fn=fake_run_compose
)
first_iso = isoformat(first_time)
merged = merge_entries([], first_entries, now_iso=first_iso)
meta = {"generated_at": first_iso, "compose_dir": str(config.compose_dir)}
write_toml(log_path, meta=meta, entries=merged)
async def mock_run_command(
host: Host, command: str, stack: str, *, stream: bool, prefix: str
) -> CommandResult:
call_count["count"] += 1
output = _make_mock_output(
{"plex": ["plex:latest"], "jellyfin": ["jellyfin:latest"]},
[
{
"RepoTags": ["plex:latest"],
"Id": "sha256:aaa",
"RepoDigests": ["plex@sha256:aaa"],
},
{
"RepoTags": ["jellyfin:latest"],
"Id": "sha256:bbb",
"RepoDigests": ["jellyfin@sha256:bbb"],
},
],
)
return CommandResult(stack=stack, exit_code=0, success=True, stdout=output)
after_first = tomllib.loads(log_path.read_text())
first_seen = after_first["entries"][0]["first_seen"]
monkeypatch.setattr("compose_farm.logs.run_command", mock_run_command)
# Second snapshot
second_time = datetime(2025, 2, 1, tzinfo=UTC)
second_entries = await collect_stack_entries(
config, "svc", now=second_time, run_compose_fn=fake_run_compose
)
second_iso = isoformat(second_time)
existing = load_existing_entries(log_path)
merged = merge_entries(existing, second_entries, now_iso=second_iso)
meta = {"generated_at": second_iso, "compose_dir": str(config.compose_dir)}
write_toml(log_path, meta=meta, entries=merged)
now = datetime(2025, 1, 1, tzinfo=UTC)
entries = await collect_stacks_entries_on_host(
config_with_stacks, "host1", {"plex", "jellyfin"}, now=now
)
after_second = tomllib.loads(log_path.read_text())
entry = after_second["entries"][0]
assert entry["first_seen"] == first_seen
assert entry["last_seen"].startswith("2025-02-01")
assert call_count["count"] == 1
assert len(entries) == 2
@pytest.mark.asyncio
async def test_filters_to_requested_stacks(
self, config_with_stacks: Config, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Only return entries for stacks we asked for, even if others are running."""
async def mock_run_command(
host: Host, command: str, stack: str, *, stream: bool, prefix: str
) -> CommandResult:
# Docker ps shows 3 stacks, but we only want plex
output = _make_mock_output(
{
"plex": ["plex:latest"],
"jellyfin": ["jellyfin:latest"],
"other": ["other:latest"],
},
[
{
"RepoTags": ["plex:latest"],
"Id": "sha256:aaa",
"RepoDigests": ["plex@sha256:aaa"],
},
{
"RepoTags": ["jellyfin:latest"],
"Id": "sha256:bbb",
"RepoDigests": ["j@sha256:bbb"],
},
{
"RepoTags": ["other:latest"],
"Id": "sha256:ccc",
"RepoDigests": ["o@sha256:ccc"],
},
],
)
return CommandResult(stack=stack, exit_code=0, success=True, stdout=output)
monkeypatch.setattr("compose_farm.logs.run_command", mock_run_command)
now = datetime(2025, 1, 1, tzinfo=UTC)
entries = await collect_stacks_entries_on_host(
config_with_stacks, "host1", {"plex"}, now=now
)
assert len(entries) == 1
assert entries[0].stack == "plex"
@pytest.mark.asyncio
async def test_multiple_images_per_stack(
self, config_with_stacks: Config, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Stack with multiple containers/images returns multiple entries."""
async def mock_run_command(
host: Host, command: str, stack: str, *, stream: bool, prefix: str
) -> CommandResult:
output = _make_mock_output(
{"plex": ["plex:latest", "redis:7"]},
[
{
"RepoTags": ["plex:latest"],
"Id": "sha256:aaa",
"RepoDigests": ["p@sha256:aaa"],
},
{"RepoTags": ["redis:7"], "Id": "sha256:bbb", "RepoDigests": ["r@sha256:bbb"]},
],
)
return CommandResult(stack=stack, exit_code=0, success=True, stdout=output)
monkeypatch.setattr("compose_farm.logs.run_command", mock_run_command)
now = datetime(2025, 1, 1, tzinfo=UTC)
entries = await collect_stacks_entries_on_host(
config_with_stacks, "host1", {"plex"}, now=now
)
assert len(entries) == 2
images = {e.image for e in entries}
assert images == {"plex:latest", "redis:7"}
@pytest.mark.asyncio
async def test_empty_stacks_returns_empty(self, config_with_stacks: Config) -> None:
"""Empty stack set returns empty entries without making SSH call."""
now = datetime(2025, 1, 1, tzinfo=UTC)
entries = await collect_stacks_entries_on_host(config_with_stacks, "host1", set(), now=now)
assert entries == []
@pytest.mark.asyncio
async def test_ssh_failure_returns_empty(
self, config_with_stacks: Config, monkeypatch: pytest.MonkeyPatch
) -> None:
"""SSH failure returns empty list instead of raising."""
async def mock_run_command(
host: Host, command: str, stack: str, *, stream: bool, prefix: str
) -> CommandResult:
return CommandResult(stack=stack, exit_code=1, success=False, stdout="", stderr="error")
monkeypatch.setattr("compose_farm.logs.run_command", mock_run_command)
now = datetime(2025, 1, 1, tzinfo=UTC)
entries = await collect_stacks_entries_on_host(
config_with_stacks, "host1", {"plex"}, now=now
)
assert entries == []
class TestSnapshotMerging:
"""Tests for merge_entries preserving first_seen."""
@pytest.fixture
def config(self, tmp_path: Path) -> Config:
compose_dir = tmp_path / "compose"
compose_dir.mkdir()
stack_dir = compose_dir / "svc"
stack_dir.mkdir()
(stack_dir / "docker-compose.yml").write_text("services: {}\n")
return Config(
compose_dir=compose_dir,
hosts={"local": Host(address="localhost")},
stacks={"svc": "local"},
)
@pytest.mark.asyncio
async def test_preserves_first_seen(
self, tmp_path: Path, config: Config, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Repeated snapshots preserve first_seen timestamp."""
async def mock_run_command(
host: Host, command: str, stack: str, *, stream: bool, prefix: str
) -> CommandResult:
output = _make_mock_output(
{"svc": ["redis:latest"]},
[
{
"RepoTags": ["redis:latest"],
"Id": "sha256:abc",
"RepoDigests": ["r@sha256:abc"],
}
],
)
return CommandResult(stack=stack, exit_code=0, success=True, stdout=output)
monkeypatch.setattr("compose_farm.logs.run_command", mock_run_command)
log_path = tmp_path / "dockerfarm-log.toml"
# First snapshot
first_time = datetime(2025, 1, 1, tzinfo=UTC)
first_entries = await collect_stacks_entries_on_host(
config, "local", {"svc"}, now=first_time
)
first_iso = isoformat(first_time)
merged = merge_entries([], first_entries, now_iso=first_iso)
meta = {"generated_at": first_iso, "compose_dir": str(config.compose_dir)}
write_toml(log_path, meta=meta, entries=merged)
after_first = tomllib.loads(log_path.read_text())
first_seen = after_first["entries"][0]["first_seen"]
# Second snapshot
second_time = datetime(2025, 2, 1, tzinfo=UTC)
second_entries = await collect_stacks_entries_on_host(
config, "local", {"svc"}, now=second_time
)
second_iso = isoformat(second_time)
existing = load_existing_entries(log_path)
merged = merge_entries(existing, second_entries, now_iso=second_iso)
meta = {"generated_at": second_iso, "compose_dir": str(config.compose_dir)}
write_toml(log_path, meta=meta, entries=merged)
after_second = tomllib.loads(log_path.read_text())
entry = after_second["entries"][0]
assert entry["first_seen"] == first_seen
assert entry["last_seen"].startswith("2025-02-01")

View File

@@ -11,7 +11,10 @@ import pytest
from compose_farm.cli import lifecycle
from compose_farm.config import Config, Host
from compose_farm.executor import CommandResult
from compose_farm.operations import _migrate_stack
from compose_farm.operations import (
_migrate_stack,
build_discovery_results,
)
@pytest.fixture
@@ -109,3 +112,83 @@ class TestUpdateCommandSequence:
# Verify the sequence is pull, build, down, up
assert "down" in source
assert "up -d" in source
class TestBuildDiscoveryResults:
"""Tests for build_discovery_results function."""
@pytest.fixture
def config(self, tmp_path: Path) -> Config:
"""Create a test config with multiple stacks."""
compose_dir = tmp_path / "compose"
for stack in ["plex", "jellyfin", "sonarr"]:
(compose_dir / stack).mkdir(parents=True)
(compose_dir / stack / "docker-compose.yml").write_text("services: {}")
return Config(
compose_dir=compose_dir,
hosts={
"host1": Host(address="localhost"),
"host2": Host(address="localhost"),
},
stacks={"plex": "host1", "jellyfin": "host1", "sonarr": "host2"},
)
def test_discovers_correctly_running_stacks(self, config: Config) -> None:
"""Stacks running on correct hosts are discovered."""
running_on_host = {
"host1": {"plex", "jellyfin"},
"host2": {"sonarr"},
}
discovered, strays, duplicates = build_discovery_results(config, running_on_host)
assert discovered == {"plex": "host1", "jellyfin": "host1", "sonarr": "host2"}
assert strays == {}
assert duplicates == {}
def test_detects_stray_stacks(self, config: Config) -> None:
"""Stacks running on wrong hosts are marked as strays."""
running_on_host = {
"host1": set(),
"host2": {"plex"}, # plex should be on host1
}
discovered, strays, _duplicates = build_discovery_results(config, running_on_host)
assert "plex" not in discovered
assert strays == {"plex": ["host2"]}
def test_detects_duplicates(self, config: Config) -> None:
"""Single-host stacks running on multiple hosts are duplicates."""
running_on_host = {
"host1": {"plex"},
"host2": {"plex"}, # plex running on both hosts
}
discovered, strays, duplicates = build_discovery_results(
config, running_on_host, stacks=["plex"]
)
# plex is correctly running on host1
assert discovered == {"plex": "host1"}
# plex is also a stray on host2
assert strays == {"plex": ["host2"]}
# plex is a duplicate (single-host stack on multiple hosts)
assert duplicates == {"plex": ["host1", "host2"]}
def test_filters_to_requested_stacks(self, config: Config) -> None:
"""Only returns results for requested stacks."""
running_on_host = {
"host1": {"plex", "jellyfin"},
"host2": {"sonarr"},
}
discovered, _strays, _duplicates = build_discovery_results(
config, running_on_host, stacks=["plex"]
)
# Only plex should be in results
assert discovered == {"plex": "host1"}
assert "jellyfin" not in discovered
assert "sonarr" not in discovered

View File

@@ -211,8 +211,8 @@ class TestRefreshCommand:
return_value=existing_state,
),
patch(
"compose_farm.cli.management._discover_stacks",
return_value={"plex": "nas02"}, # plex moved to nas02
"compose_farm.cli.management._discover_stacks_full",
return_value=({"plex": "nas02"}, {}, {}), # plex moved to nas02
),
patch("compose_farm.cli.management._snapshot_stacks"),
patch("compose_farm.cli.management.save_state") as mock_save,
@@ -247,8 +247,12 @@ class TestRefreshCommand:
return_value=existing_state,
),
patch(
"compose_farm.cli.management._discover_stacks",
return_value={"plex": "nas01", "grafana": "nas02"}, # jellyfin not running
"compose_farm.cli.management._discover_stacks_full",
return_value=(
{"plex": "nas01", "grafana": "nas02"},
{},
{},
), # jellyfin not running
),
patch("compose_farm.cli.management._snapshot_stacks"),
patch("compose_farm.cli.management.save_state") as mock_save,
@@ -281,8 +285,8 @@ class TestRefreshCommand:
return_value=existing_state,
),
patch(
"compose_farm.cli.management._discover_stacks",
return_value={"plex": "nas01"}, # only plex running
"compose_farm.cli.management._discover_stacks_full",
return_value=({"plex": "nas01"}, {}, {}), # only plex running
),
patch("compose_farm.cli.management._snapshot_stacks"),
patch("compose_farm.cli.management.save_state") as mock_save,
@@ -315,8 +319,8 @@ class TestRefreshCommand:
return_value=existing_state,
),
patch(
"compose_farm.cli.management._discover_stacks",
return_value={"plex": "nas01"}, # jellyfin not running
"compose_farm.cli.management._discover_stacks_full",
return_value=({"plex": "nas01"}, {}, {}), # jellyfin not running
),
patch("compose_farm.cli.management._snapshot_stacks"),
patch("compose_farm.cli.management.save_state") as mock_save,
@@ -350,8 +354,8 @@ class TestRefreshCommand:
return_value=existing_state,
),
patch(
"compose_farm.cli.management._discover_stacks",
return_value={"plex": "nas02"}, # would change
"compose_farm.cli.management._discover_stacks_full",
return_value=({"plex": "nas02"}, {}, {}), # would change
),
patch("compose_farm.cli.management.save_state") as mock_save,
):

View File

@@ -7,11 +7,58 @@ from typing import TYPE_CHECKING
import pytest
from fastapi import HTTPException
from pydantic import ValidationError
if TYPE_CHECKING:
from compose_farm.config import Config
class TestExtractConfigError:
"""Tests for extract_config_error helper."""
def test_validation_error_with_location(self) -> None:
from compose_farm.config import Config, Host
from compose_farm.web.deps import extract_config_error
# Trigger a validation error with an extra field
with pytest.raises(ValidationError) as exc_info:
Config(
hosts={"server": Host(address="192.168.1.1")},
stacks={"app": "server"},
unknown_field="bad", # type: ignore[call-arg]
)
msg = extract_config_error(exc_info.value)
assert "unknown_field" in msg
assert "Extra inputs are not permitted" in msg
def test_validation_error_nested_location(self) -> None:
from compose_farm.config import Host
from compose_farm.web.deps import extract_config_error
# Trigger a validation error with a nested extra field
with pytest.raises(ValidationError) as exc_info:
Host(address="192.168.1.1", bad_key="value") # type: ignore[call-arg]
msg = extract_config_error(exc_info.value)
assert "bad_key" in msg
assert "Extra inputs are not permitted" in msg
def test_regular_exception(self) -> None:
from compose_farm.web.deps import extract_config_error
exc = ValueError("Something went wrong")
msg = extract_config_error(exc)
assert msg == "Something went wrong"
def test_file_not_found_exception(self) -> None:
from compose_farm.web.deps import extract_config_error
exc = FileNotFoundError("Config file not found")
msg = extract_config_error(exc)
assert msg == "Config file not found"
class TestValidateYaml:
"""Tests for _validate_yaml helper."""