Compare commits


29 Commits

Author SHA1 Message Date
Bas Nijholt
d2c6ab72b2 Add CF_CONFIG env var for simpler Docker workflow
Config search order is now:
1. --config CLI option
2. CF_CONFIG environment variable
3. ./compose-farm.yaml
4. ~/.config/compose-farm/compose-farm.yaml

Docker workflow simplified: mount compose_dir once and set CF_CONFIG
to the config file within it. No more symlink issues or multiple mounts.
2025-12-16 10:12:55 -08:00
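
A minimal sketch of the simplified Docker workflow described in the commit above, assuming the compose directory is `/opt/compose` (the path is illustrative; any directory works as long as it is mounted at the same path inside the container, matching the `docker-compose.yml` and README examples in this diff):

```bash
# Mount the compose directory once and point CF_CONFIG at the config file inside it.
docker run --rm \
  -v "$SSH_AUTH_SOCK":/ssh-agent -e SSH_AUTH_SOCK=/ssh-agent \
  -v /opt/compose:/opt/compose \
  -e CF_CONFIG=/opt/compose/compose-farm.yaml \
  ghcr.io/basnijholt/compose-farm up --all
```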
Bas Nijholt
3656584eda Friendly error when config path is a directory
Docker creates empty directories for missing file mounts,
causing confusing IsADirectoryError tracebacks. Now shows
a clear message explaining the likely cause.
2025-12-16 09:49:40 -08:00
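
An illustrative reproduction of the pitfall this commit addresses, based on the README's single-file mount example (command and outcome are a sketch, not verbatim output):

```bash
# If ./compose-farm.yaml does not exist on the host, Docker creates it as an
# empty directory, so the container sees a directory where a file is expected.
docker run --rm \
  -v ./compose-farm.yaml:/root/.config/compose-farm/compose-farm.yaml:ro \
  ghcr.io/basnijholt/compose-farm check
# Instead of an IsADirectoryError traceback, compose-farm now reports that the
# config path is a directory and that a missing Docker file mount is the likely cause.
```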
Bas Nijholt
8be370098d Use env vars for docker-compose.yml mounts
- CF_CONFIG_DIR: config directory (default: ~/.config/compose-farm)
- CF_COMPOSE_DIR: compose directory (default: /opt/compose)

Mounts preserve paths so compose_dir in config works correctly.
2025-12-16 09:49:34 -08:00
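
A minimal usage sketch for the env-var-driven mounts described above, assuming a non-default compose directory at `/mnt/stacks` (path is illustrative):

```bash
# Override the default compose directory; the same path is mounted inside the
# container, so compose_dir in the config resolves identically on both sides.
CF_COMPOSE_DIR=/mnt/stacks docker compose run --rm cf up --all
```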
Bas Nijholt
45057cb6df feat: Add docker-compose.yml for easier Docker usage
Example compose file that mounts SSH agent and config.
Users uncomment the compose_dir mount for their setup.
2025-12-16 09:40:18 -08:00
Bas Nijholt
3f24484d60 fix: Fix VERSION expansion in Dockerfile 2025-12-16 09:24:46 -08:00
Bas Nijholt
b6d50a22b4 fix: Wait for PyPI upload before building Docker image
Use workflow_run trigger to wait for "Upload Python Package" workflow
to complete successfully before building the Docker image. This ensures
the version is available on PyPI when uv tries to install it.
2025-12-16 09:21:35 -08:00
Bas Nijholt
8a658210e1 docs: Add Docker installation instructions with SSH agent 2025-12-16 09:16:43 -08:00
Bas Nijholt
583aaaa080 feat: Add Docker image and GitHub workflow
- Dockerfile using ghcr.io/astral-sh/uv:python3.14-alpine
- Installs compose-farm via uv tool install
- Includes openssh-client for remote host connections
- GitHub workflow builds and pushes to ghcr.io on release
- Supports manual workflow dispatch with version input
- Tags: semver (x.y.z, x.y, x) and latest
2025-12-16 09:11:09 -08:00
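
A minimal sketch of exercising the image and its VERSION build-arg locally (the version number is hypothetical; leaving VERSION empty installs the latest release, per the Dockerfile in this diff):

```bash
# Build the image pinned to a specific PyPI release via the VERSION build-arg.
docker build --build-arg VERSION=0.2.0 -t compose-farm:0.2.0 .

# Or pull the published image by one of the tags the workflow pushes.
docker pull ghcr.io/basnijholt/compose-farm:latest
```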
Bas Nijholt
22ca4f64e8 docs: Add command quick-reference table to Usage section 2025-12-16 08:30:15 -08:00
Bas Nijholt
32e798fcaa chore: Remove obsolete PLAN.md
The traefik-file feature described in this planning document has been
fully implemented. All open questions have been resolved.
2025-12-15 23:27:27 -08:00
Bas Nijholt
ced81c8b50 refactor: Make internal CLI symbols private
Rename module-internal type aliases, TypeVar, and constants with _ prefix:
- _T, _ServicesArg, _AllOption, _ConfigOption, _LogPathOption, _HostOption
- _MISSING_PATH_PREVIEW_LIMIT
- _DEFAULT_NETWORK_NAME, _DEFAULT_NETWORK_SUBNET, _DEFAULT_NETWORK_GATEWAY

These are only used within cli.py and should not be part of the public API.
2025-12-15 20:57:41 -08:00
Bas Nijholt
7ec4b71101 refactor: Remove unnecessary console aliasing in executor
Import console and err_console directly instead of aliasing to
_console and _err_console. Rename inner function variable to
'out' to avoid shadowing the module-level console import.
2025-12-15 20:36:39 -08:00
Bas Nijholt
94aa58d380 refactor: Make internal constants and classes private
Rename module-internal constants and classes with _ prefix:
- compose.py: SINGLE_PART, PUBLISHED_TARGET_PARTS, HOST_PUBLISHED_PARTS, MIN_VOLUME_PARTS
- logs.py: DIGEST_HEX_LENGTH
- traefik.py: LIST_VALUE_KEYS, MIN_ROUTER_PARTS, MIN_SERVICE_LABEL_PARTS,
  TraefikServiceSource, TRAEFIK_CONFIG_HEADER

These items are only used within their respective modules and should
not be part of the public API.
2025-12-15 20:33:48 -08:00
Bas Nijholt
f8d88e6f97 refactor: Remove run_compose_multi_host and rename report_preflight_failures to _report_preflight_failures
Eliminate the public run_compose_multi_host helper, which was a thin wrapper around the internal _run_sequential_commands_multi_host function, and mark the preflight failure reporting function as internal by prefixing it with an underscore.
Updated all internal calls accordingly.
2025-12-15 20:27:02 -08:00
Bas Nijholt
a95f6309b0 Remove dead code and make internal APIs public
Remove functions that were replaced by _with_progress variants in cli.py:
- discover_running_services, check_mounts_on_configured_hosts,
  check_networks_on_configured_hosts, _check_resources from operations.py
- snapshot_services from logs.py
- get_service_hosts from state.py

Make previously private functions public (remove underscore prefix):
- is_local in executor.py
- isoformat, collect_service_entries, load_existing_entries,
  merge_entries, write_toml in logs.py
- load_env, interpolate, parse_ports in compose.py

Update tests to use renamed public functions.
2025-12-15 20:19:28 -08:00
Bas Nijholt
502de018af docs: Add high availability row to comparison table 2025-12-15 19:51:57 -08:00
Bas Nijholt
a3e8daad33 docs: refine comparison table in README 2025-12-15 16:06:17 -08:00
Bas Nijholt
78a2f65c94 docs: Move comparison link after declarative setup line 2025-12-15 15:48:15 -08:00
Bas Nijholt
1689a6833a docs: Link to comparison section from Why Compose Farm 2025-12-15 15:46:26 -08:00
Bas Nijholt
6d2f32eadf docs: Add feature comparison table with emojis 2025-12-15 15:44:16 -08:00
Bas Nijholt
c549dd50c9 docs: Move comparison section to end, simplify format 2025-12-15 15:41:09 -08:00
Bas Nijholt
82312e9421 docs: add comparison with alternatives to README 2025-12-15 15:37:08 -08:00
Bas Nijholt
e13b367188 docs: Add shields to README 2025-12-15 15:31:30 -08:00
Bas Nijholt
d73049cc1b docs: Add declarative philosophy to Why Compose Farm 2025-12-15 15:17:04 -08:00
Bas Nijholt
4373b23cd3 docs: Simplify xkcd explanation, lead with simplicity 2025-12-15 14:54:29 -08:00
Bas Nijholt
73eb6ccf41 docs: Center xkcd image 2025-12-15 14:52:57 -08:00
Bas Nijholt
6ca48d0d56 docs: Add console.py to CLAUDE.md architecture 2025-12-15 14:52:40 -08:00
Bas Nijholt
b82599005e docs: Add xkcd reference and clarify this is not a new standard 2025-12-15 14:37:33 -08:00
Bas Nijholt
b044053674 docs: Emphasize zero changes required to compose files 2025-12-15 14:19:52 -08:00
17 changed files with 372 additions and 392 deletions

76
.github/workflows/docker.yml vendored Normal file
View File

@@ -0,0 +1,76 @@
name: Build and Push Docker Image

on:
  workflow_run:
    workflows: ["Upload Python Package"]
    types: [completed]
  workflow_dispatch:
    inputs:
      version:
        description: 'Version to build (leave empty for latest)'
        required: false

env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}

jobs:
  build-and-push:
    runs-on: ubuntu-latest
    # Only run if PyPI upload succeeded (or manual dispatch)
    if: ${{ github.event_name == 'workflow_dispatch' || github.event.workflow_run.conclusion == 'success' }}
    permissions:
      contents: read
      packages: write
    steps:
      - name: Checkout
        uses: actions/checkout@v4
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v3
      - name: Log in to Container Registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - name: Extract version
        id: version
        run: |
          if [ "${{ github.event_name }}" = "workflow_run" ]; then
            # Get version from the tag that triggered the release
            VERSION="${{ github.event.workflow_run.head_branch }}"
            # Strip 'v' prefix if present
            VERSION="${VERSION#v}"
          elif [ -n "${{ github.event.inputs.version }}" ]; then
            VERSION="${{ github.event.inputs.version }}"
          else
            VERSION=""
          fi
          echo "version=$VERSION" >> $GITHUB_OUTPUT
      - name: Extract metadata
        id: meta
        uses: docker/metadata-action@v5
        with:
          images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
          tags: |
            type=semver,pattern={{version}},value=v${{ steps.version.outputs.version }}
            type=semver,pattern={{major}}.{{minor}},value=v${{ steps.version.outputs.version }}
            type=semver,pattern={{major}},value=v${{ steps.version.outputs.version }}
            type=raw,value=latest
      - name: Build and push
        uses: docker/build-push-action@v6
        with:
          context: .
          push: true
          tags: ${{ steps.meta.outputs.tags }}
          labels: ${{ steps.meta.outputs.labels }}
          build-args: |
            VERSION=${{ steps.version.outputs.version }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

View File

@@ -13,6 +13,7 @@ compose_farm/
├── cli.py # Typer commands (thin layer, delegates to operations)
├── config.py # Pydantic models, YAML loading
├── compose.py # Compose file parsing (.env, ports, volumes, networks)
├── console.py # Shared Rich console instances
├── executor.py # SSH/local command execution, streaming output
├── operations.py # Business logic (up, migrate, discover, preflight checks)
├── state.py # Deployment state tracking (which service on which host)

16
Dockerfile Normal file
View File

@@ -0,0 +1,16 @@
# syntax=docker/dockerfile:1
FROM ghcr.io/astral-sh/uv:python3.14-alpine
# Install SSH client (required for remote host connections)
RUN apk add --no-cache openssh-client
# Install compose-farm from PyPI
ARG VERSION
RUN uv tool install compose-farm${VERSION:+==$VERSION}
# Add uv tool bin to PATH
ENV PATH="/root/.local/bin:$PATH"
# Default entrypoint
ENTRYPOINT ["cf"]
CMD ["--help"]

35
PLAN.md
View File

@@ -1,35 +0,0 @@
# Compose Farm Traefik Multihost Ingress Plan
## Goal
Generate a Traefik file-provider fragment from existing docker-compose Traefik labels (no config duplication) so a single front-door Traefik on 192.168.1.66 with wildcard `*.lab.mydomain.org` can route to services running on other hosts. Keep the current simplicity (SSH + docker compose); no Swarm/K8s.
## Requirements
- Traefik stays on main host; keep current `dynamic.yml` and Docker provider for local containers.
- Add a watched directory provider (any path works) and load a generated fragment (e.g., `compose-farm.generated.yml`).
- No edits to compose files: reuse existing `traefik.*` labels as the single source of truth; Compose Farm only reads them.
- Generator infers routing from labels and reachability from `ports:` mappings; prefer host-published ports so Traefik can reach services across hosts. Upstreams point to `<host address>:<published host port>`; warn if no published port is found.
- Only minimal data in `compose-farm.yaml`: hosts map and service→host mapping (already present).
- No new orchestration/discovery layers; respect KISS/YAGNI/DRY.
## Non-Goals
- No Swarm/Kubernetes adoption.
- No global Docker provider across hosts.
- No health checks/service discovery layer.
## Current State (Dec 2025)
- Compose Farm: Typer CLI wrapping `docker compose` over SSH; config in `compose-farm.yaml`; parallel by default; snapshot/log tooling present.
- Traefik: single instance on 192.168.1.66, wildcard `*.lab.mydomain.org`, Docker provider for local services, file provider via `dynamic.yml` already in use.
## Proposed Implementation Steps
1) Add generator command: `compose-farm traefik-file --output <path>`.
2) Resolve per-service host from `compose-farm.yaml`; read compose file at `{compose_dir}/{service}/docker-compose.yml`.
3) Parse `traefik.*` labels to build routers/services/middlewares as in compose; map container port to published host port (from `ports:`) to form upstream URLs with host address.
4) Emit file-provider YAML to the watched directory (recommended default: `/mnt/data/traefik/dynamic.d/compose-farm.generated.yml`, but user chooses via `--output`).
5) Warnings: if no published port is found, warn that cross-host reachability requires L3 reachability to container IPs.
6) Tests: label parsing, port mapping, YAML render; scenario with published port; scenario without published port.
7) Docs: update README/CLAUDE to describe directory provider flags and the generator workflow; note that compose files remain unchanged.
## Open Questions
- How to derive target host address: use `hosts.<name>.address` verbatim, or allow override per service? (Default: use host address.)
- Should we support multiple hosts/backends per service for LB/HA? (Start with single server.)
- Where to store generated file by default? (Default to user-specified `--output`; maybe fallback to `./compose-farm-traefik.yml`.)

View File

@@ -1,5 +1,10 @@
# Compose Farm
[![PyPI](https://img.shields.io/pypi/v/compose-farm)](https://pypi.org/project/compose-farm/)
[![Python](https://img.shields.io/pypi/pyversions/compose-farm)](https://pypi.org/project/compose-farm/)
[![License](https://img.shields.io/github/license/basnijholt/compose-farm)](LICENSE)
[![GitHub stars](https://img.shields.io/github/stars/basnijholt/compose-farm)](https://github.com/basnijholt/compose-farm/stargazers)
<img src="http://files.nijho.lt/compose-farm.png" align="right" style="width: 300px;" />
A minimal CLI tool to run Docker Compose commands across multiple hosts via SSH.
@@ -23,18 +28,37 @@ A minimal CLI tool to run Docker Compose commands across multiple hosts via SSH.
- [Usage](#usage)
- [Auto-Migration](#auto-migration)
- [Traefik Multihost Ingress (File Provider)](#traefik-multihost-ingress-file-provider)
- [Comparison with Alternatives](#comparison-with-alternatives)
- [License](#license)
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
## Why Compose Farm?
I run 100+ Docker Compose stacks on an LXC container that frequently runs out of memory. I needed a way to distribute services across multiple machines without the complexity of:
I used to run 100+ Docker Compose stacks on a single machine that kept running out of memory. I needed a way to distribute services across multiple machines without the complexity of:
- **Kubernetes**: Overkill for my use case. I don't need pods, services, ingress controllers, or YAML manifests 10x the size of my compose files.
- **Docker Swarm**: Effectively in maintenance mode—no longer being invested in by Docker.
**Compose Farm is intentionally simple**: one YAML config mapping services to hosts, and a CLI that runs `docker compose` commands over SSH. That's it.
Both require changes to your compose files. **Compose Farm requires zero changes**—your existing `docker-compose.yml` files work as-is.
I also wanted a declarative setup—one config file that defines where everything runs. Change the config, run `up`, and services migrate automatically. See [Comparison with Alternatives](#comparison-with-alternatives) for how this compares to other approaches.
<p align="center">
<a href="https://xkcd.com/927/">
<img src="https://imgs.xkcd.com/comics/standards.png" alt="xkcd: Standards" width="400" />
</a>
</p>
Before you say it—no, this is not a new standard. I changed nothing about my existing setup. When I added more hosts, I just mounted my drives at the same paths, and everything worked. You can do all of this manually today—SSH into a host and run `docker compose up`.
Compose Farm just automates what you'd do by hand:
- Runs `docker compose` commands over SSH
- Tracks which service runs on which host
- Auto-migrates services when you change the host assignment
- Generates Traefik file-provider config for cross-host routing
**It's a convenience wrapper, not a new paradigm.**
## How It Works
@@ -108,6 +132,23 @@ uv tool install compose-farm
pip install compose-farm
```
<details><summary>🐳 Docker</summary>
Using the provided `docker-compose.yml`:
```bash
docker compose run --rm cf up --all
```
Or directly:
```bash
docker run --rm \
-v $SSH_AUTH_SOCK:/ssh-agent -e SSH_AUTH_SOCK=/ssh-agent \
-v ./compose-farm.yaml:/root/.config/compose-farm/compose-farm.yaml:ro \
ghcr.io/basnijholt/compose-farm up --all
```
</details>
## Configuration
Create `~/.config/compose-farm/compose-farm.yaml` (or `./compose-farm.yaml` in your working directory):
@@ -170,6 +211,22 @@ When you run `cf up autokuma`, it starts the service on all hosts in parallel. M
The CLI is available as both `compose-farm` and the shorter `cf` alias.
| Command | Description |
|---------|-------------|
| `cf up <svc>` | Start service (auto-migrates if host changed) |
| `cf down <svc>` | Stop service |
| `cf restart <svc>` | down + up |
| `cf update <svc>` | pull + down + up |
| `cf pull <svc>` | Pull latest images |
| `cf logs -f <svc>` | Follow logs |
| `cf ps` | Show status of all services |
| `cf sync` | Discover running services + capture image digests |
| `cf check` | Validate config, mounts, networks |
| `cf init-network` | Create Docker network on hosts |
| `cf traefik-file` | Generate Traefik file-provider config |
All commands support `--all` to operate on all services.
```bash
# Start services (auto-migrates if host changed in config)
cf up plex jellyfin
@@ -329,6 +386,31 @@ Update your Traefik config to use directory watching instead of a single file:
- --providers.file.watch=true
```
## Comparison with Alternatives
There are many ways to run containers on multiple hosts. Here is where Compose Farm sits:
| | Docker Contexts | K8s / Swarm | Ansible / Terraform | Portainer / Coolify | Compose Farm |
|---|:---:|:---:|:---:|:---:|:---:|
| No compose rewrites | ✅ | ❌ | ✅ | ✅ | ✅ |
| Version controlled | ✅ | ✅ | ✅ | ❌ | ✅ |
| State tracking | ❌ | ✅ | ✅ | ✅ | ✅ |
| Auto-migration | ❌ | ✅ | ❌ | ❌ | ✅ |
| Interactive CLI | ❌ | ❌ | ❌ | ❌ | ✅ |
| Parallel execution | ❌ | ✅ | ✅ | ✅ | ✅ |
| Agentless | ✅ | ❌ | ✅ | ❌ | ✅ |
| High availability | ❌ | ✅ | ❌ | ❌ | ❌ |
**Docker Contexts** — You can use `docker context create remote ssh://...` and `docker compose --context remote up`. But it's manual: you must remember which host runs which service, there's no global view, no parallel execution, and no auto-migration.
**Kubernetes / Docker Swarm** — Full orchestration that abstracts away the hardware. But they require cluster initialization, separate control planes, and often rewriting compose files. They introduce complexity (consensus, overlay networks) unnecessary for static "pet" servers.
**Ansible / Terraform** — Infrastructure-as-Code tools that can SSH in and deploy containers. But they're push-based configuration management, not interactive CLIs. Great for setting up state, clumsy for day-to-day operations like `cf logs -f` or quickly restarting a service.
**Portainer / Coolify** — Web-based management UIs. But they're UI-first and often require agents on your servers. Compose Farm is CLI-first and agentless.
**Compose Farm is the middle ground:** a robust CLI that productizes the manual SSH pattern. You get the "cluster feel" (unified commands, state tracking) without the "cluster cost" (complexity, agents, control planes).
## License
MIT

11
docker-compose.yml Normal file
View File

@@ -0,0 +1,11 @@
services:
  cf:
    image: ghcr.io/basnijholt/compose-farm:latest
    volumes:
      - ${SSH_AUTH_SOCK}:/ssh-agent:ro
      # Compose directory (contains compose files AND compose-farm.yaml config)
      - ${CF_COMPOSE_DIR:-/opt/compose}:${CF_COMPOSE_DIR:-/opt/compose}
    environment:
      - SSH_AUTH_SOCK=/ssh-agent
      # Config file path (state stored alongside it)
      - CF_CONFIG=${CF_COMPOSE_DIR:-/opt/compose}/compose-farm.yaml

View File

@@ -26,10 +26,10 @@ from .config import Config, load_config
from .console import console, err_console
from .executor import (
CommandResult,
_is_local,
check_networks_exist,
check_paths_exist,
check_service_running,
is_local,
run_command,
run_compose_on_host,
run_on_services,
@@ -38,11 +38,11 @@ from .executor import (
from .logs import (
DEFAULT_LOG_PATH,
SnapshotEntry,
_collect_service_entries,
_isoformat,
_load_existing_entries,
_merge_entries,
_write_toml,
collect_service_entries,
isoformat,
load_existing_entries,
merge_entries,
write_toml,
)
from .operations import (
check_host_compatibility,
@@ -62,7 +62,7 @@ from .traefik import generate_traefik_config, render_traefik_config
if TYPE_CHECKING:
from collections.abc import Callable, Coroutine, Generator, Mapping
T = TypeVar("T")
_T = TypeVar("_T")
@contextlib.contextmanager
@@ -170,7 +170,7 @@ def _get_services(
return list(services), config
def _run_async(coro: Coroutine[None, None, T]) -> T:
def _run_async(coro: Coroutine[None, None, _T]) -> _T:
"""Run async coroutine."""
return asyncio.run(coro)
@@ -227,28 +227,28 @@ def _run_host_operation(
_report_results(results)
ServicesArg = Annotated[
_ServicesArg = Annotated[
list[str] | None,
typer.Argument(help="Services to operate on"),
]
AllOption = Annotated[
_AllOption = Annotated[
bool,
typer.Option("--all", "-a", help="Run on all services"),
]
ConfigOption = Annotated[
_ConfigOption = Annotated[
Path | None,
typer.Option("--config", "-c", help="Path to config file"),
]
LogPathOption = Annotated[
_LogPathOption = Annotated[
Path | None,
typer.Option("--log-path", "-l", help="Path to Dockerfarm TOML log"),
]
HostOption = Annotated[
_HostOption = Annotated[
str | None,
typer.Option("--host", "-H", help="Filter to services on this host"),
]
MISSING_PATH_PREVIEW_LIMIT = 2
_MISSING_PATH_PREVIEW_LIMIT = 2
def _validate_host_for_service(cfg: Config, service: str, host: str) -> None:
@@ -267,13 +267,13 @@ def _validate_host_for_service(cfg: Config, service: str, host: str) -> None:
@app.command(rich_help_panel="Lifecycle")
def up(
services: ServicesArg = None,
all_services: AllOption = False,
services: _ServicesArg = None,
all_services: _AllOption = False,
migrate: Annotated[
bool, typer.Option("--migrate", "-m", help="Only services needing migration")
] = False,
host: HostOption = None,
config: ConfigOption = None,
host: _HostOption = None,
config: _ConfigOption = None,
) -> None:
"""Start services (docker compose up -d). Auto-migrates if host changed."""
if migrate and host:
@@ -303,10 +303,10 @@ def up(
@app.command(rich_help_panel="Lifecycle")
def down(
services: ServicesArg = None,
all_services: AllOption = False,
host: HostOption = None,
config: ConfigOption = None,
services: _ServicesArg = None,
all_services: _AllOption = False,
host: _HostOption = None,
config: _ConfigOption = None,
) -> None:
"""Stop services (docker compose down)."""
svc_list, cfg = _get_services(services or [], all_services, config)
@@ -336,9 +336,9 @@ def down(
@app.command(rich_help_panel="Lifecycle")
def pull(
services: ServicesArg = None,
all_services: AllOption = False,
config: ConfigOption = None,
services: _ServicesArg = None,
all_services: _AllOption = False,
config: _ConfigOption = None,
) -> None:
"""Pull latest images (docker compose pull)."""
svc_list, cfg = _get_services(services or [], all_services, config)
@@ -349,9 +349,9 @@ def pull(
@app.command(rich_help_panel="Lifecycle")
def restart(
services: ServicesArg = None,
all_services: AllOption = False,
config: ConfigOption = None,
services: _ServicesArg = None,
all_services: _AllOption = False,
config: _ConfigOption = None,
) -> None:
"""Restart services (down + up)."""
svc_list, cfg = _get_services(services or [], all_services, config)
@@ -363,9 +363,9 @@ def restart(
@app.command(rich_help_panel="Lifecycle")
def update(
services: ServicesArg = None,
all_services: AllOption = False,
config: ConfigOption = None,
services: _ServicesArg = None,
all_services: _AllOption = False,
config: _ConfigOption = None,
) -> None:
"""Update services (pull + down + up)."""
svc_list, cfg = _get_services(services or [], all_services, config)
@@ -379,15 +379,15 @@ def update(
@app.command(rich_help_panel="Monitoring")
def logs(
services: ServicesArg = None,
all_services: AllOption = False,
host: HostOption = None,
services: _ServicesArg = None,
all_services: _AllOption = False,
host: _HostOption = None,
follow: Annotated[bool, typer.Option("--follow", "-f", help="Follow logs")] = False,
tail: Annotated[
int | None,
typer.Option("--tail", "-n", help="Number of lines (default: 20 for --all, 100 otherwise)"),
] = None,
config: ConfigOption = None,
config: _ConfigOption = None,
) -> None:
"""Show service logs."""
if all_services and host is not None:
@@ -421,7 +421,7 @@ def logs(
@app.command(rich_help_panel="Monitoring")
def ps(
config: ConfigOption = None,
config: _ConfigOption = None,
) -> None:
"""Show status of all services."""
cfg = _load_config_or_exit(config)
@@ -459,7 +459,7 @@ def _group_services_by_host(
return by_host
def _get_container_counts_with_progress(cfg: Config) -> dict[str, int]:
def _get_container_counts(cfg: Config) -> dict[str, int]:
"""Get container counts from all hosts with a progress bar."""
async def get_count(host_name: str) -> tuple[str, int]:
@@ -552,7 +552,7 @@ def stats(
bool,
typer.Option("--live", "-l", help="Query Docker for live container stats"),
] = False,
config: ConfigOption = None,
config: _ConfigOption = None,
) -> None:
"""Show overview statistics for hosts and services.
@@ -569,7 +569,7 @@ def stats(
container_counts: dict[str, int] = {}
if live:
container_counts = _get_container_counts_with_progress(cfg)
container_counts = _get_container_counts(cfg)
host_table = _build_host_table(
cfg, services_by_host, running_by_host, container_counts, show_containers=live
@@ -582,8 +582,8 @@ def stats(
@app.command("traefik-file", rich_help_panel="Configuration")
def traefik_file(
services: ServicesArg = None,
all_services: AllOption = False,
services: _ServicesArg = None,
all_services: _AllOption = False,
output: Annotated[
Path | None,
typer.Option(
@@ -592,7 +592,7 @@ def traefik_file(
help="Write Traefik file-provider YAML to this path (stdout if omitted)",
),
] = None,
config: ConfigOption = None,
config: _ConfigOption = None,
) -> None:
"""Generate a Traefik file-provider fragment from compose Traefik labels."""
svc_list, cfg = _get_services(services or [], all_services, config)
@@ -615,7 +615,7 @@ def traefik_file(
err_console.print(f"[yellow]![/] {warning}")
def _discover_services_with_progress(cfg: Config) -> dict[str, str | list[str]]:
def _discover_services(cfg: Config) -> dict[str, str | list[str]]:
"""Discover running services with a progress bar."""
async def check_service(service: str) -> tuple[str, str | list[str] | None]:
@@ -665,7 +665,7 @@ def _discover_services_with_progress(cfg: Config) -> dict[str, str | list[str]]:
return asyncio.run(gather_with_progress(progress, task_id))
def _snapshot_services_with_progress(
def _snapshot_services(
cfg: Config,
services: list[str],
log_path: Path | None,
@@ -674,7 +674,7 @@ def _snapshot_services_with_progress(
async def collect_service(service: str, now: datetime) -> list[SnapshotEntry]:
try:
return await _collect_service_entries(cfg, service, now=now)
return await collect_service_entries(cfg, service, now=now)
except RuntimeError:
return []
@@ -697,7 +697,7 @@ def _snapshot_services_with_progress(
effective_log_path = log_path or DEFAULT_LOG_PATH
now_dt = datetime.now(UTC)
now_iso = _isoformat(now_dt)
now_iso = isoformat(now_dt)
with _progress_bar("Capturing", len(services)) as (progress, task_id):
snapshot_entries = asyncio.run(gather_with_progress(progress, task_id, now_dt, services))
@@ -706,17 +706,17 @@ def _snapshot_services_with_progress(
msg = "No image digests were captured"
raise RuntimeError(msg)
existing_entries = _load_existing_entries(effective_log_path)
merged_entries = _merge_entries(existing_entries, snapshot_entries, now_iso=now_iso)
existing_entries = load_existing_entries(effective_log_path)
merged_entries = merge_entries(existing_entries, snapshot_entries, now_iso=now_iso)
meta = {"generated_at": now_iso, "compose_dir": str(cfg.compose_dir)}
_write_toml(effective_log_path, meta=meta, entries=merged_entries)
write_toml(effective_log_path, meta=meta, entries=merged_entries)
return effective_log_path
def _check_ssh_connectivity(cfg: Config) -> list[str]:
"""Check SSH connectivity to all hosts. Returns list of unreachable hosts."""
# Filter out local hosts - no SSH needed
remote_hosts = [h for h in cfg.hosts if not _is_local(cfg.hosts[h])]
remote_hosts = [h for h in cfg.hosts if not is_local(cfg.hosts[h])]
if not remote_hosts:
return []
@@ -742,7 +742,7 @@ def _check_ssh_connectivity(cfg: Config) -> list[str]:
return asyncio.run(gather_with_progress(progress, task_id))
def _check_mounts_and_networks_with_progress(
def _check_mounts_and_networks(
cfg: Config,
services: list[str],
) -> tuple[list[tuple[str, str, str]], list[tuple[str, str, str]]]:
@@ -837,8 +837,8 @@ def _report_sync_changes(
@app.command(rich_help_panel="Configuration")
def sync(
config: ConfigOption = None,
log_path: LogPathOption = None,
config: _ConfigOption = None,
log_path: _LogPathOption = None,
dry_run: Annotated[
bool,
typer.Option("--dry-run", "-n", help="Show what would be synced without writing"),
@@ -853,7 +853,7 @@ def sync(
cfg = _load_config_or_exit(config)
current_state = load_state(cfg)
discovered = _discover_services_with_progress(cfg)
discovered = _discover_services(cfg)
# Calculate changes
added = [s for s in discovered if s not in current_state]
@@ -883,7 +883,7 @@ def sync(
# Capture image digests for running services
if discovered:
try:
path = _snapshot_services_with_progress(cfg, list(discovered.keys()), log_path)
path = _snapshot_services(cfg, list(discovered.keys()), log_path)
console.print(f"[green]✓[/] Digests written to {path}")
except RuntimeError as exc:
err_console.print(f"[yellow]![/] {exc}")
@@ -999,9 +999,9 @@ def _report_host_compatibility(
if found == total:
console.print(f" [green]✓[/] [magenta]{host_name}[/] {found}/{total}{marker}")
else:
preview = ", ".join(missing[:MISSING_PATH_PREVIEW_LIMIT])
if len(missing) > MISSING_PATH_PREVIEW_LIMIT:
preview += f", +{len(missing) - MISSING_PATH_PREVIEW_LIMIT} more"
preview = ", ".join(missing[:_MISSING_PATH_PREVIEW_LIMIT])
if len(missing) > _MISSING_PATH_PREVIEW_LIMIT:
preview += f", +{len(missing) - _MISSING_PATH_PREVIEW_LIMIT} more"
console.print(
f" [red]✗[/] [magenta]{host_name}[/] {found}/{total} "
f"[dim](missing: {preview})[/]{marker}"
@@ -1022,7 +1022,7 @@ def _run_remote_checks(cfg: Config, svc_list: list[str], *, show_host_compat: bo
console.print() # Spacing before mounts/networks check
# Check mounts and networks
mount_errors, network_errors = _check_mounts_and_networks_with_progress(cfg, svc_list)
mount_errors, network_errors = _check_mounts_and_networks(cfg, svc_list)
if mount_errors:
_report_mount_errors(mount_errors)
@@ -1045,12 +1045,12 @@ def _run_remote_checks(cfg: Config, svc_list: list[str], *, show_host_compat: bo
@app.command(rich_help_panel="Configuration")
def check(
services: ServicesArg = None,
services: _ServicesArg = None,
local: Annotated[
bool,
typer.Option("--local", help="Skip SSH-based checks (faster)"),
] = False,
config: ConfigOption = None,
config: _ConfigOption = None,
) -> None:
"""Validate configuration, traefik labels, mounts, and networks.
@@ -1090,9 +1090,9 @@ def check(
# Default network settings for cross-host Docker networking
DEFAULT_NETWORK_NAME = "mynetwork"
DEFAULT_NETWORK_SUBNET = "172.20.0.0/16"
DEFAULT_NETWORK_GATEWAY = "172.20.0.1"
_DEFAULT_NETWORK_NAME = "mynetwork"
_DEFAULT_NETWORK_SUBNET = "172.20.0.0/16"
_DEFAULT_NETWORK_GATEWAY = "172.20.0.1"
@app.command("init-network", rich_help_panel="Configuration")
@@ -1104,16 +1104,16 @@ def init_network(
network: Annotated[
str,
typer.Option("--network", "-n", help="Network name"),
] = DEFAULT_NETWORK_NAME,
] = _DEFAULT_NETWORK_NAME,
subnet: Annotated[
str,
typer.Option("--subnet", "-s", help="Network subnet"),
] = DEFAULT_NETWORK_SUBNET,
] = _DEFAULT_NETWORK_SUBNET,
gateway: Annotated[
str,
typer.Option("--gateway", "-g", help="Network gateway"),
] = DEFAULT_NETWORK_GATEWAY,
config: ConfigOption = None,
] = _DEFAULT_NETWORK_GATEWAY,
config: _ConfigOption = None,
) -> None:
"""Create Docker network on hosts with consistent settings.

View File

@@ -18,10 +18,10 @@ if TYPE_CHECKING:
from .config import Config
# Port parsing constants
SINGLE_PART = 1
PUBLISHED_TARGET_PARTS = 2
HOST_PUBLISHED_PARTS = 3
MIN_VOLUME_PARTS = 2
_SINGLE_PART = 1
_PUBLISHED_TARGET_PARTS = 2
_HOST_PUBLISHED_PARTS = 3
_MIN_VOLUME_PARTS = 2
_VAR_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-(.*?))?\}")
@@ -34,7 +34,7 @@ class PortMapping:
published: int | None
def load_env(compose_path: Path) -> dict[str, str]:
def _load_env(compose_path: Path) -> dict[str, str]:
"""Load environment variables for compose interpolation.
Reads from .env file in the same directory as compose file,
@@ -59,7 +59,7 @@ def load_env(compose_path: Path) -> dict[str, str]:
return env
def interpolate(value: str, env: dict[str, str]) -> str:
def _interpolate(value: str, env: dict[str, str]) -> str:
"""Perform ${VAR} and ${VAR:-default} interpolation."""
def replace(match: re.Match[str]) -> str:
@@ -73,7 +73,7 @@ def interpolate(value: str, env: dict[str, str]) -> str:
return _VAR_PATTERN.sub(replace, value)
def parse_ports(raw: Any, env: dict[str, str]) -> list[PortMapping]: # noqa: PLR0912
def _parse_ports(raw: Any, env: dict[str, str]) -> list[PortMapping]: # noqa: PLR0912
"""Parse port specifications from compose file.
Handles string formats like "8080", "8080:80", "0.0.0.0:8080:80",
@@ -87,18 +87,22 @@ def parse_ports(raw: Any, env: dict[str, str]) -> list[PortMapping]: # noqa: PL
for item in items:
if isinstance(item, str):
interpolated = interpolate(item, env)
interpolated = _interpolate(item, env)
port_spec, _, _ = interpolated.partition("/")
parts = port_spec.split(":")
published: int | None = None
target: int | None = None
if len(parts) == SINGLE_PART and parts[0].isdigit():
if len(parts) == _SINGLE_PART and parts[0].isdigit():
target = int(parts[0])
elif len(parts) == PUBLISHED_TARGET_PARTS and parts[0].isdigit() and parts[1].isdigit():
elif (
len(parts) == _PUBLISHED_TARGET_PARTS and parts[0].isdigit() and parts[1].isdigit()
):
published = int(parts[0])
target = int(parts[1])
elif len(parts) == HOST_PUBLISHED_PARTS and parts[-2].isdigit() and parts[-1].isdigit():
elif (
len(parts) == _HOST_PUBLISHED_PARTS and parts[-2].isdigit() and parts[-1].isdigit()
):
published = int(parts[-2])
target = int(parts[-1])
@@ -107,7 +111,7 @@ def parse_ports(raw: Any, env: dict[str, str]) -> list[PortMapping]: # noqa: PL
elif isinstance(item, dict):
target_raw = item.get("target")
if isinstance(target_raw, str):
target_raw = interpolate(target_raw, env)
target_raw = _interpolate(target_raw, env)
if target_raw is None:
continue
try:
@@ -117,7 +121,7 @@ def parse_ports(raw: Any, env: dict[str, str]) -> list[PortMapping]: # noqa: PL
published_raw = item.get("published")
if isinstance(published_raw, str):
published_raw = interpolate(published_raw, env)
published_raw = _interpolate(published_raw, env)
published_val: int | None
try:
published_val = int(str(published_raw)) if published_raw is not None else None
@@ -144,14 +148,14 @@ def _parse_volume_item(
) -> str | None:
"""Parse a single volume item and return host path if it's a bind mount."""
if isinstance(item, str):
interpolated = interpolate(item, env)
interpolated = _interpolate(item, env)
parts = interpolated.split(":")
if len(parts) >= MIN_VOLUME_PARTS:
if len(parts) >= _MIN_VOLUME_PARTS:
return _resolve_host_path(parts[0], compose_dir)
elif isinstance(item, dict) and item.get("type") == "bind":
source = item.get("source")
if source:
interpolated = interpolate(str(source), env)
interpolated = _interpolate(str(source), env)
return _resolve_host_path(interpolated, compose_dir)
return None
@@ -166,7 +170,7 @@ def parse_host_volumes(config: Config, service: str) -> list[str]:
if not compose_path.exists():
return []
env = load_env(compose_path)
env = _load_env(compose_path)
compose_data = yaml.safe_load(compose_path.read_text()) or {}
raw_services = compose_data.get("services", {})
if not isinstance(raw_services, dict):
@@ -234,7 +238,7 @@ def load_compose_services(
message = f"[{stack}] Compose file not found: {compose_path}"
raise FileNotFoundError(message)
env = load_env(compose_path)
env = _load_env(compose_path)
compose_data = yaml.safe_load(compose_path.read_text()) or {}
raw_services = compose_data.get("services", {})
if not isinstance(raw_services, dict):
@@ -248,7 +252,7 @@ def normalize_labels(raw: Any, env: dict[str, str]) -> dict[str, str]:
return {}
if isinstance(raw, dict):
return {
interpolate(str(k), env): interpolate(str(v), env)
_interpolate(str(k), env): _interpolate(str(v), env)
for k, v in raw.items()
if k is not None
}
@@ -258,8 +262,8 @@ def normalize_labels(raw: Any, env: dict[str, str]) -> dict[str, str]:
if not isinstance(item, str) or "=" not in item:
continue
key_raw, value_raw = item.split("=", 1)
key = interpolate(key_raw.strip(), env)
value = interpolate(value_raw.strip(), env)
key = _interpolate(key_raw.strip(), env)
value = _interpolate(value_raw.strip(), env)
labels[key] = value
return labels
return {}
@@ -278,5 +282,5 @@ def get_ports_for_service(
if ref_service in all_services:
ref_def = all_services[ref_service]
if isinstance(ref_def, dict):
return parse_ports(ref_def.get("ports"), env)
return parse_ports(definition.get("ports"), env)
return _parse_ports(ref_def.get("ports"), env)
return _parse_ports(definition.get("ports"), env)

View File

@@ -148,9 +148,10 @@ def load_config(path: Path | None = None) -> Config:
"""Load configuration from YAML file.
Search order:
1. Explicit path if provided
2. ./compose-farm.yaml
3. $XDG_CONFIG_HOME/compose-farm/compose-farm.yaml (defaults to ~/.config)
1. Explicit path if provided via --config
2. CF_CONFIG environment variable
3. ./compose-farm.yaml
4. $XDG_CONFIG_HOME/compose-farm/compose-farm.yaml (defaults to ~/.config)
"""
search_paths = [
Path("compose-farm.yaml"),
@@ -159,6 +160,8 @@ def load_config(path: Path | None = None) -> Config:
if path:
config_path = path
elif env_path := os.environ.get("CF_CONFIG"):
config_path = Path(env_path)
else:
config_path = None
for p in search_paths:
@@ -170,6 +173,13 @@ def load_config(path: Path | None = None) -> Config:
msg = f"Config file not found. Searched: {', '.join(str(p) for p in search_paths)}"
raise FileNotFoundError(msg)
if config_path.is_dir():
msg = (
f"Config path is a directory, not a file: {config_path}\n"
"This often happens when Docker creates an empty directory for a missing mount."
)
raise FileNotFoundError(msg)
with config_path.open() as f:
raw = yaml.safe_load(f)

View File

@@ -12,8 +12,7 @@ from typing import TYPE_CHECKING, Any
import asyncssh
from rich.markup import escape
from .console import console as _console
from .console import err_console as _err_console
from .console import console, err_console
if TYPE_CHECKING:
from collections.abc import Callable
@@ -55,7 +54,7 @@ class CommandResult:
stderr: str = ""
def _is_local(host: Host) -> bool:
def is_local(host: Host) -> bool:
"""Check if host should run locally (no SSH)."""
addr = host.address.lower()
if addr in LOCAL_ADDRESSES:
@@ -101,14 +100,14 @@ async def _run_local_command(
*,
is_stderr: bool = False,
) -> None:
console = _err_console if is_stderr else _console
out = err_console if is_stderr else console
while True:
line = await reader.readline()
if not line:
break
text = line.decode()
if text.strip(): # Skip empty lines
console.print(f"[cyan]\\[{prefix}][/] {escape(text)}", end="")
out.print(f"[cyan]\\[{prefix}][/] {escape(text)}", end="")
await asyncio.gather(
read_stream(proc.stdout, service),
@@ -130,7 +129,7 @@ async def _run_local_command(
stderr=stderr_data.decode() if stderr_data else "",
)
except OSError as e:
_err_console.print(f"[cyan]\\[{service}][/] [red]Local error:[/] {e}")
err_console.print(f"[cyan]\\[{service}][/] [red]Local error:[/] {e}")
return CommandResult(service=service, exit_code=1, success=False)
@@ -174,10 +173,10 @@ async def _run_ssh_command(
*,
is_stderr: bool = False,
) -> None:
console = _err_console if is_stderr else _console
out = err_console if is_stderr else console
async for line in reader:
if line.strip(): # Skip empty lines
console.print(f"[cyan]\\[{prefix}][/] {escape(line)}", end="")
out.print(f"[cyan]\\[{prefix}][/] {escape(line)}", end="")
await asyncio.gather(
read_stream(proc.stdout, service),
@@ -199,7 +198,7 @@ async def _run_ssh_command(
stderr=stderr_data,
)
except (OSError, asyncssh.Error) as e:
_err_console.print(f"[cyan]\\[{service}][/] [red]SSH error:[/] {e}")
err_console.print(f"[cyan]\\[{service}][/] [red]SSH error:[/] {e}")
return CommandResult(service=service, exit_code=1, success=False)
@@ -212,7 +211,7 @@ async def run_command(
raw: bool = False,
) -> CommandResult:
"""Run a command on a host (locally or via SSH)."""
if _is_local(host):
if is_local(host):
return await _run_local_command(command, service, stream=stream, raw=raw)
return await _run_ssh_command(host, command, service, stream=stream, raw=raw)
@@ -253,23 +252,6 @@ async def run_compose_on_host(
return await run_command(host, command, service, stream=stream, raw=raw)
async def run_compose_multi_host(
config: Config,
service: str,
compose_cmd: str,
*,
stream: bool = True,
raw: bool = False,
) -> list[CommandResult]:
"""Run a docker compose command on all hosts for a multi-host service.
Returns a list of results, one per host.
"""
return await run_sequential_commands_multi_host(
config, service, [compose_cmd], stream=stream, raw=raw
)
async def run_on_services(
config: Config,
services: list[str],
@@ -286,7 +268,7 @@ async def run_on_services(
return await run_sequential_on_services(config, services, [compose_cmd], stream=stream, raw=raw)
async def run_sequential_commands(
async def _run_sequential_commands(
config: Config,
service: str,
commands: list[str],
@@ -302,7 +284,7 @@ async def run_sequential_commands(
return CommandResult(service=service, exit_code=0, success=True)
async def run_sequential_commands_multi_host(
async def _run_sequential_commands_multi_host(
config: Config,
service: str,
commands: list[str],
@@ -356,13 +338,13 @@ async def run_sequential_on_services(
for service in services:
if config.is_multi_host(service):
multi_host_tasks.append(
run_sequential_commands_multi_host(
_run_sequential_commands_multi_host(
config, service, commands, stream=stream, raw=raw
)
)
else:
single_host_tasks.append(
run_sequential_commands(config, service, commands, stream=stream, raw=raw)
_run_sequential_commands(config, service, commands, stream=stream, raw=raw)
)
# Gather results separately to maintain type safety

View File

@@ -20,7 +20,7 @@ if TYPE_CHECKING:
DEFAULT_LOG_PATH = xdg_config_home() / "compose-farm" / "dockerfarm-log.toml"
DIGEST_HEX_LENGTH = 64
_DIGEST_HEX_LENGTH = 64
@dataclass(frozen=True)
@@ -47,7 +47,8 @@ class SnapshotEntry:
}
def _isoformat(dt: datetime) -> str:
def isoformat(dt: datetime) -> str:
"""Format a datetime as an ISO 8601 string with Z suffix for UTC."""
return dt.astimezone(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
@@ -96,13 +97,13 @@ def _extract_image_fields(record: dict[str, Any]) -> tuple[str, str]:
or ""
)
if digest and not digest.startswith("sha256:") and len(digest) == DIGEST_HEX_LENGTH:
if digest and not digest.startswith("sha256:") and len(digest) == _DIGEST_HEX_LENGTH:
digest = f"sha256:{digest}"
return image, digest
async def _collect_service_entries(
async def collect_service_entries(
config: Config,
service: str,
*,
@@ -139,19 +140,21 @@ async def _collect_service_entries(
return entries
def _load_existing_entries(log_path: Path) -> list[dict[str, str]]:
def load_existing_entries(log_path: Path) -> list[dict[str, str]]:
"""Load existing snapshot entries from a TOML log file."""
if not log_path.exists():
return []
data = tomllib.loads(log_path.read_text())
return list(data.get("entries", []))
def _merge_entries(
def merge_entries(
existing: Iterable[dict[str, str]],
new_entries: Iterable[SnapshotEntry],
*,
now_iso: str,
) -> list[dict[str, str]]:
"""Merge new snapshot entries with existing ones, preserving first_seen timestamps."""
merged: dict[tuple[str, str, str], dict[str, str]] = {
(e["service"], e["host"], e["digest"]): dict(e) for e in existing
}
@@ -164,7 +167,8 @@ def _merge_entries(
return list(merged.values())
def _write_toml(log_path: Path, *, meta: dict[str, str], entries: list[dict[str, str]]) -> None:
def write_toml(log_path: Path, *, meta: dict[str, str], entries: list[dict[str, str]]) -> None:
"""Write snapshot entries to a TOML log file."""
lines: list[str] = ["[meta]"]
lines.extend(f'{key} = "{_escape(meta[key])}"' for key in sorted(meta))
@@ -189,45 +193,3 @@ def _write_toml(log_path: Path, *, meta: dict[str, str], entries: list[dict[str,
content = "\n".join(lines).rstrip() + "\n"
log_path.parent.mkdir(parents=True, exist_ok=True)
log_path.write_text(content)
async def snapshot_services(
config: Config,
services: list[str],
*,
log_path: Path | None = None,
now: datetime | None = None,
run_compose_fn: Callable[..., Awaitable[CommandResult]] = run_compose,
) -> Path:
"""Capture current image digests for services and write them to a TOML log.
- Preserves the earliest `first_seen` per (service, host, digest)
- Updates `last_seen` for digests observed in this snapshot
- Leaves untouched digests that were not part of this run (history is kept)
"""
if not services:
error = "No services specified for snapshot"
raise RuntimeError(error)
log_path = log_path or DEFAULT_LOG_PATH
now_dt = now or datetime.now(UTC)
now_iso = _isoformat(now_dt)
existing_entries = _load_existing_entries(log_path)
snapshot_entries: list[SnapshotEntry] = []
for service in services:
snapshot_entries.extend(
await _collect_service_entries(
config, service, now=now_dt, run_compose_fn=run_compose_fn
)
)
if not snapshot_entries:
error = "No image digests were captured"
raise RuntimeError(error)
merged_entries = _merge_entries(existing_entries, snapshot_entries, now_iso=now_iso)
meta = {"generated_at": now_iso, "compose_dir": str(config.compose_dir)}
_write_toml(log_path, meta=meta, entries=merged_entries)
return log_path

View File

@@ -6,7 +6,6 @@ CLI commands are thin wrappers around these functions.
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from .compose import parse_external_networks, parse_host_volumes
@@ -15,7 +14,6 @@ from .executor import (
CommandResult,
check_networks_exist,
check_paths_exist,
check_service_running,
run_command,
run_compose,
run_compose_on_host,
@@ -23,8 +21,6 @@ from .executor import (
from .state import get_service_host, set_multi_host_service, set_service_host
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable
from .config import Config
@@ -35,7 +31,7 @@ def get_service_paths(cfg: Config, service: str) -> list[str]:
return paths
async def check_mounts_for_migration(
async def _check_mounts_for_migration(
cfg: Config,
service: str,
target_host: str,
@@ -46,7 +42,7 @@ async def check_mounts_for_migration(
return [p for p, found in exists.items() if not found]
async def check_networks_for_migration(
async def _check_networks_for_migration(
cfg: Config,
service: str,
target_host: str,
@@ -59,7 +55,7 @@ async def check_networks_for_migration(
return [n for n, found in exists.items() if not found]
async def preflight_check(
async def _preflight_check(
cfg: Config,
service: str,
target_host: str,
@@ -68,12 +64,12 @@ async def preflight_check(
Returns (missing_paths, missing_networks).
"""
missing_paths = await check_mounts_for_migration(cfg, service, target_host)
missing_networks = await check_networks_for_migration(cfg, service, target_host)
missing_paths = await _check_mounts_for_migration(cfg, service, target_host)
missing_networks = await _check_networks_for_migration(cfg, service, target_host)
return missing_paths, missing_networks
def report_preflight_failures(
def _report_preflight_failures(
service: str,
target_host: str,
missing_paths: list[str],
@@ -104,9 +100,9 @@ async def _up_multi_host_service(
# Pre-flight checks on all hosts
for host_name in host_names:
missing_paths, missing_networks = await preflight_check(cfg, service, host_name)
missing_paths, missing_networks = await _preflight_check(cfg, service, host_name)
if missing_paths or missing_networks:
report_preflight_failures(service, host_name, missing_paths, missing_networks)
_report_preflight_failures(service, host_name, missing_paths, missing_networks)
results.append(
CommandResult(service=f"{service}@{host_name}", exit_code=1, success=False)
)
@@ -157,9 +153,9 @@ async def up_services(
current_host = get_service_host(cfg, service)
# Pre-flight check: verify paths and networks exist on target
missing_paths, missing_networks = await preflight_check(cfg, service, target_host)
missing_paths, missing_networks = await _preflight_check(cfg, service, target_host)
if missing_paths or missing_networks:
report_preflight_failures(service, target_host, missing_paths, missing_networks)
_report_preflight_failures(service, target_host, missing_paths, missing_networks)
results.append(CommandResult(service=service, exit_code=1, success=False))
continue
@@ -196,45 +192,6 @@ async def up_services(
return results
async def discover_running_services(cfg: Config) -> dict[str, str | list[str]]:
"""Discover which services are running on which hosts.
Returns a dict mapping service names to host name(s).
Multi-host services return a list of hosts where they're running.
"""
discovered: dict[str, str | list[str]] = {}
for service in cfg.services:
assigned_hosts = cfg.get_hosts(service)
if cfg.is_multi_host(service):
# For multi-host services, find all hosts where it's running (check in parallel)
checks = await asyncio.gather(
*[check_service_running(cfg, service, h) for h in assigned_hosts]
)
running_hosts = [
h for h, running in zip(assigned_hosts, checks, strict=True) if running
]
if running_hosts:
discovered[service] = running_hosts
else:
# Single-host service - check assigned host first
assigned_host = assigned_hosts[0]
if await check_service_running(cfg, service, assigned_host):
discovered[service] = assigned_host
continue
# Check other hosts in case service was migrated but state is stale
for host_name in cfg.hosts:
if host_name == assigned_host:
continue
if await check_service_running(cfg, service, host_name):
discovered[service] = host_name
break
return discovered
async def check_host_compatibility(
cfg: Config,
service: str,
@@ -253,50 +210,3 @@ async def check_host_compatibility(
results[host_name] = (found, len(paths), missing)
return results
async def _check_resources(
cfg: Config,
services: list[str],
get_resources: Callable[[Config, str], list[str]],
check_exists: Callable[[Config, str, list[str]], Awaitable[dict[str, bool]]],
) -> list[tuple[str, str, str]]:
"""Generic check for resources (mounts, networks) on configured hosts."""
missing: list[tuple[str, str, str]] = []
for service in services:
host_names = cfg.get_hosts(service)
resources = get_resources(cfg, service)
if not resources:
continue
for host_name in host_names:
exists = await check_exists(cfg, host_name, resources)
for item, found in exists.items():
if not found:
missing.append((service, host_name, item))
return missing
async def check_mounts_on_configured_hosts(
cfg: Config,
services: list[str],
) -> list[tuple[str, str, str]]:
"""Check mount paths exist on configured hosts.
Returns list of (service, host, missing_path) tuples.
"""
return await _check_resources(cfg, services, get_service_paths, check_paths_exist)
async def check_networks_on_configured_hosts(
cfg: Config,
services: list[str],
) -> list[tuple[str, str, str]]:
"""Check Docker networks exist on configured hosts.
Returns list of (service, host, missing_network) tuples.
"""
return await _check_resources(cfg, services, parse_external_networks, check_networks_exist)

View File

@@ -64,17 +64,6 @@ def get_service_host(config: Config, service: str) -> str | None:
return value
def get_service_hosts(config: Config, service: str) -> list[str]:
"""Get all hosts where a service is currently deployed."""
state = load_state(config)
value = state.get(service)
if value is None:
return []
if isinstance(value, list):
return value
return [value]
def set_service_host(config: Config, service: str, host: str) -> None:
"""Record that a service is deployed on a host."""
with _modify_state(config) as state:

View File

@@ -26,7 +26,7 @@ if TYPE_CHECKING:
@dataclass
class TraefikServiceSource:
class _TraefikServiceSource:
"""Source information to build an upstream for a Traefik service."""
traefik_service: str
@@ -38,9 +38,9 @@ class TraefikServiceSource:
scheme: str | None = None
LIST_VALUE_KEYS = {"entrypoints", "middlewares"}
MIN_ROUTER_PARTS = 3
MIN_SERVICE_LABEL_PARTS = 6
_LIST_VALUE_KEYS = {"entrypoints", "middlewares"}
_MIN_ROUTER_PARTS = 3
_MIN_SERVICE_LABEL_PARTS = 6
def _parse_value(key: str, raw_value: str) -> Any:
@@ -51,7 +51,7 @@ def _parse_value(key: str, raw_value: str) -> Any:
if value.isdigit():
return int(value)
last_segment = key.rsplit(".", 1)[-1]
if last_segment in LIST_VALUE_KEYS:
if last_segment in _LIST_VALUE_KEYS:
parts = [v.strip() for v in value.split(",")] if "," in value else [value]
return [part for part in parts if part]
return value
@@ -102,7 +102,7 @@ def _insert(root: dict[str, Any], key_path: list[str], value: Any) -> None: # n
current = container_list[list_index]
def _resolve_published_port(source: TraefikServiceSource) -> tuple[int | None, str | None]:
def _resolve_published_port(source: _TraefikServiceSource) -> tuple[int | None, str | None]:
"""Resolve host-published port for a Traefik service.
Returns (published_port, warning_message).
@@ -140,7 +140,7 @@ def _resolve_published_port(source: TraefikServiceSource) -> tuple[int | None, s
def _finalize_http_services(
dynamic: dict[str, Any],
sources: dict[str, TraefikServiceSource],
sources: dict[str, _TraefikServiceSource],
warnings: list[str],
) -> None:
for traefik_service, source in sources.items():
@@ -211,7 +211,7 @@ def _process_router_label(
if not key_without_prefix.startswith("http.routers."):
return
router_parts = key_without_prefix.split(".")
if len(router_parts) < MIN_ROUTER_PARTS:
if len(router_parts) < _MIN_ROUTER_PARTS:
return
router_name = router_parts[2]
router_remainder = router_parts[3:]
@@ -229,12 +229,12 @@ def _process_service_label(
host_address: str,
ports: list[PortMapping],
service_names: set[str],
sources: dict[str, TraefikServiceSource],
sources: dict[str, _TraefikServiceSource],
) -> None:
if not key_without_prefix.startswith("http.services."):
return
parts = key_without_prefix.split(".")
if len(parts) < MIN_SERVICE_LABEL_PARTS:
if len(parts) < _MIN_SERVICE_LABEL_PARTS:
return
traefik_service = parts[2]
service_names.add(traefik_service)
@@ -242,7 +242,7 @@ def _process_service_label(
source = sources.get(traefik_service)
if source is None:
source = TraefikServiceSource(
source = _TraefikServiceSource(
traefik_service=traefik_service,
stack=stack,
compose_service=compose_service,
@@ -267,7 +267,7 @@ def _process_service_labels(
host_address: str,
env: dict[str, str],
dynamic: dict[str, Any],
sources: dict[str, TraefikServiceSource],
sources: dict[str, _TraefikServiceSource],
warnings: list[str],
) -> None:
labels = normalize_labels(definition.get("labels"), env)
@@ -328,7 +328,7 @@ def generate_traefik_config(
"""
dynamic: dict[str, Any] = {}
warnings: list[str] = []
sources: dict[str, TraefikServiceSource] = {}
sources: dict[str, _TraefikServiceSource] = {}
# Determine Traefik's host from service assignment
traefik_host = None
@@ -366,7 +366,7 @@ def generate_traefik_config(
return dynamic, warnings
TRAEFIK_CONFIG_HEADER = """\
_TRAEFIK_CONFIG_HEADER = """\
# Auto-generated by compose-farm
# https://github.com/basnijholt/compose-farm
#
@@ -382,4 +382,4 @@ TRAEFIK_CONFIG_HEADER = """\
def render_traefik_config(dynamic: dict[str, Any]) -> str:
"""Render Traefik dynamic config as YAML with a header comment."""
body = yaml.safe_dump(dynamic, sort_keys=False)
return TRAEFIK_CONFIG_HEADER + body
return _TRAEFIK_CONFIG_HEADER + body

View File

@@ -8,10 +8,10 @@ import pytest
from compose_farm.config import Config, Host
from compose_farm.executor import (
CommandResult,
_is_local,
_run_local_command,
check_networks_exist,
check_paths_exist,
is_local,
run_command,
run_compose,
run_on_services,
@@ -22,7 +22,7 @@ linux_only = pytest.mark.skipif(sys.platform != "linux", reason="Linux-only shel
class TestIsLocal:
"""Tests for _is_local function."""
"""Tests for is_local function."""
@pytest.mark.parametrize(
"address",
@@ -30,7 +30,7 @@ class TestIsLocal:
)
def test_local_addresses(self, address: str) -> None:
host = Host(address=address)
assert _is_local(host) is True
assert is_local(host) is True
@pytest.mark.parametrize(
"address",
@@ -38,7 +38,7 @@ class TestIsLocal:
)
def test_remote_addresses(self, address: str) -> None:
host = Host(address=address)
assert _is_local(host) is False
assert is_local(host) is False
class TestRunLocalCommand:

View File

@@ -9,7 +9,14 @@ import pytest
from compose_farm.config import Config, Host
from compose_farm.executor import CommandResult
from compose_farm.logs import _parse_images_output, snapshot_services
from compose_farm.logs import (
_parse_images_output,
collect_service_entries,
isoformat,
load_existing_entries,
merge_entries,
write_toml,
)
def test_parse_images_output_handles_list_and_lines() -> None:
@@ -55,26 +62,29 @@ async def test_snapshot_preserves_first_seen(tmp_path: Path) -> None:
log_path = tmp_path / "dockerfarm-log.toml"
# First snapshot
first_time = datetime(2025, 1, 1, tzinfo=UTC)
await snapshot_services(
config,
["svc"],
log_path=log_path,
now=first_time,
run_compose_fn=fake_run_compose,
first_entries = await collect_service_entries(
config, "svc", now=first_time, run_compose_fn=fake_run_compose
)
first_iso = isoformat(first_time)
merged = merge_entries([], first_entries, now_iso=first_iso)
meta = {"generated_at": first_iso, "compose_dir": str(config.compose_dir)}
write_toml(log_path, meta=meta, entries=merged)
after_first = tomllib.loads(log_path.read_text())
first_seen = after_first["entries"][0]["first_seen"]
# Second snapshot
second_time = datetime(2025, 2, 1, tzinfo=UTC)
await snapshot_services(
config,
["svc"],
log_path=log_path,
now=second_time,
run_compose_fn=fake_run_compose,
second_entries = await collect_service_entries(
config, "svc", now=second_time, run_compose_fn=fake_run_compose
)
second_iso = isoformat(second_time)
existing = load_existing_entries(log_path)
merged = merge_entries(existing, second_entries, now_iso=second_iso)
meta = {"generated_at": second_iso, "compose_dir": str(config.compose_dir)}
write_toml(log_path, meta=meta, entries=merged)
after_second = tomllib.loads(log_path.read_text())
entry = after_second["entries"][0]

View File

@@ -1,14 +1,12 @@
"""Tests for sync command and related functions."""
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, patch
import pytest
from compose_farm import cli as cli_module
from compose_farm import executor as executor_module
from compose_farm import operations as operations_module
from compose_farm import state as state_module
from compose_farm.config import Config, Host
from compose_farm.executor import CommandResult, check_service_running
@@ -95,42 +93,6 @@ class TestCheckServiceRunning:
assert result is False
class TestDiscoverRunningServices:
"""Tests for discover_running_services function."""
@pytest.mark.asyncio
async def test_discovers_on_assigned_host(self, mock_config: Config) -> None:
"""Discovers service running on its assigned host."""
with patch.object(
operations_module, "check_service_running", new_callable=AsyncMock
) as mock_check:
# plex running on nas01, jellyfin not running, sonarr on nas02
async def check_side_effect(_cfg: Any, service: str, host: str) -> bool:
return (service == "plex" and host == "nas01") or (
service == "sonarr" and host == "nas02"
)
mock_check.side_effect = check_side_effect
result = await operations_module.discover_running_services(mock_config)
assert result == {"plex": "nas01", "sonarr": "nas02"}
@pytest.mark.asyncio
async def test_discovers_on_different_host(self, mock_config: Config) -> None:
"""Discovers service running on non-assigned host (after migration)."""
with patch.object(
operations_module, "check_service_running", new_callable=AsyncMock
) as mock_check:
# plex migrated to nas02
async def check_side_effect(_cfg: Any, service: str, host: str) -> bool:
return service == "plex" and host == "nas02"
mock_check.side_effect = check_side_effect
result = await operations_module.discover_running_services(mock_config)
assert result == {"plex": "nas02"}
class TestReportSyncChanges:
"""Tests for _report_sync_changes function."""