Compare commits

...

3 Commits

Author SHA1 Message Date
Bas Nijholt
2a923e6e81 fix: Include field name in config validation error messages (#131)
Previously, Pydantic validation errors like "Extra inputs are not
permitted" didn't show which field caused the error. Now the error
message includes the field location (e.g., "unknown_key: Extra inputs
are not permitted").
2025-12-22 22:35:19 -08:00
Bas Nijholt
5f2e081298 perf: Batch snapshot collection to 1 SSH call per host (#130)
## Summary

Optimize `cf refresh` SSH calls from O(stacks) to O(hosts):
- Discovery: 1 SSH call per host (unchanged)
- Snapshots: 1 SSH call per host (was 1 per stack)

For 50 stacks across 4 hosts: 54 → 8 SSH calls.

## Changes

**Performance:**
- Use `docker ps` + `docker image inspect` instead of `docker compose images` per stack
- Batch snapshot collection by host in `collect_stacks_entries_on_host()`

**Architecture:**
- Add `build_discovery_results()` to `operations.py` (business logic)
- Keep progress bar wrapper in `cli/management.py` (presentation)
- Remove dead code: `discover_all_stacks_on_all_hosts()`, `collect_all_stacks_entries()`
2025-12-22 22:19:32 -08:00
Bas Nijholt
6fbc7430cb perf: Optimize stray detection to use 1 SSH call per host (#129)
* perf: Optimize stray detection to use 1 SSH call per host

Previously, stray detection checked each stack on each host individually,
resulting in (stacks * hosts) SSH calls. For 50 stacks across 4 hosts,
this meant ~200 parallel SSH connections, causing "Connection lost" errors.

Now queries each host once for all running compose projects using:
  docker ps --format '{{.Label "com.docker.compose.project"}}' | sort -u

This reduces SSH calls from ~200 to just 4 (one per host).

Changes:
- Add get_running_stacks_on_host() in executor.py
- Add discover_all_stacks_on_all_hosts() in operations.py
- Update _discover_stacks_full() to use the batch approach

* Remove unused function and add tests

- Remove discover_stack_on_all_hosts() which is no longer used
- Add tests for get_running_stacks_on_host()
- Add tests for discover_all_stacks_on_all_hosts()
  - Verifies it returns correct StackDiscoveryResult
  - Verifies stray detection works
  - Verifies it makes only 1 call per host (not per stack)
2025-12-22 12:09:59 -08:00
10 changed files with 613 additions and 239 deletions

View File

@@ -21,7 +21,7 @@ repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.14.9
hooks:
- id: ruff
- id: ruff-check
args: [--fix]
- id: ruff-format

View File

@@ -37,24 +37,23 @@ from compose_farm.console import (
)
from compose_farm.executor import (
CommandResult,
get_running_stacks_on_host,
is_local,
run_command,
)
from compose_farm.logs import (
DEFAULT_LOG_PATH,
SnapshotEntry,
collect_stack_entries,
collect_stacks_entries_on_host,
isoformat,
load_existing_entries,
merge_entries,
write_toml,
)
from compose_farm.operations import (
StackDiscoveryResult,
build_discovery_results,
check_host_compatibility,
check_stack_requirements,
discover_stack_host,
discover_stack_on_all_hosts,
)
from compose_farm.state import get_orphaned_stacks, load_state, save_state
from compose_farm.traefik import generate_traefik_config, render_traefik_config
@@ -62,38 +61,39 @@ from compose_farm.traefik import generate_traefik_config, render_traefik_config
# --- Sync helpers ---
def _discover_stacks(cfg: Config, stacks: list[str] | None = None) -> dict[str, str | list[str]]:
"""Discover running stacks with a progress bar."""
stack_list = stacks if stacks is not None else list(cfg.stacks)
results = run_parallel_with_progress(
"Discovering",
stack_list,
lambda s: discover_stack_host(cfg, s),
)
return {svc: host for svc, host in results if host is not None}
def _snapshot_stacks(
cfg: Config,
stacks: list[str],
discovered: dict[str, str | list[str]],
log_path: Path | None,
) -> Path:
"""Capture image digests with a progress bar."""
"""Capture image digests using batched SSH calls (1 per host).
Args:
cfg: Configuration
discovered: Dict mapping stack -> host(s) where it's running
log_path: Optional path to write the log file
Returns:
Path to the written log file.
"""
effective_log_path = log_path or DEFAULT_LOG_PATH
now_dt = datetime.now(UTC)
now_iso = isoformat(now_dt)
async def collect_stack(stack: str) -> tuple[str, list[SnapshotEntry]]:
try:
return stack, await collect_stack_entries(cfg, stack, now=now_dt)
except RuntimeError:
return stack, []
# Group stacks by host for batched SSH calls
stacks_by_host: dict[str, set[str]] = {}
for stack, hosts in discovered.items():
# Use first host for multi-host stacks (they use the same images)
host = hosts[0] if isinstance(hosts, list) else hosts
stacks_by_host.setdefault(host, set()).add(stack)
results = run_parallel_with_progress(
"Capturing",
stacks,
collect_stack,
)
# Collect entries with 1 SSH call per host (with progress bar)
async def collect_on_host(host: str) -> tuple[str, list[SnapshotEntry]]:
entries = await collect_stacks_entries_on_host(cfg, host, stacks_by_host[host], now=now_dt)
return host, entries
results = run_parallel_with_progress("Capturing", list(stacks_by_host.keys()), collect_on_host)
snapshot_entries = [entry for _, entries in results for entry in entries]
if not snapshot_entries:
@@ -155,39 +155,20 @@ def _discover_stacks_full(
) -> tuple[dict[str, str | list[str]], dict[str, list[str]], dict[str, list[str]]]:
"""Discover running stacks with full host scanning for stray detection.
Returns:
Tuple of (discovered, strays, duplicates):
- discovered: stack -> host(s) where running correctly
- strays: stack -> list of unauthorized hosts
- duplicates: stack -> list of all hosts (for single-host stacks on multiple)
Queries each host once for all running stacks (with progress bar),
then delegates to build_discovery_results for categorization.
"""
stack_list = stacks if stacks is not None else list(cfg.stacks)
results: list[StackDiscoveryResult] = run_parallel_with_progress(
"Discovering",
stack_list,
lambda s: discover_stack_on_all_hosts(cfg, s),
)
all_hosts = list(cfg.hosts.keys())
discovered: dict[str, str | list[str]] = {}
strays: dict[str, list[str]] = {}
duplicates: dict[str, list[str]] = {}
# Query each host for running stacks (with progress bar)
async def get_stacks_on_host(host: str) -> tuple[str, set[str]]:
running = await get_running_stacks_on_host(cfg, host)
return host, running
for result in results:
correct_hosts = [h for h in result.running_hosts if h in result.configured_hosts]
if correct_hosts:
if result.is_multi_host:
discovered[result.stack] = correct_hosts
else:
discovered[result.stack] = correct_hosts[0]
host_results = run_parallel_with_progress("Discovering", all_hosts, get_stacks_on_host)
running_on_host: dict[str, set[str]] = dict(host_results)
if result.is_stray:
strays[result.stack] = result.stray_hosts
if result.is_duplicate:
duplicates[result.stack] = result.running_hosts
return discovered, strays, duplicates
return build_discovery_results(cfg, running_on_host, stacks)
def _report_stray_stacks(
@@ -554,10 +535,10 @@ def refresh(
save_state(cfg, new_state)
print_success(f"State updated: {len(new_state)} stacks tracked.")
# Capture image digests for running stacks
# Capture image digests for running stacks (1 SSH call per host)
if discovered:
try:
path = _snapshot_stacks(cfg, list(discovered.keys()), log_path)
path = _snapshot_stacks(cfg, discovered, log_path)
print_success(f"Digests written to {path}")
except RuntimeError as exc:
print_warning(str(exc))

View File

@@ -497,6 +497,28 @@ async def check_stack_running(
return result.success and bool(result.stdout.strip())
async def get_running_stacks_on_host(
    config: Config,
    host_name: str,
) -> set[str]:
    """List the compose projects that have running containers on one host.

    Issues a single ``docker ps`` through ``run_command`` and reads the
    ``com.docker.compose.project`` label printed for every running container,
    so the cost is one remote call no matter how many stacks exist.

    Args:
        config: Configuration whose ``hosts`` mapping contains ``host_name``.
        host_name: Key into ``config.hosts`` of the host to query.

    Returns:
        The distinct, non-empty project names printed by the command. An empty
        set is returned both when no labelled container is running and when
        the command reports failure, so callers cannot tell the two apart.
    """
    list_projects = "docker ps --format '{{.Label \"com.docker.compose.project\"}}' | sort -u"
    outcome = await run_command(
        config.hosts[host_name],
        list_projects,
        stack=host_name,  # the host name is what gets passed in the `stack` slot
        stream=False,
        prefix="",
    )
    if not outcome.success:
        return set()

    projects: set[str] = set()
    for raw_line in outcome.stdout.splitlines():
        name = raw_line.strip()
        if name:  # skip blank lines so an empty label never becomes a stack name
            projects.add(name)
    return projects
async def _batch_check_existence(
config: Config,
host_name: str,

View File

@@ -6,21 +6,22 @@ import json
import tomllib
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING
from .executor import run_compose
from .executor import run_command
from .paths import xdg_config_home
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Iterable
from collections.abc import Iterable
from pathlib import Path
from .config import Config
from .executor import CommandResult
# Separator used to split output sections
_SECTION_SEPARATOR = "---CF-SEP---"
DEFAULT_LOG_PATH = xdg_config_home() / "compose-farm" / "dockerfarm-log.toml"
_DIGEST_HEX_LENGTH = 64
@dataclass(frozen=True)
@@ -56,87 +57,97 @@ def _escape(value: str) -> str:
return value.replace("\\", "\\\\").replace('"', '\\"')
def _parse_images_output(raw: str) -> list[dict[str, Any]]:
"""Parse `docker compose images --format json` output.
Handles both a JSON array and newline-separated JSON objects for robustness.
"""
raw = raw.strip()
if not raw:
return []
def _parse_image_digests(image_json: str) -> dict[str, str]:
"""Parse docker image inspect JSON to build image tag -> digest map."""
if not image_json:
return {}
try:
parsed = json.loads(raw)
image_data = json.loads(image_json)
except json.JSONDecodeError:
objects = []
for line in raw.splitlines():
if not line.strip():
continue
objects.append(json.loads(line))
return objects
return {}
if isinstance(parsed, list):
return parsed
if isinstance(parsed, dict):
return [parsed]
return []
image_digests: dict[str, str] = {}
for img in image_data:
tags = img.get("RepoTags") or []
digests = img.get("RepoDigests") or []
digest = digests[0].split("@")[-1] if digests else img.get("Id", "")
for tag in tags:
image_digests[tag] = digest
if img.get("Id"):
image_digests[img["Id"]] = digest
return image_digests
def _extract_image_fields(record: dict[str, Any]) -> tuple[str, str]:
"""Extract image name and digest with fallbacks."""
image = record.get("Image") or record.get("Repository") or record.get("Name") or ""
tag = record.get("Tag") or record.get("Version")
if tag and ":" not in image.rsplit("/", 1)[-1]:
image = f"{image}:{tag}"
digest = (
record.get("Digest")
or record.get("Image ID")
or record.get("ImageID")
or record.get("ID")
or ""
)
if digest and not digest.startswith("sha256:") and len(digest) == _DIGEST_HEX_LENGTH:
digest = f"sha256:{digest}"
return image, digest
async def collect_stack_entries(
async def collect_stacks_entries_on_host(
config: Config,
stack: str,
host_name: str,
stacks: set[str],
*,
now: datetime,
run_compose_fn: Callable[..., Awaitable[CommandResult]] = run_compose,
) -> list[SnapshotEntry]:
"""Run `docker compose images` for a stack and normalize results."""
result = await run_compose_fn(config, stack, "images --format json", stream=False)
"""Collect image entries for stacks on one host using 2 docker commands.
Uses `docker ps` to get running containers + their compose project labels,
then `docker image inspect` to get digests for all unique images.
Much faster than running N `docker compose images` commands.
"""
if not stacks:
return []
host = config.hosts[host_name]
# Single SSH call with 2 docker commands:
# 1. Get project|image pairs from running containers
# 2. Get image info (including digests) for all unique images
command = (
f"docker ps --format '{{{{.Label \"com.docker.compose.project\"}}}}|{{{{.Image}}}}' && "
f"echo '{_SECTION_SEPARATOR}' && "
"docker image inspect $(docker ps --format '{{.Image}}' | sort -u) 2>/dev/null || true"
)
result = await run_command(host, command, host_name, stream=False, prefix="")
if not result.success:
msg = result.stderr or f"compose images exited with {result.exit_code}"
error = f"[{stack}] Unable to read images: {msg}"
raise RuntimeError(error)
return []
records = _parse_images_output(result.stdout)
# Use first host for snapshots (multi-host stacks use same images on all hosts)
host_name = config.get_hosts(stack)[0]
compose_path = config.get_compose_path(stack)
# Split output into two sections
parts = result.stdout.split(_SECTION_SEPARATOR)
if len(parts) != 2: # noqa: PLR2004
return []
entries: list[SnapshotEntry] = []
for record in records:
image, digest = _extract_image_fields(record)
if not digest:
container_lines, image_json = parts[0].strip(), parts[1].strip()
# Parse project|image pairs, filtering to only stacks we care about
stack_images: dict[str, set[str]] = {}
for line in container_lines.splitlines():
if "|" not in line:
continue
entries.append(
SnapshotEntry(
stack=stack,
host=host_name,
compose_file=compose_path,
image=image,
digest=digest,
captured_at=now,
)
)
project, image = line.split("|", 1)
if project in stacks:
stack_images.setdefault(project, set()).add(image)
if not stack_images:
return []
# Parse image inspect JSON to build image -> digest map
image_digests = _parse_image_digests(image_json)
# Build entries
entries: list[SnapshotEntry] = []
for stack, images in stack_images.items():
for image in images:
digest = image_digests.get(image, "")
if digest:
entries.append(
SnapshotEntry(
stack=stack,
host=host_name,
compose_file=config.get_compose_path(stack),
image=image,
digest=digest,
captured_at=now,
)
)
return entries

View File

@@ -76,31 +76,6 @@ def get_stack_paths(cfg: Config, stack: str) -> list[str]:
return paths
async def discover_stack_host(cfg: Config, stack: str) -> tuple[str, str | list[str] | None]:
"""Discover where a stack is running.
For multi-host stacks, checks all assigned hosts in parallel.
For single-host, checks assigned host first, then others.
Returns (stack_name, host_or_hosts_or_none).
"""
assigned_hosts = cfg.get_hosts(stack)
if cfg.is_multi_host(stack):
# Check all assigned hosts in parallel
checks = await asyncio.gather(*[check_stack_running(cfg, stack, h) for h in assigned_hosts])
running = [h for h, is_running in zip(assigned_hosts, checks, strict=True) if is_running]
return stack, running if running else None
# Single-host: check assigned host first, then others
if await check_stack_running(cfg, stack, assigned_hosts[0]):
return stack, assigned_hosts[0]
for host in cfg.hosts:
if host != assigned_hosts[0] and await check_stack_running(cfg, stack, host):
return stack, host
return stack, None
class StackDiscoveryResult(NamedTuple):
"""Result of discovering where a stack is running across all hosts."""
@@ -134,25 +109,6 @@ class StackDiscoveryResult(NamedTuple):
return not self.is_multi_host and len(self.running_hosts) > 1
async def discover_stack_on_all_hosts(cfg: Config, stack: str) -> StackDiscoveryResult:
"""Discover where a stack is running across ALL hosts.
Unlike discover_stack_host(), this checks every host in parallel
to detect strays and duplicates.
"""
configured_hosts = cfg.get_hosts(stack)
all_hosts = list(cfg.hosts.keys())
checks = await asyncio.gather(*[check_stack_running(cfg, stack, h) for h in all_hosts])
running_hosts = [h for h, is_running in zip(all_hosts, checks, strict=True) if is_running]
return StackDiscoveryResult(
stack=stack,
configured_hosts=configured_hosts,
running_hosts=running_hosts,
)
async def check_stack_requirements(
cfg: Config,
stack: str,
@@ -518,3 +474,60 @@ async def stop_stray_stacks(
"""
return await _stop_stacks_on_hosts(cfg, strays, label="stray")
def build_discovery_results(
    cfg: Config,
    running_on_host: dict[str, set[str]],
    stacks: list[str] | None = None,
) -> tuple[dict[str, str | list[str]], dict[str, list[str]], dict[str, list[str]]]:
    """Categorize per-host "what is running" data into three stack maps.

    For every stack of interest a ``StackDiscoveryResult`` is built from the
    hosts it is configured for and the hosts it was actually seen on; that
    result then decides which of the returned maps the stack appears in.

    Args:
        cfg: Config object; supplies the stack list and each stack's hosts.
        running_on_host: Dict mapping host -> set of running stack names.
        stacks: Optional list of stacks to check. Defaults to all configured
            stacks.

    Returns:
        Tuple of (discovered, strays, duplicates):
            - discovered: stack -> host(s) where running correctly (a list for
              multi-host stacks, otherwise the first matching host)
            - strays: stack -> list of unauthorized hosts
            - duplicates: stack -> list of all hosts (for single-host stacks
              on multiple)
    """
    wanted = list(cfg.stacks) if stacks is None else stacks

    discovered: dict[str, str | list[str]] = {}
    strays: dict[str, list[str]] = {}
    duplicates: dict[str, list[str]] = {}

    for name in wanted:
        found = StackDiscoveryResult(
            stack=name,
            configured_hosts=cfg.get_hosts(name),
            # Host order follows the iteration order of `running_on_host`.
            running_hosts=[host for host, running in running_on_host.items() if name in running],
        )

        # Hosts that are both running the stack and configured for it.
        on_target = [host for host in found.running_hosts if host in found.configured_hosts]
        if on_target:
            discovered[found.stack] = on_target if found.is_multi_host else on_target[0]

        if found.is_stray:
            strays[found.stack] = found.stray_hosts

        if found.is_duplicate:
            duplicates[found.stack] = found.running_hosts

    return discovered, strays, duplicates

View File

@@ -38,7 +38,17 @@ def get_templates() -> Jinja2Templates:
def extract_config_error(exc: Exception) -> str:
    """Extract a user-friendly error message from a config exception.

    Args:
        exc: The exception raised while loading or validating the config.

    Returns:
        For a ``ValidationError``: one fragment per entry of ``exc.errors()``,
        joined with ``"; "``. Each fragment is ``"<location>: <message>"``
        where the location is the error's ``loc`` tuple rendered as a
        dot-separated path (e.g. ``"hosts.nas.port"``); entries without a
        location contribute the bare message. For any other exception:
        ``str(exc)``.
    """
    if not isinstance(exc, ValidationError):
        return str(exc)

    parts: list[str] = []
    for err in exc.errors():
        msg = err.get("msg", str(err))
        loc = err.get("loc", ())
        if loc:
            # `loc` items may be field names or integer indexes, hence str().
            loc_str = ".".join(str(part) for part in loc)
            parts.append(f"{loc_str}: {msg}")
        else:
            parts.append(msg)
    return "; ".join(parts)

View File

@@ -11,6 +11,7 @@ from compose_farm.executor import (
_run_local_command,
check_networks_exist,
check_paths_exist,
get_running_stacks_on_host,
is_local,
run_command,
run_compose,
@@ -239,3 +240,31 @@ class TestCheckNetworksExist:
result = await check_networks_exist(config, "local", [])
assert result == {}
@linux_only
class TestGetRunningStacksOnHost:
    """Tests for get_running_stacks_on_host function (requires Docker)."""

    @staticmethod
    def _single_local_host(compose_dir: Path) -> Config:
        """Build a config with one ``local`` host at localhost and no stacks."""
        return Config(
            compose_dir=compose_dir,
            hosts={"local": Host(address="localhost")},
            stacks={},
        )

    async def test_returns_set_of_stacks(self, tmp_path: Path) -> None:
        """Function returns a set of stack names."""
        cfg = self._single_local_host(tmp_path)

        running = await get_running_stacks_on_host(cfg, "local")

        assert isinstance(running, set)

    async def test_filters_empty_lines(self, tmp_path: Path) -> None:
        """Empty project names are filtered out."""
        cfg = self._single_local_host(tmp_path)

        running = await get_running_stacks_on_host(cfg, "local")

        # Result should not contain empty strings
        assert "" not in running

View File

@@ -10,8 +10,8 @@ import pytest
from compose_farm.config import Config, Host
from compose_farm.executor import CommandResult
from compose_farm.logs import (
_parse_images_output,
collect_stack_entries,
_SECTION_SEPARATOR,
collect_stacks_entries_on_host,
isoformat,
load_existing_entries,
merge_entries,
@@ -19,74 +19,252 @@ from compose_farm.logs import (
)
def test_parse_images_output_handles_list_and_lines() -> None:
data = [
{"Service": "svc", "Image": "redis", "Digest": "sha256:abc"},
{"Service": "svc", "Image": "db", "Digest": "sha256:def"},
def _make_mock_output(
project_images: dict[str, list[str]], image_info: list[dict[str, object]]
) -> str:
"""Build mock output matching the 2-docker-command format."""
# Section 1: project|image pairs from docker ps
ps_lines = [
f"{project}|{image}" for project, images in project_images.items() for image in images
]
as_array = _parse_images_output(json.dumps(data))
assert len(as_array) == 2
as_lines = _parse_images_output("\n".join(json.dumps(item) for item in data))
assert len(as_lines) == 2
# Section 2: JSON array from docker image inspect
image_json = json.dumps(image_info)
return f"{chr(10).join(ps_lines)}\n{_SECTION_SEPARATOR}\n{image_json}"
@pytest.mark.asyncio
async def test_snapshot_preserves_first_seen(tmp_path: Path) -> None:
compose_dir = tmp_path / "compose"
compose_dir.mkdir()
stack_dir = compose_dir / "svc"
stack_dir.mkdir()
(stack_dir / "docker-compose.yml").write_text("services: {}\n")
class TestCollectStacksEntriesOnHost:
"""Tests for collect_stacks_entries_on_host (2 docker commands per host)."""
config = Config(
compose_dir=compose_dir,
hosts={"local": Host(address="localhost")},
stacks={"svc": "local"},
)
@pytest.fixture
def config_with_stacks(self, tmp_path: Path) -> Config:
"""Create a config with multiple stacks."""
compose_dir = tmp_path / "compose"
compose_dir.mkdir()
for stack in ["plex", "jellyfin", "sonarr"]:
stack_dir = compose_dir / stack
stack_dir.mkdir()
(stack_dir / "docker-compose.yml").write_text("services: {}\n")
sample_output = json.dumps([{"Service": "svc", "Image": "redis", "Digest": "sha256:abc"}])
async def fake_run_compose(
_cfg: Config, stack: str, compose_cmd: str, *, stream: bool = True
) -> CommandResult:
assert compose_cmd == "images --format json"
assert stream is False or stream is True
return CommandResult(
stack=stack,
exit_code=0,
success=True,
stdout=sample_output,
stderr="",
return Config(
compose_dir=compose_dir,
hosts={"host1": Host(address="localhost"), "host2": Host(address="localhost")},
stacks={"plex": "host1", "jellyfin": "host1", "sonarr": "host2"},
)
log_path = tmp_path / "dockerfarm-log.toml"
@pytest.mark.asyncio
async def test_single_ssh_call(
self, config_with_stacks: Config, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Verify only 1 SSH call is made regardless of stack count."""
call_count = {"count": 0}
# First snapshot
first_time = datetime(2025, 1, 1, tzinfo=UTC)
first_entries = await collect_stack_entries(
config, "svc", now=first_time, run_compose_fn=fake_run_compose
)
first_iso = isoformat(first_time)
merged = merge_entries([], first_entries, now_iso=first_iso)
meta = {"generated_at": first_iso, "compose_dir": str(config.compose_dir)}
write_toml(log_path, meta=meta, entries=merged)
async def mock_run_command(
host: Host, command: str, stack: str, *, stream: bool, prefix: str
) -> CommandResult:
call_count["count"] += 1
output = _make_mock_output(
{"plex": ["plex:latest"], "jellyfin": ["jellyfin:latest"]},
[
{
"RepoTags": ["plex:latest"],
"Id": "sha256:aaa",
"RepoDigests": ["plex@sha256:aaa"],
},
{
"RepoTags": ["jellyfin:latest"],
"Id": "sha256:bbb",
"RepoDigests": ["jellyfin@sha256:bbb"],
},
],
)
return CommandResult(stack=stack, exit_code=0, success=True, stdout=output)
after_first = tomllib.loads(log_path.read_text())
first_seen = after_first["entries"][0]["first_seen"]
monkeypatch.setattr("compose_farm.logs.run_command", mock_run_command)
# Second snapshot
second_time = datetime(2025, 2, 1, tzinfo=UTC)
second_entries = await collect_stack_entries(
config, "svc", now=second_time, run_compose_fn=fake_run_compose
)
second_iso = isoformat(second_time)
existing = load_existing_entries(log_path)
merged = merge_entries(existing, second_entries, now_iso=second_iso)
meta = {"generated_at": second_iso, "compose_dir": str(config.compose_dir)}
write_toml(log_path, meta=meta, entries=merged)
now = datetime(2025, 1, 1, tzinfo=UTC)
entries = await collect_stacks_entries_on_host(
config_with_stacks, "host1", {"plex", "jellyfin"}, now=now
)
after_second = tomllib.loads(log_path.read_text())
entry = after_second["entries"][0]
assert entry["first_seen"] == first_seen
assert entry["last_seen"].startswith("2025-02-01")
assert call_count["count"] == 1
assert len(entries) == 2
@pytest.mark.asyncio
async def test_filters_to_requested_stacks(
self, config_with_stacks: Config, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Only return entries for stacks we asked for, even if others are running."""
async def mock_run_command(
host: Host, command: str, stack: str, *, stream: bool, prefix: str
) -> CommandResult:
# Docker ps shows 3 stacks, but we only want plex
output = _make_mock_output(
{
"plex": ["plex:latest"],
"jellyfin": ["jellyfin:latest"],
"other": ["other:latest"],
},
[
{
"RepoTags": ["plex:latest"],
"Id": "sha256:aaa",
"RepoDigests": ["plex@sha256:aaa"],
},
{
"RepoTags": ["jellyfin:latest"],
"Id": "sha256:bbb",
"RepoDigests": ["j@sha256:bbb"],
},
{
"RepoTags": ["other:latest"],
"Id": "sha256:ccc",
"RepoDigests": ["o@sha256:ccc"],
},
],
)
return CommandResult(stack=stack, exit_code=0, success=True, stdout=output)
monkeypatch.setattr("compose_farm.logs.run_command", mock_run_command)
now = datetime(2025, 1, 1, tzinfo=UTC)
entries = await collect_stacks_entries_on_host(
config_with_stacks, "host1", {"plex"}, now=now
)
assert len(entries) == 1
assert entries[0].stack == "plex"
@pytest.mark.asyncio
async def test_multiple_images_per_stack(
self, config_with_stacks: Config, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Stack with multiple containers/images returns multiple entries."""
async def mock_run_command(
host: Host, command: str, stack: str, *, stream: bool, prefix: str
) -> CommandResult:
output = _make_mock_output(
{"plex": ["plex:latest", "redis:7"]},
[
{
"RepoTags": ["plex:latest"],
"Id": "sha256:aaa",
"RepoDigests": ["p@sha256:aaa"],
},
{"RepoTags": ["redis:7"], "Id": "sha256:bbb", "RepoDigests": ["r@sha256:bbb"]},
],
)
return CommandResult(stack=stack, exit_code=0, success=True, stdout=output)
monkeypatch.setattr("compose_farm.logs.run_command", mock_run_command)
now = datetime(2025, 1, 1, tzinfo=UTC)
entries = await collect_stacks_entries_on_host(
config_with_stacks, "host1", {"plex"}, now=now
)
assert len(entries) == 2
images = {e.image for e in entries}
assert images == {"plex:latest", "redis:7"}
@pytest.mark.asyncio
async def test_empty_stacks_returns_empty(self, config_with_stacks: Config) -> None:
"""Empty stack set returns empty entries without making SSH call."""
now = datetime(2025, 1, 1, tzinfo=UTC)
entries = await collect_stacks_entries_on_host(config_with_stacks, "host1", set(), now=now)
assert entries == []
@pytest.mark.asyncio
async def test_ssh_failure_returns_empty(
self, config_with_stacks: Config, monkeypatch: pytest.MonkeyPatch
) -> None:
"""SSH failure returns empty list instead of raising."""
async def mock_run_command(
host: Host, command: str, stack: str, *, stream: bool, prefix: str
) -> CommandResult:
return CommandResult(stack=stack, exit_code=1, success=False, stdout="", stderr="error")
monkeypatch.setattr("compose_farm.logs.run_command", mock_run_command)
now = datetime(2025, 1, 1, tzinfo=UTC)
entries = await collect_stacks_entries_on_host(
config_with_stacks, "host1", {"plex"}, now=now
)
assert entries == []
class TestSnapshotMerging:
"""Tests for merge_entries preserving first_seen."""
@pytest.fixture
def config(self, tmp_path: Path) -> Config:
compose_dir = tmp_path / "compose"
compose_dir.mkdir()
stack_dir = compose_dir / "svc"
stack_dir.mkdir()
(stack_dir / "docker-compose.yml").write_text("services: {}\n")
return Config(
compose_dir=compose_dir,
hosts={"local": Host(address="localhost")},
stacks={"svc": "local"},
)
@pytest.mark.asyncio
async def test_preserves_first_seen(
self, tmp_path: Path, config: Config, monkeypatch: pytest.MonkeyPatch
) -> None:
"""Repeated snapshots preserve first_seen timestamp."""
async def mock_run_command(
host: Host, command: str, stack: str, *, stream: bool, prefix: str
) -> CommandResult:
output = _make_mock_output(
{"svc": ["redis:latest"]},
[
{
"RepoTags": ["redis:latest"],
"Id": "sha256:abc",
"RepoDigests": ["r@sha256:abc"],
}
],
)
return CommandResult(stack=stack, exit_code=0, success=True, stdout=output)
monkeypatch.setattr("compose_farm.logs.run_command", mock_run_command)
log_path = tmp_path / "dockerfarm-log.toml"
# First snapshot
first_time = datetime(2025, 1, 1, tzinfo=UTC)
first_entries = await collect_stacks_entries_on_host(
config, "local", {"svc"}, now=first_time
)
first_iso = isoformat(first_time)
merged = merge_entries([], first_entries, now_iso=first_iso)
meta = {"generated_at": first_iso, "compose_dir": str(config.compose_dir)}
write_toml(log_path, meta=meta, entries=merged)
after_first = tomllib.loads(log_path.read_text())
first_seen = after_first["entries"][0]["first_seen"]
# Second snapshot
second_time = datetime(2025, 2, 1, tzinfo=UTC)
second_entries = await collect_stacks_entries_on_host(
config, "local", {"svc"}, now=second_time
)
second_iso = isoformat(second_time)
existing = load_existing_entries(log_path)
merged = merge_entries(existing, second_entries, now_iso=second_iso)
meta = {"generated_at": second_iso, "compose_dir": str(config.compose_dir)}
write_toml(log_path, meta=meta, entries=merged)
after_second = tomllib.loads(log_path.read_text())
entry = after_second["entries"][0]
assert entry["first_seen"] == first_seen
assert entry["last_seen"].startswith("2025-02-01")

View File

@@ -11,7 +11,10 @@ import pytest
from compose_farm.cli import lifecycle
from compose_farm.config import Config, Host
from compose_farm.executor import CommandResult
from compose_farm.operations import _migrate_stack
from compose_farm.operations import (
_migrate_stack,
build_discovery_results,
)
@pytest.fixture
@@ -109,3 +112,83 @@ class TestUpdateCommandSequence:
# Verify the sequence is pull, build, down, up
assert "down" in source
assert "up -d" in source
class TestBuildDiscoveryResults:
    """Tests for build_discovery_results using hand-written per-host data."""

    @pytest.fixture
    def config(self, tmp_path: Path) -> Config:
        """Two hosts; plex and jellyfin assigned to host1, sonarr to host2."""
        compose_dir = tmp_path / "compose"
        assignments = {"plex": "host1", "jellyfin": "host1", "sonarr": "host2"}
        for name in assignments:
            stack_dir = compose_dir / name
            stack_dir.mkdir(parents=True)
            (stack_dir / "docker-compose.yml").write_text("services: {}")

        return Config(
            compose_dir=compose_dir,
            hosts={host: Host(address="localhost") for host in ("host1", "host2")},
            stacks=assignments,
        )

    def test_discovers_correctly_running_stacks(self, config: Config) -> None:
        """Stacks running on correct hosts are discovered."""
        discovered, strays, duplicates = build_discovery_results(
            config,
            {"host1": {"plex", "jellyfin"}, "host2": {"sonarr"}},
        )

        assert discovered == {"plex": "host1", "jellyfin": "host1", "sonarr": "host2"}
        assert strays == {}
        assert duplicates == {}

    def test_detects_stray_stacks(self, config: Config) -> None:
        """Stacks running on wrong hosts are marked as strays."""
        # plex is assigned to host1 but only shows up on host2
        observed: dict[str, set[str]] = {"host1": set(), "host2": {"plex"}}

        discovered, strays, _duplicates = build_discovery_results(config, observed)

        assert "plex" not in discovered
        assert strays == {"plex": ["host2"]}

    def test_detects_duplicates(self, config: Config) -> None:
        """Single-host stacks running on multiple hosts are duplicates."""
        # plex shows up on both hosts
        observed = {"host1": {"plex"}, "host2": {"plex"}}

        discovered, strays, duplicates = build_discovery_results(
            config, observed, stacks=["plex"]
        )

        # Running where it belongs (host1) ...
        assert discovered == {"plex": "host1"}
        # ... a stray where it does not (host2) ...
        assert strays == {"plex": ["host2"]}
        # ... and, being a single-host stack seen twice, a duplicate.
        assert duplicates == {"plex": ["host1", "host2"]}

    def test_filters_to_requested_stacks(self, config: Config) -> None:
        """Only returns results for requested stacks."""
        observed = {"host1": {"plex", "jellyfin"}, "host2": {"sonarr"}}

        discovered, _strays, _duplicates = build_discovery_results(
            config, observed, stacks=["plex"]
        )

        # Only plex should be in results
        assert discovered == {"plex": "host1"}
        assert "jellyfin" not in discovered
        assert "sonarr" not in discovered

View File

@@ -7,11 +7,58 @@ from typing import TYPE_CHECKING
import pytest
from fastapi import HTTPException
from pydantic import ValidationError
if TYPE_CHECKING:
from compose_farm.config import Config
class TestExtractConfigError:
    """Tests for extract_config_error helper."""

    def test_validation_error_with_location(self) -> None:
        """An extra top-level Config field is named in the message."""
        from compose_farm.config import Config, Host
        from compose_farm.web.deps import extract_config_error

        # An unexpected keyword makes Config raise a validation error
        with pytest.raises(ValidationError) as caught:
            Config(
                hosts={"server": Host(address="192.168.1.1")},
                stacks={"app": "server"},
                unknown_field="bad",  # type: ignore[call-arg]
            )

        rendered = extract_config_error(caught.value)

        assert "unknown_field" in rendered
        assert "Extra inputs are not permitted" in rendered

    def test_validation_error_nested_location(self) -> None:
        """An extra Host field is named in the message."""
        from compose_farm.config import Host
        from compose_farm.web.deps import extract_config_error

        # Same failure mode, triggered on the Host model
        with pytest.raises(ValidationError) as caught:
            Host(address="192.168.1.1", bad_key="value")  # type: ignore[call-arg]

        rendered = extract_config_error(caught.value)

        assert "bad_key" in rendered
        assert "Extra inputs are not permitted" in rendered

    def test_regular_exception(self) -> None:
        """A non-validation exception is rendered as its own string."""
        from compose_farm.web.deps import extract_config_error

        rendered = extract_config_error(ValueError("Something went wrong"))

        assert rendered == "Something went wrong"

    def test_file_not_found_exception(self) -> None:
        """FileNotFoundError is rendered as its own string."""
        from compose_farm.web.deps import extract_config_error

        rendered = extract_config_error(FileNotFoundError("Config file not found"))

        assert rendered == "Config file not found"
class TestValidateYaml:
"""Tests for _validate_yaml helper."""