Compare commits


90 Commits

Author SHA1 Message Date
Bas Nijholt
d8e54aa347 check: Fix double newline spacing in output 2025-12-14 20:35:25 -08:00
Bas Nijholt
b2b6b421ba check: Add spacing before mounts/networks progress bar 2025-12-14 20:33:36 -08:00
Bas Nijholt
c6b35f02f0 check: Add spacing before SSH progress bar 2025-12-14 20:32:16 -08:00
Bas Nijholt
7e43b0a6b8 check: Fix spacing after transient progress bar 2025-12-14 20:31:20 -08:00
Bas Nijholt
2915b287ba check: Add SSH connectivity check as first remote step
- Check SSH connectivity to all remote hosts before mount/network checks
- Skip local hosts (no SSH needed)
- Show progress bar during SSH checks
- Report unreachable hosts with clear error messages
- Add newline spacing for better output formatting
2025-12-14 20:30:36 -08:00
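The connectivity check described in this commit could be approximated with a plain TCP probe to port 22 run concurrently for all hosts. This is a sketch under assumptions — compose-farm presumably performs a real SSH handshake rather than a bare TCP connect, and the function names here are illustrative:

```python
import asyncio


async def check_tcp(host: str, port: int = 22, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        _, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout)
        writer.close()
        await writer.wait_closed()
        return True
    except (OSError, asyncio.TimeoutError):
        return False


async def check_hosts(hosts: dict[str, str]) -> dict[str, bool]:
    """Probe all hosts concurrently; returns host name -> reachable."""
    results = await asyncio.gather(*(check_tcp(addr) for addr in hosts.values()))
    return dict(zip(hosts, results))
```

Because the probes run under `asyncio.gather`, total wall time is roughly one timeout, not one per unreachable host.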
Bas Nijholt
ae561db0c9 check: Add progress bar and parallelize mount/network checks
- Parallelize mount and network checking for all services
- Add Rich progress bar showing count, elapsed time, and service name
- Move all inline imports to top-level (contextlib, datetime, logs)
- Also sort state file entries alphabetically for consistency
2025-12-14 20:24:54 -08:00
Bas Nijholt
2d132747c4 sync: Enhance progress bars with count and elapsed time
Show "Discovering ━━━━ 32/65 • 0:00:05 • service-name" format with:
- M of N complete count
- Elapsed time
- Current service name
2025-12-14 20:15:39 -08:00
Bas Nijholt
2848163a04 sync: Add progress bars and parallelize operations
- Parallelize service discovery across all services
- Parallelize image digest capture
- Show transient Rich progress bars during both operations
- Significantly faster sync due to concurrent SSH connections
2025-12-14 20:13:42 -08:00
Bas Nijholt
76aa6e11d2 logs: Make --all and --host mutually exclusive
These options conflict conceptually - --all means all services across
all hosts, while --host means all services on a specific host.
2025-12-14 20:10:28 -08:00
Bas Nijholt
d377df15b4 logs: Add --host filter and contextual --tail default
- Add --host/-H option to filter logs to services on a specific host
- Default --tail to 20 lines when showing multiple services (--all, --host, or >1 service)
- Default to 100 lines for single service
- Add tests for contextual default and host filtering
2025-12-14 20:04:40 -08:00
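The contextual default described above reduces to a small decision: multi-service views (`--all`, `--host`, or more than one service named) get a short tail, a single service gets a long one. A minimal sketch, with a hypothetical helper name:

```python
def default_tail(n_services: int, all_flag: bool = False, host_flag: bool = False) -> int:
    """Pick a --tail default: 20 lines for multi-service views, 100 for one service."""
    multiple = all_flag or host_flag or n_services > 1
    return 20 if multiple else 100
```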
Bas Nijholt
334c17cc28 logs: Use contextual default for --tail option
Default to 20 lines when --all is specified (quick overview),
100 lines otherwise (detailed view for specific services).
2025-12-14 19:59:12 -08:00
Bas Nijholt
f148b5bd3a docs: Add TrueNAS NFS root squash configuration guide
Explains how to set maproot_user/maproot_group to root/wheel
in TrueNAS to disable root squash, allowing Docker containers
running as root to write to NFS-mounted volumes.
2025-12-14 17:24:46 -08:00
Bas Nijholt
54af649d76 Make stats progress bar transient 2025-12-14 15:31:00 -08:00
Bas Nijholt
f6e5a5fa56 Add progress bar when querying hosts in stats --live 2025-12-14 15:30:37 -08:00
Bas Nijholt
01aa24d0db style: Add borders to stats summary table 2025-12-14 15:25:17 -08:00
Bas Nijholt
3e702ef72e Add stats command for overview of hosts and services
Shows hosts table (address, configured/running services) and summary
(total hosts, services, compose files on disk, pending migrations).
Use --live to query Docker for actual container counts.
2025-12-14 15:24:30 -08:00
Bas Nijholt
a31218f7e5 docs: Remove trivial progress counter detail 2025-12-14 15:16:45 -08:00
Bas Nijholt
5decb3ed95 docs: Add --migrate flag and hybrid SSH approach 2025-12-14 15:14:56 -08:00
Bas Nijholt
da61436fbb Use native ssh for raw mode, asyncssh for streaming
- Raw mode uses subprocess with `ssh -t` for proper TTY handling
- Progress bars now render correctly on remote hosts
- asyncssh still used for non-raw parallel streaming with prefixes
- Remove redundant header prints (operations.py handles them)
2025-12-14 15:12:48 -08:00
Bas Nijholt
b6025af0c8 Fix newline after raw output to prevent line mixing 2025-12-14 14:49:33 -08:00
Bas Nijholt
ab914677c4 Add progress counter [n/total] to up command 2025-12-14 14:48:48 -08:00
Bas Nijholt
c0b421f812 Add --migrate flag to up command
Automatically detects services where state differs from config
and migrates only those. Usage: cf up --migrate or cf up -m
2025-12-14 14:47:43 -08:00
Bas Nijholt
2a446c800f Always use raw output for up command
- Print service header before raw output (local and SSH)
- up command always uses raw=True since migrations are sequential
- Gives clean progress bar output without per-line prefixes
2025-12-14 14:44:53 -08:00
Bas Nijholt
dc541c0298 test: Skip shell-dependent tests on Windows/Mac 2025-12-14 14:28:31 -08:00
Bas Nijholt
4d9b8b5ba4 docs: Add TrueNAS NFS crossmnt workaround
Documents how to access child ZFS datasets over NFS by injecting
the crossmnt option into /etc/exports. Includes Python script and
setup instructions for cron-based persistence.
2025-12-14 14:11:10 -08:00
Bas Nijholt
566a07d3a4 Refactor: separate concerns into dedicated modules
- Extract compose.py from traefik.py for generic compose parsing
  (env loading, interpolation, ports, volumes, networks)
- Rename ssh.py to executor.py for clarity
- Extract operations.py from cli.py for business logic
  (up_services, discover_running_services, preflight checks)
- Update CLAUDE.md with new architecture diagram
- Add docs/dev/future-improvements.md for low-priority items

CLI is now a thin layer that delegates to operations module.
All 70 tests pass.
2025-12-14 12:49:24 -08:00
Bas Nijholt
921ce6f13a Add raw output mode for single-service operations
When operating on a single service, pass output directly to
stdout/stderr instead of prefixing each line with [service].
This enables proper handling of \r progress bars during
docker pull, up, etc.
2025-12-14 12:15:36 -08:00
Bas Nijholt
708e09a8cc Show target host when starting services 2025-12-14 12:09:07 -08:00
Bas Nijholt
04154b84f6 Add tests for network and path checking
- test_traefik: Tests for parse_external_networks()
- test_ssh: Tests for check_paths_exist() and check_networks_exist()
2025-12-14 12:08:35 -08:00
Bas Nijholt
2bc9b09e58 Add Docker network validation and init-network command
- check: Validates external networks exist on configured hosts
- up: Pre-flight check blocks if networks missing on target host
- init-network: Creates Docker network with consistent subnet/gateway
  across hosts (default: mynetwork 172.20.0.0/16)

Networks defined as `external: true` in compose files are now
checked before starting or migrating services.
2025-12-14 12:06:36 -08:00
Bas Nijholt
16d517dcd0 docs: Update README and CLAUDE.md for redesigned check command 2025-12-14 10:56:04 -08:00
Bas Nijholt
5e8d09b010 Redesign check command: unified validation + host compatibility
Merged check-mounts into check command. Now provides:
- Config validation (compose files exist)
- Traefik label validation
- Mount path validation (SSH-based)
- Host compatibility matrix when checking specific services

Usage:
  cf check              # Full validation of all services
  cf check --local      # Skip SSH mount checks (fast)
  cf check jellyfin     # Check service + show which hosts can run it

Removed standalone check-mounts command (merged into check).
2025-12-14 10:43:34 -08:00
Bas Nijholt
6fc3535449 Add pre-flight mount check before migration
When migrating a service to a new host, check that all required volume
mount paths exist on the target host BEFORE running down on the old host.
This prevents failed migrations where the service is stopped but can't
start on the new host due to missing NFS mounts.
2025-12-14 10:30:56 -08:00
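A pre-flight check like this can test many paths in a single remote invocation by building one shell script. The sketch below runs the script locally via `sh -c` for illustration (in practice the same string would be passed over SSH); it assumes paths contain no tabs or double quotes:

```python
import subprocess


def check_paths_exist(paths: list[str]) -> dict[str, bool]:
    """Batch-check path existence with one shell invocation (run locally here)."""
    # One `[ -e ... ]` test per path; output is "<path>\tOK" or "<path>\tMISSING".
    script = "; ".join(
        f'if [ -e "{p}" ]; then echo "{p}\tOK"; else echo "{p}\tMISSING"; fi'
        for p in paths
    )
    out = subprocess.run(["sh", "-c", script], capture_output=True, text=True).stdout
    results: dict[str, bool] = {}
    for line in out.splitlines():
        path, status = line.rsplit("\t", 1)
        results[path] = status == "OK"
    return results
```

Batching matters here: one SSH round-trip per host instead of one per mount path.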
Bas Nijholt
9158dba0ce Add check-mounts command to verify NFS paths exist
New command to verify volume mount paths exist on target hosts before
migration. Parses bind mounts from compose files and SSHs to hosts to
check each path exists.

- check_paths_exist() in ssh.py: batch check multiple paths efficiently
- parse_host_volumes() in traefik.py: extract bind mount paths from compose
- check-mounts command in cli.py: groups by host, reports missing paths

Usage: cf check-mounts plex jellyfin
       cf check-mounts --all
2025-12-14 10:25:26 -08:00
Bas Nijholt
7b2c431ca3 fix: Change whoami example port to 18082 to avoid conflicts 2025-12-14 09:46:20 -08:00
Bas Nijholt
9deb460cfc Add Traefik example to examples directory
- traefik/docker-compose.yml: Traefik with docker and file providers
- whoami/docker-compose.yml: Test service with Traefik labels
- Updated compose-farm.yaml with traefik_file auto-regeneration
- Updated README.md with Traefik usage instructions
2025-12-14 09:44:03 -08:00
Bas Nijholt
2ce6f2473b docs: Add Traefik config options to example 2025-12-14 01:19:13 -08:00
Bas Nijholt
04d8444168 docs: Use consistent server-1/server-2 naming in example config 2025-12-14 01:18:50 -08:00
Bas Nijholt
b539c4ba76 docs: Update CLAUDE.md with all modules and commands 2025-12-14 01:17:30 -08:00
Bas Nijholt
473bc089c7 docs: Use consistent server-1/server-2 naming throughout 2025-12-14 01:15:46 -08:00
Bas Nijholt
50f405eb77 docs: Use uv tool install for CLI tools 2025-12-14 01:14:12 -08:00
Bas Nijholt
fd0d3bcbcf docs: Use clearer host names in NFS example 2025-12-14 01:13:58 -08:00
Bas Nijholt
f2e8ab0387 docs: Recommend uv for installation 2025-12-14 01:13:24 -08:00
Bas Nijholt
dfbf2748c7 docs: Reorganize README for better flow 2025-12-14 01:12:09 -08:00
Bas Nijholt
57b0ba5916 CSS for logo 2025-12-14 00:59:59 -08:00
Bas Nijholt
e668fb0faf Add logo 2025-12-14 00:58:58 -08:00
Bas Nijholt
2702203cb5 fix: Handle non-string address in getaddrinfo result 2025-12-14 00:55:11 -08:00
Bas Nijholt
27f17a2451 Remove unused PortMapping.protocol field 2025-12-14 00:52:47 -08:00
Bas Nijholt
98c2492d21 docs: Add cf alias and check command to README 2025-12-14 00:41:26 -08:00
Bas Nijholt
04339cbb9a Group CLI commands into Lifecycle, Monitoring, Configuration 2025-12-14 00:37:18 -08:00
Bas Nijholt
cdb3b1d257 Show friendly error when config file not found
Instead of a Python traceback, display a clean error message with
the red ✗ symbol when the config file cannot be found.
2025-12-14 00:31:36 -08:00
Bas Nijholt
0913769729 Fix check command to validate all services with check_all flag 2025-12-14 00:23:23 -08:00
Bas Nijholt
3a1d5b77b5 Add traefik port validation to check command 2025-12-14 00:19:17 -08:00
Bas Nijholt
e12002ce86 Add test for network_mode: service:X port lookup 2025-12-14 00:03:11 -08:00
Bas Nijholt
676a6fe72d Support network_mode: service:X for port lookup in traefik config 2025-12-14 00:02:07 -08:00
Bas Nijholt
f29f8938fe Add -h as alias for --help 2025-12-13 23:56:33 -08:00
Bas Nijholt
4c0e147786 Escape log output to prevent Rich markup errors 2025-12-13 23:55:44 -08:00
Bas Nijholt
cba61118de Add cf alias for compose-farm command 2025-12-13 23:54:00 -08:00
Bas Nijholt
32dc6b3665 Skip empty lines in streaming output 2025-12-13 23:50:35 -08:00
Bas Nijholt
7d98e664e9 Auto-detect local IPs to skip SSH when on target host 2025-12-13 23:48:28 -08:00
Bas Nijholt
6763403700 Fix duplicate prefix before traefik config message 2025-12-13 23:46:41 -08:00
Bas Nijholt
feb0e13bfd Add check command to find missing services 2025-12-13 23:43:47 -08:00
Bas Nijholt
b86f6d190f Add Rich styling to CLI output
- Service names in cyan, host names in magenta
- Success checkmarks, warning/error symbols
- Colored sync diff indicators (+/-/~)
- Unicode arrows for migrations
2025-12-13 23:40:07 -08:00
Bas Nijholt
5ed15b5445 docs: Add Docker Swarm overlay network notes 2025-12-13 23:16:09 -08:00
Bas Nijholt
761b6dd2d1 Rename state file to compose-farm-state.yaml (not hidden) 2025-12-13 23:01:40 -08:00
Bas Nijholt
e86c2b6d47 docs: Simplify Traefik port requirement note 2025-12-13 22:59:50 -08:00
basnijholt
9353b74c35 chore(docs): update TOC 2025-12-14 06:58:15 +00:00
Bas Nijholt
b7e8e0f3a9 docs: Add limitations and best practices section
Documents cross-host networking limitations:
- Docker DNS doesn't work across hosts
- Dependent services should stay in same compose file
- Ports must be published for cross-host communication
2025-12-13 22:58:01 -08:00
Bas Nijholt
b6c02587bc Rename traefik_host to traefik_service
Instead of specifying the host directly, specify the service name
that runs Traefik. The host is then looked up from the services
mapping, avoiding redundancy.
2025-12-13 22:43:33 -08:00
Bas Nijholt
d412c42ca4 Store state file alongside config file
State is now stored at .compose-farm-state.yaml in the same
directory as the config file. This allows multiple compose-farm
setups with independent state.

State functions now require a Config parameter to locate the
state file via config.get_state_path().
2025-12-13 22:38:11 -08:00
Bas Nijholt
13e0adbbb9 Add traefik_host config to skip local services
When traefik_host is set, services on that host are skipped in
file-provider generation since Traefik's docker provider handles
them directly. This allows running compose-farm from any host
while still generating correct file-provider config.
2025-12-13 22:34:20 -08:00
Bas Nijholt
68c41eb37c Improve missing ports warning message
Replace technical "L3 reachability" phrasing with actionable
guidance: "Add a ports: mapping for cross-host routing."
2025-12-13 22:29:20 -08:00
Bas Nijholt
8af088bb5d Add traefik_file config for auto-regeneration
When traefik_file is set in config, compose-farm automatically
regenerates the Traefik file-provider config after up, down,
restart, and update commands. Eliminates the need to manually
run traefik-file after service changes.
2025-12-13 22:24:29 -08:00
Bas Nijholt
1308eeca12 fix: Skip local services in traefik-file generation
Local services (localhost, local, 127.0.0.1) are handled by Traefik's
docker provider directly. Generating file-provider entries for them
creates conflicting routes with broken localhost URLs (since Traefik
runs in a container where localhost is isolated).

Now traefik-file only generates config for remote services.
2025-12-13 19:51:57 -08:00
Bas Nijholt
a66a68f395 docs: Clarify no merge commits to main rule 2025-12-13 19:44:30 -08:00
Bas Nijholt
6ea25c862e docs: Add traefik directory merging instructions 2025-12-13 19:41:16 -08:00
Bas Nijholt
280524b546 docs: Use GitHub admonition for TL;DR 2025-12-13 19:34:31 -08:00
Bas Nijholt
db9360771b docs: Shorten TL;DR 2025-12-13 19:33:28 -08:00
Bas Nijholt
c7590ed0b7 docs: Move TOC below TL;DR 2025-12-13 19:32:41 -08:00
Bas Nijholt
bb563b9d4b docs: Add TL;DR to README 2025-12-13 19:31:05 -08:00
Bas Nijholt
fe160ee116 fix: Move traefik import to top-level 2025-12-13 17:07:29 -08:00
Bas Nijholt
4c7f49414f docs: Update README for sync command and auto-migration
- Replace snapshot with sync command
- Add auto-migration documentation
- Update compose file naming convention
2025-12-13 16:55:07 -08:00
Bas Nijholt
bebe5b34ba Merge snapshot into sync command
The sync command now performs both operations:
- Discovers running services and updates state.yaml
- Captures image digests and updates dockerfarm-log.toml

Removes the standalone snapshot command to keep the API simple.
2025-12-13 16:53:49 -08:00
Bas Nijholt
5d21e64781 Add sync command to discover running services and update state
The sync command queries all hosts to find where services are actually
running and updates the state file to match reality. Supports --dry-run
to preview changes without modifying state. Useful for initial setup
or after manual changes.
2025-12-13 15:58:29 -08:00
Bas Nijholt
114c7b6eb6 Add check_service_running for discovering running services
Adds a helper function to check if a service has running containers
on a specific host by executing `docker compose ps --status running -q`.
2025-12-13 15:58:29 -08:00
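Since `docker compose ps --status running -q` prints one container ID per line (and nothing when no containers are running), the "is it running?" decision reduces to checking for non-blank output. A sketch of that interpretation, decoupled from the actual SSH execution:

```python
def parse_running(ps_output: str) -> bool:
    """Interpret `docker compose ps --status running -q` output:
    any non-blank line is a running container ID."""
    return any(line.strip() for line in ps_output.splitlines())
```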
Bas Nijholt
20e281a23e Add tests for state module
Tests cover load, save, get, set, and remove operations
for service deployment state tracking.
2025-12-13 15:58:29 -08:00
Bas Nijholt
ec33d28d6c Add auto-migration support to up/down commands
- up: Detects if service is deployed on a different host and
  automatically runs down on the old host before up on the new
- down: Removes service from state tracking after successful stop
- Enables seamless service migration by just changing the config
2025-12-13 15:58:29 -08:00
Bas Nijholt
a818b7726e Add run_compose_on_host for cross-host operations
Allows running compose commands on a specific host rather than
the configured host for a service. Used for migration when
stopping a service on the old host before starting on the new.
2025-12-13 15:58:29 -08:00
Bas Nijholt
cead3904bf Add state module for tracking deployed services
Tracks which host each service is deployed on in
~/.config/compose-farm/state.yaml. This enables automatic
migration when a service's host assignment changes.
2025-12-13 15:58:29 -08:00
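Based on these commit messages, the state file is a simple service-to-host mapping; a hypothetical shape (the actual schema may differ):

```yaml
# ~/.config/compose-farm/state.yaml (illustrative)
plex: server-1
jellyfin: server-2
sonarr: server-1
```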
Bas Nijholt
8f5e14d621 Fix pre-commit issues 2025-12-13 14:54:28 -08:00
33 changed files with 3543 additions and 612 deletions


@@ -10,19 +10,26 @@
```
compose_farm/
├── config.py # Pydantic models, YAML loading
├── ssh.py # asyncssh execution, streaming
├── cli.py # Typer commands
├── cli.py # Typer commands (thin layer, delegates to operations)
├── config.py # Pydantic models, YAML loading
├── compose.py # Compose file parsing (.env, ports, volumes, networks)
├── executor.py # SSH/local command execution, streaming output
├── operations.py # Business logic (up, migrate, discover, preflight checks)
├── state.py # Deployment state tracking (which service on which host)
├── logs.py # Image digest snapshots (dockerfarm-log.toml)
└── traefik.py # Traefik file-provider config generation from labels
```
## Key Design Decisions
1. **asyncssh over Paramiko/Fabric**: Native async support, built-in streaming
1. **Hybrid SSH approach**: asyncssh for parallel streaming with prefixes; native `ssh -t` for raw mode (progress bars)
2. **Parallel by default**: Multiple services run concurrently via `asyncio.gather`
3. **Streaming output**: Real-time stdout/stderr with `[service]` prefix
3. **Streaming output**: Real-time stdout/stderr with `[service]` prefix using Rich
4. **SSH key auth only**: Uses ssh-agent, no password handling (YAGNI)
5. **NFS assumption**: Compose files at same path on all hosts
6. **Local execution**: When host is `localhost`/`local`, skip SSH and run locally
6. **Local IP auto-detection**: Skips SSH when target host matches local machine's IP
7. **State tracking**: Tracks where services are deployed for auto-migration
8. **Pre-flight checks**: Verifies NFS mounts and Docker networks exist before starting/migrating
## Communication Notes
@@ -31,17 +38,24 @@ compose_farm/
## Git Safety
- Never amend commits.
- Never merge into a branch; prefer fast-forward or rebase as directed.
- **NEVER merge anything into main.** Always commit directly or use fast-forward/rebase.
- Never force push.
## Commands Quick Reference
| Command | Docker Compose Equivalent |
|---------|--------------------------|
| `up` | `docker compose up -d` |
| `down` | `docker compose down` |
| `pull` | `docker compose pull` |
| `restart` | `down` + `up -d` |
CLI available as `cf` or `compose-farm`.
| Command | Description |
|---------|-------------|
| `up` | Start services (`docker compose up -d`), auto-migrates if host changed. Use `--migrate` for auto-detection |
| `down` | Stop services (`docker compose down`) |
| `pull` | Pull latest images |
| `restart` | `down` + `up -d` |
| `update` | `pull` + `down` + `up -d` |
| `logs` | `docker compose logs` |
| `ps` | `docker compose ps` |
| `logs` | Show service logs |
| `ps` | Show status of all services |
| `stats` | Show overview (hosts, services, pending migrations; `--live` for container counts) |
| `sync` | Discover running services, update state, capture image digests |
| `check` | Validate config, traefik labels, mounts, networks; show host compatibility |
| `init-network` | Create Docker network on hosts with consistent subnet/gateway |
| `traefik-file` | Generate Traefik file-provider config from compose labels |

README.md

@@ -1,23 +1,31 @@
# Compose Farm
<img src="http://files.nijho.lt/compose-farm.png" align="right" style="width: 300px;" />
A minimal CLI tool to run Docker Compose commands across multiple hosts via SSH.
> [!NOTE]
> Run `docker compose` commands across multiple hosts via SSH. One YAML maps services to hosts. Change the mapping, run `up`, and it auto-migrates. No Kubernetes, no Swarm, no magic.
<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
- [Compose Farm](#compose-farm)
- [Why Compose Farm?](#why-compose-farm)
- [Key Assumption: Shared Storage](#key-assumption-shared-storage)
- [Installation](#installation)
- [Configuration](#configuration)
- [Usage](#usage)
- [Traefik Multihost Ingress (File Provider)](#traefik-multihost-ingress-file-provider)
- [Requirements](#requirements)
- [How It Works](#how-it-works)
- [License](#license)
- [Why Compose Farm?](#why-compose-farm)
- [How It Works](#how-it-works)
- [Requirements](#requirements)
- [Limitations & Best Practices](#limitations--best-practices)
- [What breaks when you move a service](#what-breaks-when-you-move-a-service)
- [Best practices](#best-practices)
- [What Compose Farm doesn't do](#what-compose-farm-doesnt-do)
- [Installation](#installation)
- [Configuration](#configuration)
- [Usage](#usage)
- [Auto-Migration](#auto-migration)
- [Traefik Multihost Ingress (File Provider)](#traefik-multihost-ingress-file-provider)
- [License](#license)
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
# Compose Farm
A minimal CLI tool to run Docker Compose commands across multiple hosts via SSH.
## Why Compose Farm?
I run 100+ Docker Compose stacks on an LXC container that frequently runs out of memory. I needed a way to distribute services across multiple machines without the complexity of:
@@ -27,29 +35,76 @@ I run 100+ Docker Compose stacks on an LXC container that frequently runs out of
**Compose Farm is intentionally simple**: one YAML config mapping services to hosts, and a CLI that runs `docker compose` commands over SSH. That's it.
## Key Assumption: Shared Storage
## How It Works
Compose Farm assumes **all your compose files are accessible at the same path on all hosts**. This is typically achieved via:
1. You run `cf up plex`
2. Compose Farm looks up which host runs `plex` (e.g., `server-1`)
3. It SSHs to `server-1` (or runs locally if `localhost`)
4. It executes `docker compose -f /opt/compose/plex/docker-compose.yml up -d`
5. Output is streamed back with `[plex]` prefix
That's it. No orchestration, no service discovery, no magic.
## Requirements
- Python 3.11+ (we recommend [uv](https://docs.astral.sh/uv/) for installation)
- SSH key-based authentication to your hosts (uses ssh-agent)
- Docker and Docker Compose installed on all target hosts
- **Shared storage**: All compose files must be accessible at the same path on all hosts
- **Docker networks**: External networks must exist on all hosts (use `cf init-network` to create)
Compose Farm assumes your compose files are accessible at the same path on all hosts. This is typically achieved via:
- **NFS mount** (e.g., `/opt/compose` mounted from a NAS)
- **Synced folders** (e.g., Syncthing, rsync)
- **Shared filesystem** (e.g., GlusterFS, Ceph)
```
# Example: NFS mount on all hosts
nas:/volume1/compose → /opt/compose (on nas01)
nas:/volume1/compose → /opt/compose (on nas02)
nas:/volume1/compose → /opt/compose (on nas03)
# Example: NFS mount on all Docker hosts
nas:/volume1/compose → /opt/compose (on server-1)
nas:/volume1/compose → /opt/compose (on server-2)
nas:/volume1/compose → /opt/compose (on server-3)
```
Compose Farm simply runs `docker compose -f /opt/compose/{service}/docker-compose.yml` on the appropriate host—it doesn't copy or sync files.
## Limitations & Best Practices
Compose Farm moves containers between hosts but **does not provide cross-host networking**. Docker's internal DNS and networks don't span hosts.
### What breaks when you move a service
- **Docker DNS** - `http://redis:6379` won't resolve from another host
- **Docker networks** - Containers can't reach each other via network names
- **Environment variables** - `DATABASE_URL=postgres://db:5432` stops working
### Best practices
1. **Keep dependent services together** - If an app needs a database, redis, or worker, keep them in the same compose file on the same host
2. **Only migrate standalone services** - Services that don't talk to other containers (or only talk to external APIs) are safe to move
3. **Expose ports for cross-host communication** - If services must communicate across hosts, publish ports and use IP addresses instead of container names:
```yaml
# Instead of: DATABASE_URL=postgres://db:5432
# Use: DATABASE_URL=postgres://192.168.1.66:5432
```
This includes Traefik routing—containers need published ports for the file-provider to reach them
### What Compose Farm doesn't do
- No overlay networking (use Docker Swarm or Kubernetes for that)
- No service discovery across hosts
- No automatic dependency tracking between compose files
If you need containers on different hosts to communicate seamlessly, you need Docker Swarm, Kubernetes, or a service mesh—which adds the complexity Compose Farm is designed to avoid.
## Installation
```bash
pip install compose-farm
uv tool install compose-farm
# or
uv pip install compose-farm
pip install compose-farm
```
## Configuration
@@ -60,57 +115,89 @@ Create `~/.config/compose-farm/compose-farm.yaml` (or `./compose-farm.yaml` in y
compose_dir: /opt/compose # Must be the same path on all hosts
hosts:
nas01:
server-1:
address: 192.168.1.10
user: docker
nas02:
server-2:
address: 192.168.1.11
# user defaults to current user
local: localhost # Run locally without SSH
services:
plex: nas01
jellyfin: nas02
sonarr: nas01
plex: server-1
jellyfin: server-2
sonarr: server-1
radarr: local # Runs on the machine where you invoke compose-farm
```
Compose files are expected at `{compose_dir}/{service}/docker-compose.yml`.
Compose files are expected at `{compose_dir}/{service}/compose.yaml` (also supports `compose.yml`, `docker-compose.yml`, `docker-compose.yaml`).
## Usage
The CLI is available as both `compose-farm` and the shorter `cf` alias.
```bash
# Start services
compose-farm up plex jellyfin
compose-farm up --all
# Start services (auto-migrates if host changed in config)
cf up plex jellyfin
cf up --all
cf up --migrate # only services needing migration (state ≠ config)
# Stop services
compose-farm down plex
cf down plex
# Pull latest images
compose-farm pull --all
cf pull --all
# Restart (down + up)
compose-farm restart plex
cf restart plex
# Update (pull + down + up) - the end-to-end update command
compose-farm update --all
cf update --all
# Capture image digests to a TOML log (per service or all)
compose-farm snapshot plex
compose-farm snapshot --all # writes ~/.config/compose-farm/dockerfarm-log.toml
# Sync state with reality (discovers running services + captures image digests)
cf sync # updates state.yaml and dockerfarm-log.toml
cf sync --dry-run # preview without writing
# Validate config, traefik labels, mounts, and networks
cf check # full validation (includes SSH checks)
cf check --local # fast validation (skip SSH)
cf check jellyfin # check service + show which hosts can run it
# Create Docker network on new hosts (before migrating services)
cf init-network nuc hp # create mynetwork on specific hosts
cf init-network # create on all hosts
# View logs
compose-farm logs plex
compose-farm logs -f plex # follow
cf logs plex
cf logs -f plex # follow
# Show status
compose-farm ps
cf ps
```
### Auto-Migration
When you change a service's host assignment in config and run `up`, Compose Farm automatically:
1. Checks that required mounts and networks exist on the new host (aborts if missing)
2. Runs `down` on the old host
3. Runs `up -d` on the new host
4. Updates state tracking
Use `cf up --migrate` (or `-m`) to automatically find and migrate all services where the current state differs from config—no need to list them manually.
```yaml
# Before: plex runs on server-1
services:
plex: server-1
# After: change to server-2, then run `cf up plex`
services:
plex: server-2 # Compose Farm will migrate automatically
```
## Traefik Multihost Ingress (File Provider)
If you run a single Traefik instance on one “frontdoor” host and want it to route to
If you run a single Traefik instance on one "frontdoor" host and want it to route to
Compose Farm services on other hosts, Compose Farm can generate a Traefik file-provider
fragment from your existing compose labels.
@@ -119,11 +206,11 @@ fragment from your existing compose labels.
- Your `docker-compose.yml` remains the source of truth. Put normal `traefik.*` labels on
the container you want exposed.
- Labels and port specs may use `${VAR}` / `${VAR:-default}`; Compose Farm resolves these
using the stack’s `.env` file and your current environment, just like Docker Compose.
using the stack's `.env` file and your current environment, just like Docker Compose.
- Publish a host port for that container (via `ports:`). The generator prefers
host-published ports so Traefik can reach the service across hosts; if none are found,
it warns and you’d need L3 reachability to container IPs.
- If a router label doesn’t specify `traefik.http.routers.<name>.service` and there’s only
it warns and you'd need L3 reachability to container IPs.
- If a router label doesn't specify `traefik.http.routers.<name>.service` and there's only
one Traefik service defined on that container, Compose Farm wires the router to it.
- `compose-farm.yaml` stays unchanged: just `hosts` and `services: service → host`.
@@ -156,28 +243,57 @@ providers:
**Generate the fragment**
```bash
compose-farm traefik-file --output /mnt/data/traefik/dynamic.d/compose-farm.generated.yml
cf traefik-file --all --output /mnt/data/traefik/dynamic.d/compose-farm.yml
```
Rerun this after changing Traefik labels, moving a service to another host, or changing
published ports.
## Requirements
**Auto-regeneration**
- Python 3.11+
- SSH key-based authentication to your hosts (uses ssh-agent)
- Docker and Docker Compose installed on all target hosts
- **Shared storage**: All compose files at the same path on all hosts (NFS, Syncthing, etc.)
To automatically regenerate the Traefik config after `up`, `down`, `restart`, or `update`,
add `traefik_file` to your config:
## How It Works
```yaml
compose_dir: /opt/compose
traefik_file: /opt/traefik/dynamic.d/compose-farm.yml # auto-regenerate on up/down/restart/update
traefik_service: traefik # skip services on same host (docker provider handles them)
1. You run `compose-farm up plex`
2. Compose Farm looks up which host runs `plex` (e.g., `nas01`)
3. It SSHs to `nas01` (or runs locally if `localhost`)
4. It executes `docker compose -f /opt/compose/plex/docker-compose.yml up -d`
5. Output is streamed back with `[plex]` prefix
hosts:
# ...
services:
traefik: server-1 # Traefik runs here
plex: server-2 # Services on other hosts get file-provider entries
# ...
```
That's it. No orchestration, no service discovery, no magic.
The `traefik_service` option specifies which service runs Traefik. Services on the same host
are skipped in the file-provider config since Traefik's docker provider handles them directly.
Now `cf up plex` will update the Traefik config automatically—no separate
`traefik-file` command needed.
**Combining with existing config**
If you already have a `dynamic.yml` with manual routes, middlewares, etc., move it into the
directory and Traefik will merge all files:
```bash
mkdir -p /opt/traefik/dynamic.d
mv /opt/traefik/dynamic.yml /opt/traefik/dynamic.d/manual.yml
cf traefik-file --all -o /opt/traefik/dynamic.d/compose-farm.yml
```
Update your Traefik config to use directory watching instead of a single file:
```yaml
# Before
- --providers.file.filename=/dynamic.yml
# After
- --providers.file.directory=/dynamic.d
- --providers.file.watch=true
```
## License


@@ -3,23 +3,28 @@
compose_dir: /opt/compose

# Optional: Auto-regenerate Traefik file-provider config after up/down/restart/update
traefik_file: /opt/traefik/dynamic.d/compose-farm.yml
traefik_service: traefik  # Skip services on same host (docker provider handles them)

hosts:
  # Full form with all options
  server-1:
    address: 192.168.1.10
    user: docker
    port: 22

  # Short form (just address, user defaults to current user)
  server-2: 192.168.1.11

  # Local execution (no SSH)
  local: localhost

services:
  # Map service names to hosts
  # Compose file expected at: {compose_dir}/{service}/compose.yaml
  traefik: server-1  # Traefik runs here
  plex: server-2     # Services on other hosts get file-provider entries
  jellyfin: server-2
  sonarr: server-1
  radarr: local


@@ -0,0 +1,90 @@
# Docker Swarm Overlay Networks with Compose Farm
Notes from testing Docker Swarm's attachable overlay networks as a way to get cross-host container networking while still using `docker compose`.
## The Idea
Docker Swarm overlay networks can be made "attachable", allowing regular `docker compose` containers (not just swarm services) to join them. This would give us:
- Cross-host Docker DNS (containers find each other by name)
- No need to publish ports for inter-container communication
- Keep using `docker compose up` instead of `docker stack deploy`
## Setup Steps
```bash
# On manager node
docker swarm init --advertise-addr <manager-ip>
# On worker nodes (use token from init output)
docker swarm join --token <token> <manager-ip>:2377
# Create attachable overlay network (on manager)
docker network create --driver overlay --attachable my-network
# In compose files, add the network
networks:
my-network:
external: true
```
## Required Ports
Docker Swarm requires these ports open **bidirectionally** between all nodes:
| Port | Protocol | Purpose |
|------|----------|---------|
| 2377 | TCP | Cluster management |
| 7946 | TCP + UDP | Node communication |
| 4789 | UDP | Overlay network traffic (VXLAN) |
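A hypothetical pre-flight helper (not part of compose-farm) can rule out the TCP side of the firewall problem before joining a node. The UDP ports (7946, 4789) cannot be verified with a plain connect, so this only checks the two TCP ports:

```python
import socket

# Swarm's TCP ports: cluster management and node communication.
SWARM_TCP_PORTS = (2377, 7946)

def check_swarm_tcp(host: str, timeout: float = 2.0) -> dict[int, bool]:
    """Return {port: reachable} for the Swarm TCP ports on `host`."""
    results: dict[int, bool] = {}
    for port in SWARM_TCP_PORTS:
        try:
            with socket.create_connection((host, port), timeout=timeout):
                results[port] = True
        except OSError:
            results[port] = False
    return results
```

Run it from each node against every other node; anything `False` matches the `context deadline exceeded` failure mode described below.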
## Test Results (2024-12-13)
- docker-debian (192.168.1.66) as manager
- dev-lxc (192.168.1.167) as worker
### What worked
- Swarm init and join
- Overlay network creation
- Nodes showed as Ready
### What failed
- Container on dev-lxc couldn't attach to overlay network
- Error: `attaching to network failed... context deadline exceeded`
- Cause: Port 7946 blocked from docker-debian → dev-lxc
### Root cause
Firewall on dev-lxc wasn't configured to allow swarm ports. Opening these ports requires sudo access on each node.
## Conclusion
Docker Swarm overlay networks are **not plug-and-play**. Requirements:
1. Swarm init/join on all nodes
2. Firewall rules on all nodes (needs sudo/root)
3. All nodes must have bidirectional connectivity on 3 ports
For a simpler alternative, consider:
- **Tailscale**: VPN mesh, containers use host's Tailscale IP
- **Host networking + published ports**: What compose-farm does today
- **Keep dependent services together**: Avoid cross-host networking entirely
## Future Work
If we decide to support overlay networks:
1. Add a `compose-farm network create` command that:
- Initializes swarm if needed
- Creates attachable overlay network
- Documents required firewall rules
2. Add network config to compose-farm.yaml:
```yaml
overlay_network: compose-farm-net
```
3. Auto-inject network into compose files (or document manual setup)


@@ -0,0 +1,128 @@
# Future Improvements
Low-priority improvements identified during code review. These are not currently causing issues but could be addressed if they become pain points.
## 1. State Module Efficiency (LOW)
**Current:** Every state operation reads and writes the entire file.
```python
def set_service_host(config, service, host):
state = load_state(config) # Read file
state[service] = host
save_state(config, state) # Write file
```
**Impact:** With 87 services, this is fine. With 1000+, it would be slow.
**Potential fix:** Add batch operations:
```python
def update_state(config, updates: dict[str, str | None]) -> None:
"""Batch update: set services to hosts, None means remove."""
state = load_state(config)
for service, host in updates.items():
if host is None:
state.pop(service, None)
else:
state[service] = host
save_state(config, state)
```
**When to do:** Only if state operations become noticeably slow.
---
## 2. Remote-Aware Compose Path Resolution (LOW)
**Current:** `config.get_compose_path()` checks if files exist on the local filesystem:
```python
def get_compose_path(self, service: str) -> Path:
for filename in ("compose.yaml", "compose.yml", ...):
candidate = service_dir / filename
if candidate.exists(): # Local check!
return candidate
```
**Why this works:** NFS/shared storage means local = remote.
**Why it could break:** If running compose-farm from a machine without the NFS mount, it returns `compose.yaml` (the default) even if `docker-compose.yml` exists on the remote host.
**Potential fix:** Query the remote host for file existence, or accept this limitation and document it.
**When to do:** Only if users need to run compose-farm from non-NFS machines.
---
## 3. Add Integration Tests for CLI Commands (MEDIUM)
**Current:** No integration tests for the actual CLI commands. Tests cover the underlying functions but not the Typer commands themselves.
**Potential fix:** Add integration tests using `CliRunner` from Typer:
```python
from typer.testing import CliRunner
from compose_farm.cli import app
runner = CliRunner()
def test_check_command_validates_config():
result = runner.invoke(app, ["check", "--local"])
assert result.exit_code == 0
```
**When to do:** When CLI behavior becomes complex enough to warrant dedicated testing.
---
## 4. Add Tests for operations.py (MEDIUM)
**Current:** Operations module has 30% coverage. Most logic is tested indirectly through test_sync.py.
**Potential fix:** Add dedicated tests for:
- `up_services()` with migration scenarios
- `preflight_check()`
- `check_host_compatibility()`
**When to do:** When adding new operations or modifying migration logic.
---
## 5. Consider Structured Logging (LOW)
**Current:** Operations print directly to console using Rich. This couples the operations module to the Rich library.
**Potential fix:** Use Python's logging module with a custom Rich handler:
```python
import logging
logger = logging.getLogger(__name__)
# In operations:
logger.info("Migrating %s from %s to %s", service, old_host, new_host)
# In cli.py - configure Rich handler:
from rich.logging import RichHandler
logging.basicConfig(handlers=[RichHandler()])
```
**Benefits:**
- Operations become testable without capturing stdout
- Logs can be redirected to files
- Log levels provide filtering
**When to do:** Only if console output coupling becomes a problem for testing or extensibility.
---
## Design Decisions to Keep
These patterns are working well and should be preserved:
1. **asyncio + asyncssh** - Solid async foundation
2. **Pydantic models** - Clean validation
3. **Rich for output** - Good UX
4. **Test structure** - Good coverage
5. **Module separation** - cli/operations/executor/compose pattern
6. **KISS principle** - Don't over-engineer

docs/truenas-nested-nfs.md

@@ -0,0 +1,169 @@
# TrueNAS NFS: Accessing Child ZFS Datasets
When NFS-exporting a parent ZFS dataset on TrueNAS, child datasets appear as **empty directories** to NFS clients. This document explains the problem and provides a workaround.
## The Problem
TrueNAS structures storage as ZFS datasets. A common pattern is:
```
tank/data <- parent dataset (NFS exported)
tank/data/app1 <- child dataset
tank/data/app2 <- child dataset
```
When you create an NFS share for `tank/data`, clients mount it and see the `app1/` and `app2/` directories—but they're empty. This happens because each ZFS dataset is a separate filesystem, and NFS doesn't traverse into child filesystems by default.
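On the server, the boundary is easy to confirm: each child dataset reports a different device ID than its parent. A small diagnostic sketch (run on the TrueNAS server, not the client):

```python
import os

def is_separate_filesystem(parent: str, child: str) -> bool:
    """True if `child` lives on a different filesystem (device) than `parent`.

    NFS stops exporting at exactly these device boundaries unless crossmnt
    is set on the export.
    """
    return os.stat(parent).st_dev != os.stat(child).st_dev
```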
## The Solution: `crossmnt`
The NFS `crossmnt` export option tells the server to allow clients to traverse into child filesystems. However, TrueNAS doesn't expose this option in the UI.
### Workaround Script
This Python script injects `crossmnt` into `/etc/exports`:
```python
#!/usr/bin/env python3
"""
Add crossmnt to TrueNAS NFS exports for child dataset visibility.
Usage: fix-nfs-crossmnt.py /mnt/pool/dataset
Setup:
1. scp fix-nfs-crossmnt.py root@truenas.local:/root/
2. chmod +x /root/fix-nfs-crossmnt.py
3. Test: /root/fix-nfs-crossmnt.py /mnt/pool/dataset
4. Add cron job: TrueNAS UI > System > Advanced > Cron Jobs
Command: /root/fix-nfs-crossmnt.py /mnt/pool/dataset
Schedule: */5 * * * *
"""
import re
import subprocess
import sys
from pathlib import Path
EXPORTS_FILE = Path("/etc/exports")
def main():
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} /mnt/pool/dataset", file=sys.stderr)
return 1
export_path = sys.argv[1]
content = EXPORTS_FILE.read_text()
if f'"{export_path}"' not in content:
print(f"ERROR: {export_path} not found in {EXPORTS_FILE}", file=sys.stderr)
return 1
lines = content.splitlines()
result = []
in_block = False
modified = False
for line in lines:
if f'"{export_path}"' in line:
in_block = True
elif line.startswith('"'):
in_block = False
if in_block and line[:1] in (" ", "\t") and "crossmnt" not in line:
line = re.sub(r"\)(\\\s*)?$", r",crossmnt)\1", line)
modified = True
result.append(line)
if not modified:
return 0 # Already applied
EXPORTS_FILE.write_text("\n".join(result) + "\n")
subprocess.run(["exportfs", "-ra"], check=True)
print(f"Added crossmnt to {export_path}")
return 0
if __name__ == "__main__":
sys.exit(main())
```
## Setup Instructions
### 1. Copy the script to TrueNAS
```bash
scp fix-nfs-crossmnt.py root@truenas.local:/root/
ssh root@truenas.local chmod +x /root/fix-nfs-crossmnt.py
```
### 2. Test manually
```bash
ssh root@truenas.local
# Run the script
/root/fix-nfs-crossmnt.py /mnt/tank/data
# Verify crossmnt was added
cat /etc/exports
```
You should see `,crossmnt` added to the client options:
```
"/mnt/tank/data"\
    192.168.1.10(sec=sys,rw,no_subtree_check,crossmnt)\
    192.168.1.11(sec=sys,rw,no_subtree_check,crossmnt)
```
### 3. Verify on NFS client
```bash
# Before: empty directory
ls /mnt/data/app1/
# (nothing)
# After: actual contents visible
ls /mnt/data/app1/
# config.yaml data/ logs/
```
### 4. Make it persistent
TrueNAS regenerates `/etc/exports` when you modify NFS shares in the UI. To survive this, set up a cron job:
1. Go to **TrueNAS UI → System → Advanced → Cron Jobs → Add**
2. Configure:
- **Description:** Fix NFS crossmnt
- **Command:** `/root/fix-nfs-crossmnt.py /mnt/tank/data`
- **Run As User:** root
- **Schedule:** `*/5 * * * *` (every 5 minutes)
- **Enabled:** checked
3. Save
The script is idempotent—it only modifies the file if `crossmnt` is missing, and skips the write entirely if already applied.
## How It Works
1. Parses `/etc/exports` to find the specified export block
2. Adds `,crossmnt` before the closing `)` on each client line
3. Writes the file only if changes were made
4. Runs `exportfs -ra` to reload the NFS configuration
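Step 2's substitution can be checked in isolation. Given an indented client line from `/etc/exports` (with its trailing backslash continuation), the regex inserts `,crossmnt` before the closing parenthesis and preserves the backslash:

```python
import re

line = '    192.168.1.10(sec=sys,rw,no_subtree_check)\\'
patched = re.sub(r"\)(\\\s*)?$", r",crossmnt)\1", line)
print(patched)  #     192.168.1.10(sec=sys,rw,no_subtree_check,crossmnt)\
```

Note that applying the substitution twice would insert `,crossmnt` again; the script's `"crossmnt" not in line` guard is what makes the cron job idempotent.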
## Why Not Use SMB Instead?
SMB handles child datasets seamlessly, but:
- NFS is simpler for Linux-to-Linux with matching UIDs
- SMB requires more complex permission mapping for Docker volumes
- Many existing setups already use NFS
## Related Links
- [TrueNAS Forum: Add crossmnt option to NFS exports](https://forums.truenas.com/t/add-crossmnt-option-to-nfs-exports/10573)
- [exports(5) man page](https://man7.org/linux/man-pages/man5/exports.5.html) - see `crossmnt` option
## Tested On
- TrueNAS SCALE 24.10


@@ -0,0 +1,65 @@
# TrueNAS NFS: Disabling Root Squash
When running Docker containers on NFS-mounted storage, containers that run as root will fail to write files unless root squash is disabled. This document explains the problem and solution.
## The Problem
By default, NFS uses "root squash" which maps the root user (UID 0) on clients to `nobody` on the server. This is a security feature to prevent remote root users from having root access to the NFS server's files.
However, many Docker containers run as root internally. When these containers try to write to NFS-mounted volumes, the writes fail with "Permission denied" because the NFS server sees them as `nobody`, not `root`.
Example error in container logs:
```
System.UnauthorizedAccessException: Access to the path '/data' is denied.
Error: EACCES: permission denied, mkdir '/app/data'
```
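With root squash, permission bits can look fine while writes still fail, so the reliable probe is a real write attempt. A small sketch (hypothetical helper, not from the project):

```python
import tempfile

def can_write(path: str) -> bool:
    """Attempt an actual file creation under `path`.

    Checking mode bits or os.access() can report success on root-squashed
    NFS even though the server will deny the write as `nobody`.
    """
    try:
        with tempfile.NamedTemporaryFile(dir=path):
            return True
    except OSError:
        return False
```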
## The Solution
In TrueNAS, configure the NFS share to map remote root to local root:
### TrueNAS SCALE UI
1. Go to **Shares → NFS**
2. Edit your share
3. Under **Advanced Options**:
- **Maproot User**: `root`
- **Maproot Group**: `wheel`
4. Save
### Result in /etc/exports
```
"/mnt/pool/data"\
    192.168.1.25(sec=sys,rw,no_root_squash,no_subtree_check)\
    192.168.1.26(sec=sys,rw,no_root_squash,no_subtree_check)
```
The `no_root_squash` option means remote root is treated as root on the server.
## Why `wheel`?
On FreeBSD/TrueNAS, the root user's primary group is `wheel` (GID 0), not `root` like on Linux. So `root:wheel` = `0:0`.
## Security Considerations
Disabling root squash means any machine that can mount the NFS share has full root access to those files. This is acceptable when:
- The NFS clients are on a trusted private network
- Only known hosts (by IP) are allowed to mount the share
- The data isn't security-critical
For home lab setups with Docker containers, this is typically fine.
## Alternative: Run Containers as Non-Root
If you prefer to keep root squash enabled, you can run containers as a non-root user:
1. **LinuxServer.io images**: Set `PUID=1000` and `PGID=1000` environment variables
2. **Other images**: Add `user: "1000:1000"` to the compose service
However, not all containers support running as non-root (they may need to bind to privileged ports, create system directories, etc.).
## Tested On
- TrueNAS SCALE 24.10


@@ -32,11 +32,51 @@ compose-farm down nginx
compose-farm update --all
```
## Traefik Example
Start Traefik and a sample service with Traefik labels:
```bash
cd examples
# Start Traefik (reverse proxy with dashboard)
compose-farm up traefik
# Start whoami (test service with Traefik labels)
compose-farm up whoami
# Access the services
curl -H "Host: whoami.localhost" http://localhost # whoami via Traefik
curl http://localhost:8081 # Traefik dashboard
curl http://localhost:18082 # whoami direct
# Generate Traefik file-provider config (for multi-host setups)
compose-farm traefik-file --all
# Stop everything
compose-farm down --all
```
The `whoami/docker-compose.yml` shows the standard Traefik label pattern:
```yaml
labels:
- traefik.enable=true
- traefik.http.routers.whoami.rule=Host(`whoami.localhost`)
- traefik.http.routers.whoami.entrypoints=web
- traefik.http.services.whoami.loadbalancer.server.port=80
```
## Services
| Service | Description | Ports |
|---------|-------------|-------|
| hello | Hello-world container (exits immediately) | - |
| nginx | Nginx web server | 8080 |
| traefik | Traefik reverse proxy with dashboard | 80, 8081 |
| whoami | Test service with Traefik labels | 18082 |
## Config
The `compose-farm.yaml` in this directory configures all services to run locally (no SSH).
It also demonstrates the `traefik_file` option for auto-regenerating Traefik file-provider config.


@@ -0,0 +1 @@
deployed: {}


@@ -3,9 +3,15 @@
compose_dir: .

# Auto-regenerate Traefik file-provider config after up/down/restart/update
traefik_file: ./traefik/dynamic.d/compose-farm.yml
traefik_service: traefik  # Skip services on same host (docker provider handles them)

hosts:
  local: localhost

services:
  hello: local
  nginx: local
  traefik: local
  whoami: local


@@ -0,0 +1,17 @@
services:
  traefik:
    image: traefik:v3.2
    container_name: traefik
    command:
      - --api.insecure=true
      - --providers.docker=true
      - --providers.docker.exposedbydefault=false
      - --providers.file.directory=/dynamic.d
      - --providers.file.watch=true
      - --entrypoints.web.address=:80
    ports:
      - "80:80"
      - "8081:8080"  # Traefik dashboard
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock:ro
      - ./dynamic.d:/dynamic.d


@@ -0,0 +1 @@
{}


@@ -0,0 +1,11 @@
services:
  whoami:
    image: traefik/whoami
    container_name: whoami
    ports:
      - "18082:80"
    labels:
      - traefik.enable=true
      - traefik.http.routers.whoami.rule=Host(`whoami.localhost`)
      - traefik.http.routers.whoami.entrypoints=web
      - traefik.http.services.whoami.loadbalancer.server.port=80


@@ -12,10 +12,12 @@ dependencies = [
    "pydantic>=2.0.0",
    "asyncssh>=2.14.0",
    "pyyaml>=6.0",
    "rich>=13.0.0",
]

[project.scripts]
compose-farm = "compose_farm.cli:app"
cf = "compose_farm.cli:app"

[build-system]
requires = ["hatchling", "hatch-vcs"]

File diff suppressed because it is too large

src/compose_farm/compose.py

@@ -0,0 +1,282 @@
"""Compose file parsing utilities.
Handles .env loading, variable interpolation, port/volume/network extraction.
"""
from __future__ import annotations
import os
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
import yaml
if TYPE_CHECKING:
from pathlib import Path
from .config import Config
# Port parsing constants
SINGLE_PART = 1
PUBLISHED_TARGET_PARTS = 2
HOST_PUBLISHED_PARTS = 3
MIN_VOLUME_PARTS = 2
_VAR_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-(.*?))?\}")
@dataclass(frozen=True)
class PortMapping:
"""Port mapping for a compose service."""
target: int
published: int | None
def load_env(compose_path: Path) -> dict[str, str]:
"""Load environment variables for compose interpolation.
Reads from .env file in the same directory as compose file,
then overlays current environment variables.
"""
env: dict[str, str] = {}
env_path = compose_path.parent / ".env"
if env_path.exists():
for line in env_path.read_text().splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#") or "=" not in stripped:
continue
key, value = stripped.split("=", 1)
key = key.strip()
value = value.strip()
if (value.startswith('"') and value.endswith('"')) or (
value.startswith("'") and value.endswith("'")
):
value = value[1:-1]
env[key] = value
env.update({k: v for k, v in os.environ.items() if isinstance(v, str)})
return env
def interpolate(value: str, env: dict[str, str]) -> str:
"""Perform ${VAR} and ${VAR:-default} interpolation."""
def replace(match: re.Match[str]) -> str:
var = match.group(1)
default = match.group(2)
resolved = env.get(var)
if resolved:
return resolved
return default or ""
return _VAR_PATTERN.sub(replace, value)
def parse_ports(raw: Any, env: dict[str, str]) -> list[PortMapping]: # noqa: PLR0912
"""Parse port specifications from compose file.
Handles string formats like "8080", "8080:80", "0.0.0.0:8080:80",
and dict formats with target/published keys.
"""
if raw is None:
return []
mappings: list[PortMapping] = []
items = raw if isinstance(raw, list) else [raw]
for item in items:
if isinstance(item, str):
interpolated = interpolate(item, env)
port_spec, _, _ = interpolated.partition("/")
parts = port_spec.split(":")
published: int | None = None
target: int | None = None
if len(parts) == SINGLE_PART and parts[0].isdigit():
target = int(parts[0])
elif len(parts) == PUBLISHED_TARGET_PARTS and parts[0].isdigit() and parts[1].isdigit():
published = int(parts[0])
target = int(parts[1])
elif len(parts) == HOST_PUBLISHED_PARTS and parts[-2].isdigit() and parts[-1].isdigit():
published = int(parts[-2])
target = int(parts[-1])
if target is not None:
mappings.append(PortMapping(target=target, published=published))
elif isinstance(item, dict):
target_raw = item.get("target")
if isinstance(target_raw, str):
target_raw = interpolate(target_raw, env)
if target_raw is None:
continue
try:
target_val = int(str(target_raw))
except (TypeError, ValueError):
continue
published_raw = item.get("published")
if isinstance(published_raw, str):
published_raw = interpolate(published_raw, env)
published_val: int | None
try:
published_val = int(str(published_raw)) if published_raw is not None else None
except (TypeError, ValueError):
published_val = None
mappings.append(PortMapping(target=target_val, published=published_val))
return mappings
def _resolve_host_path(host_path: str, compose_dir: Path) -> str | None:
"""Resolve a host path from volume mount, returning None for named volumes."""
if host_path.startswith("/"):
return host_path
if host_path.startswith(("./", "../")):
return str((compose_dir / host_path).resolve())
return None # Named volume
def _parse_volume_item(
item: str | dict[str, Any],
env: dict[str, str],
compose_dir: Path,
) -> str | None:
"""Parse a single volume item and return host path if it's a bind mount."""
if isinstance(item, str):
interpolated = interpolate(item, env)
parts = interpolated.split(":")
if len(parts) >= MIN_VOLUME_PARTS:
return _resolve_host_path(parts[0], compose_dir)
elif isinstance(item, dict) and item.get("type") == "bind":
source = item.get("source")
if source:
interpolated = interpolate(str(source), env)
return _resolve_host_path(interpolated, compose_dir)
return None
def parse_host_volumes(config: Config, service: str) -> list[str]:
"""Extract host bind mount paths from a service's compose file.
Returns a list of absolute host paths used as volume mounts.
Skips named volumes and resolves relative paths.
"""
compose_path = config.get_compose_path(service)
if not compose_path.exists():
return []
env = load_env(compose_path)
compose_data = yaml.safe_load(compose_path.read_text()) or {}
raw_services = compose_data.get("services", {})
if not isinstance(raw_services, dict):
return []
paths: list[str] = []
compose_dir = compose_path.parent
for definition in raw_services.values():
if not isinstance(definition, dict):
continue
volumes = definition.get("volumes")
if not volumes:
continue
items = volumes if isinstance(volumes, list) else [volumes]
for item in items:
host_path = _parse_volume_item(item, env, compose_dir)
if host_path:
paths.append(host_path)
# Return unique paths, preserving order
seen: set[str] = set()
unique: list[str] = []
for p in paths:
if p not in seen:
seen.add(p)
unique.append(p)
return unique
def parse_external_networks(config: Config, service: str) -> list[str]:
"""Extract external network names from a service's compose file.
Returns a list of network names marked as external: true.
"""
compose_path = config.get_compose_path(service)
if not compose_path.exists():
return []
compose_data = yaml.safe_load(compose_path.read_text()) or {}
networks = compose_data.get("networks", {})
if not isinstance(networks, dict):
return []
external_networks: list[str] = []
for name, definition in networks.items():
if isinstance(definition, dict) and definition.get("external") is True:
external_networks.append(name)
return external_networks
def load_compose_services(
config: Config,
stack: str,
) -> tuple[dict[str, Any], dict[str, str], str]:
"""Load services from a compose file with environment interpolation.
Returns (services_dict, env_dict, host_address).
"""
compose_path = config.get_compose_path(stack)
if not compose_path.exists():
message = f"[{stack}] Compose file not found: {compose_path}"
raise FileNotFoundError(message)
env = load_env(compose_path)
compose_data = yaml.safe_load(compose_path.read_text()) or {}
raw_services = compose_data.get("services", {})
if not isinstance(raw_services, dict):
return {}, env, config.get_host(stack).address
return raw_services, env, config.get_host(stack).address
def normalize_labels(raw: Any, env: dict[str, str]) -> dict[str, str]:
"""Normalize labels from list or dict format, with interpolation."""
if raw is None:
return {}
if isinstance(raw, dict):
return {
interpolate(str(k), env): interpolate(str(v), env)
for k, v in raw.items()
if k is not None
}
if isinstance(raw, list):
labels: dict[str, str] = {}
for item in raw:
if not isinstance(item, str) or "=" not in item:
continue
key_raw, value_raw = item.split("=", 1)
key = interpolate(key_raw.strip(), env)
value = interpolate(value_raw.strip(), env)
labels[key] = value
return labels
return {}
def get_ports_for_service(
definition: dict[str, Any],
all_services: dict[str, Any],
env: dict[str, str],
) -> list[PortMapping]:
"""Get ports for a service, following network_mode: service:X if present."""
network_mode = definition.get("network_mode", "")
if isinstance(network_mode, str) and network_mode.startswith("service:"):
# Service uses another service's network - get ports from that service
ref_service = network_mode[len("service:") :]
if ref_service in all_services:
ref_def = all_services[ref_service]
if isinstance(ref_def, dict):
return parse_ports(ref_def.get("ports"), env)
return parse_ports(definition.get("ports"), env)


@@ -23,6 +23,13 @@ class Config(BaseModel):
compose_dir: Path = Path("/opt/compose")
hosts: dict[str, Host]
services: dict[str, str] # service_name -> host_name
traefik_file: Path | None = None # Auto-regenerate traefik config after up/down
traefik_service: str | None = None # Service name for Traefik (skip its host in file-provider)
config_path: Path = Path() # Set by load_config()
def get_state_path(self) -> Path:
"""Get the state file path (stored alongside config)."""
return self.config_path.parent / "compose-farm-state.yaml"
@model_validator(mode="after")
def validate_service_hosts(self) -> Config:
@@ -46,13 +53,37 @@ class Config(BaseModel):
Tries compose.yaml first, then docker-compose.yml.
"""
service_dir = self.compose_dir / service
for filename in (
"compose.yaml",
"compose.yml",
"docker-compose.yml",
"docker-compose.yaml",
):
candidate = service_dir / filename
if candidate.exists():
return candidate
# Default to compose.yaml if none exist (will error later)
return service_dir / "compose.yaml"
def discover_compose_dirs(self) -> set[str]:
"""Find all directories in compose_dir that contain a compose file."""
compose_filenames = {
"compose.yaml",
"compose.yml",
"docker-compose.yml",
"docker-compose.yaml",
}
found: set[str] = set()
if not self.compose_dir.exists():
return found
for subdir in self.compose_dir.iterdir():
if subdir.is_dir():
for filename in compose_filenames:
if (subdir / filename).exists():
found.add(subdir.name)
break
return found
def _parse_hosts(raw_hosts: dict[str, str | dict[str, str | int]]) -> dict[str, Host]:
"""Parse hosts from config, handling both simple and full forms."""
@@ -98,5 +129,6 @@ def load_config(path: Path | None = None) -> Config:
# Parse hosts with flexible format support
raw["hosts"] = _parse_hosts(raw.get("hosts", {}))
raw["config_path"] = config_path.resolve()
return Config(**raw)


@@ -0,0 +1,395 @@
"""Command execution via SSH or locally."""
from __future__ import annotations
import asyncio
import socket
import subprocess
from dataclasses import dataclass
from functools import lru_cache
from typing import TYPE_CHECKING, Any
import asyncssh
from rich.console import Console
from rich.markup import escape
if TYPE_CHECKING:
from .config import Config, Host
_console = Console(highlight=False)
_err_console = Console(stderr=True, highlight=False)
LOCAL_ADDRESSES = frozenset({"local", "localhost", "127.0.0.1", "::1"})
_DEFAULT_SSH_PORT = 22
@lru_cache(maxsize=1)
def _get_local_ips() -> frozenset[str]:
"""Get all IP addresses of the current machine."""
ips: set[str] = set()
try:
hostname = socket.gethostname()
# Get all addresses for hostname
for info in socket.getaddrinfo(hostname, None):
addr = info[4][0]
if isinstance(addr, str):
ips.add(addr)
# Also try getting the default outbound IP
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 80))
ips.add(s.getsockname()[0])
except OSError:
pass
return frozenset(ips)
@dataclass
class CommandResult:
"""Result of a command execution."""
service: str
exit_code: int
success: bool
stdout: str = ""
stderr: str = ""
def _is_local(host: Host) -> bool:
"""Check if host should run locally (no SSH)."""
addr = host.address.lower()
if addr in LOCAL_ADDRESSES:
return True
# Check if address matches any of this machine's IPs
return addr in _get_local_ips()
async def _run_local_command(
command: str,
service: str,
*,
stream: bool = True,
raw: bool = False,
) -> CommandResult:
"""Run a command locally with streaming output."""
try:
if raw:
# Run with inherited stdout/stderr for proper \r handling
proc = await asyncio.create_subprocess_shell(
command,
stdout=None, # Inherit
stderr=None, # Inherit
)
await proc.wait()
return CommandResult(
service=service,
exit_code=proc.returncode or 0,
success=proc.returncode == 0,
)
proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
if stream and proc.stdout and proc.stderr:
async def read_stream(
reader: asyncio.StreamReader,
prefix: str,
*,
is_stderr: bool = False,
) -> None:
console = _err_console if is_stderr else _console
while True:
line = await reader.readline()
if not line:
break
text = line.decode()
if text.strip(): # Skip empty lines
console.print(f"[cyan]\\[{prefix}][/] {escape(text)}", end="")
await asyncio.gather(
read_stream(proc.stdout, service),
read_stream(proc.stderr, service, is_stderr=True),
)
stdout_data = b""
stderr_data = b""
if not stream:
stdout_data, stderr_data = await proc.communicate()
else:
await proc.wait()
return CommandResult(
service=service,
exit_code=proc.returncode or 0,
success=proc.returncode == 0,
stdout=stdout_data.decode() if stdout_data else "",
stderr=stderr_data.decode() if stderr_data else "",
)
except OSError as e:
_err_console.print(f"[cyan]\\[{service}][/] [red]Local error:[/] {e}")
return CommandResult(service=service, exit_code=1, success=False)
async def _run_ssh_command(
host: Host,
command: str,
service: str,
*,
stream: bool = True,
raw: bool = False,
) -> CommandResult:
"""Run a command on a remote host via SSH with streaming output."""
if raw:
# Use native ssh with TTY for proper progress bar rendering
ssh_args = ["ssh", "-t"]
if host.port != _DEFAULT_SSH_PORT:
ssh_args.extend(["-p", str(host.port)])
ssh_args.extend([f"{host.user}@{host.address}", command])
# Run in thread to avoid blocking the event loop
result = await asyncio.to_thread(subprocess.run, ssh_args, check=False)
return CommandResult(
service=service,
exit_code=result.returncode,
success=result.returncode == 0,
)
proc: asyncssh.SSHClientProcess[Any]
try:
async with asyncssh.connect( # noqa: SIM117 - conn needed before create_process
host.address,
port=host.port,
username=host.user,
known_hosts=None,
) as conn:
async with conn.create_process(command) as proc:
if stream:
async def read_stream(
reader: Any,
prefix: str,
*,
is_stderr: bool = False,
) -> None:
console = _err_console if is_stderr else _console
async for line in reader:
if line.strip(): # Skip empty lines
console.print(f"[cyan]\\[{prefix}][/] {escape(line)}", end="")
await asyncio.gather(
read_stream(proc.stdout, service),
read_stream(proc.stderr, service, is_stderr=True),
)
stdout_data = ""
stderr_data = ""
if not stream:
stdout_data = await proc.stdout.read()
stderr_data = await proc.stderr.read()
await proc.wait()
return CommandResult(
service=service,
exit_code=proc.exit_status or 0,
success=proc.exit_status == 0,
stdout=stdout_data,
stderr=stderr_data,
)
except (OSError, asyncssh.Error) as e:
_err_console.print(f"[cyan]\\[{service}][/] [red]SSH error:[/] {e}")
return CommandResult(service=service, exit_code=1, success=False)
async def run_command(
host: Host,
command: str,
service: str,
*,
stream: bool = True,
raw: bool = False,
) -> CommandResult:
"""Run a command on a host (locally or via SSH)."""
if _is_local(host):
return await _run_local_command(command, service, stream=stream, raw=raw)
return await _run_ssh_command(host, command, service, stream=stream, raw=raw)
async def run_compose(
config: Config,
service: str,
compose_cmd: str,
*,
stream: bool = True,
raw: bool = False,
) -> CommandResult:
"""Run a docker compose command for a service."""
host = config.get_host(service)
compose_path = config.get_compose_path(service)
command = f"docker compose -f {compose_path} {compose_cmd}"
return await run_command(host, command, service, stream=stream, raw=raw)
async def run_compose_on_host(
config: Config,
service: str,
host_name: str,
compose_cmd: str,
*,
stream: bool = True,
raw: bool = False,
) -> CommandResult:
"""Run a docker compose command for a service on a specific host.
Used for migration - running 'down' on the old host before 'up' on new host.
"""
host = config.hosts[host_name]
compose_path = config.get_compose_path(service)
command = f"docker compose -f {compose_path} {compose_cmd}"
return await run_command(host, command, service, stream=stream, raw=raw)
async def run_on_services(
config: Config,
services: list[str],
compose_cmd: str,
*,
stream: bool = True,
raw: bool = False,
) -> list[CommandResult]:
"""Run a docker compose command on multiple services in parallel.
Note: raw=True only makes sense for single-service operations.
"""
tasks = [
run_compose(config, service, compose_cmd, stream=stream, raw=raw) for service in services
]
return await asyncio.gather(*tasks)
async def run_sequential_commands(
config: Config,
service: str,
commands: list[str],
*,
stream: bool = True,
raw: bool = False,
) -> CommandResult:
"""Run multiple compose commands sequentially for a service."""
for cmd in commands:
result = await run_compose(config, service, cmd, stream=stream, raw=raw)
if not result.success:
return result
return CommandResult(service=service, exit_code=0, success=True)
async def run_sequential_on_services(
config: Config,
services: list[str],
commands: list[str],
*,
stream: bool = True,
raw: bool = False,
) -> list[CommandResult]:
"""Run sequential commands on multiple services in parallel.
Note: raw=True only makes sense for single-service operations.
"""
tasks = [
run_sequential_commands(config, service, commands, stream=stream, raw=raw)
for service in services
]
return await asyncio.gather(*tasks)
async def check_service_running(
config: Config,
service: str,
host_name: str,
) -> bool:
"""Check if a service has running containers on a specific host."""
host = config.hosts[host_name]
compose_path = config.get_compose_path(service)
# Use ps --status running to check for running containers
command = f"docker compose -f {compose_path} ps --status running -q"
result = await run_command(host, command, service, stream=False)
# If command succeeded and has output, containers are running
return result.success and bool(result.stdout.strip())
async def check_paths_exist(
config: Config,
host_name: str,
paths: list[str],
) -> dict[str, bool]:
"""Check if multiple paths exist on a specific host.
Returns a dict mapping path -> exists.
"""
if not paths:
return {}
host = config.hosts[host_name]
    # Build a single command that checks all paths in one round trip,
    # chaining per-path tests that each echo a Y:/N: marker line
checks = []
for p in paths:
# Escape single quotes in path
escaped = p.replace("'", "'\\''")
checks.append(f"test -e '{escaped}' && echo 'Y:{escaped}' || echo 'N:{escaped}'")
command = "; ".join(checks)
result = await run_command(host, command, "mount-check", stream=False)
exists: dict[str, bool] = dict.fromkeys(paths, False)
for raw_line in result.stdout.splitlines():
line = raw_line.strip()
if line.startswith("Y:"):
exists[line[2:]] = True
elif line.startswith("N:"):
exists[line[2:]] = False
return exists
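The Y:/N: marker protocol above can be exercised without a host; a minimal sketch of the parsing half (paths and the captured stdout are hypothetical):

```python
# Mirror of the parsing loop in check_paths_exist: the shell side emits
# one "Y:<path>" or "N:<path>" line per path, and unknown paths stay False.
def parse_marker_output(paths: list[str], stdout: str) -> dict[str, bool]:
    exists = dict.fromkeys(paths, False)
    for raw_line in stdout.splitlines():
        line = raw_line.strip()
        if line.startswith("Y:"):
            exists[line[2:]] = True
        elif line.startswith("N:"):
            exists[line[2:]] = False
    return exists


result = parse_marker_output(
    ["/opt/compose", "/mnt/media"],
    "Y:/opt/compose\nN:/mnt/media\n",
)
```

The same parse applies to `check_networks_exist`, which only swaps the per-item shell test.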
async def check_networks_exist(
config: Config,
host_name: str,
networks: list[str],
) -> dict[str, bool]:
"""Check if Docker networks exist on a specific host.
Returns a dict mapping network_name -> exists.
"""
if not networks:
return {}
host = config.hosts[host_name]
# Check each network via docker network inspect
checks = []
for net in networks:
escaped = net.replace("'", "'\\''")
checks.append(
f"docker network inspect '{escaped}' >/dev/null 2>&1 "
f"&& echo 'Y:{escaped}' || echo 'N:{escaped}'"
)
command = "; ".join(checks)
result = await run_command(host, command, "network-check", stream=False)
exists: dict[str, bool] = dict.fromkeys(networks, False)
for raw_line in result.stdout.splitlines():
line = raw_line.strip()
if line.startswith("Y:"):
exists[line[2:]] = True
elif line.startswith("N:"):
exists[line[2:]] = False
return exists


@@ -9,13 +9,13 @@ from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any
from .ssh import run_compose
from .executor import run_compose
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Iterable
from .config import Config
from .ssh import CommandResult
from .executor import CommandResult
DEFAULT_LOG_PATH = Path.home() / ".config" / "compose-farm" / "dockerfarm-log.toml"


@@ -0,0 +1,234 @@
"""High-level operations for compose-farm.
Contains the business logic for up, down, sync, check, and migration operations.
CLI commands are thin wrappers around these functions.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from rich.console import Console
from .compose import parse_external_networks, parse_host_volumes
from .executor import (
CommandResult,
check_networks_exist,
check_paths_exist,
check_service_running,
run_compose,
run_compose_on_host,
)
from .state import get_service_host, set_service_host
if TYPE_CHECKING:
from .config import Config
console = Console(highlight=False)
err_console = Console(stderr=True, highlight=False)
def get_service_paths(cfg: Config, service: str) -> list[str]:
"""Get all required paths for a service (compose_dir + volumes)."""
paths = [str(cfg.compose_dir)]
paths.extend(parse_host_volumes(cfg, service))
return paths
async def check_mounts_for_migration(
cfg: Config,
service: str,
target_host: str,
) -> list[str]:
"""Check if mount paths exist on target host. Returns list of missing paths."""
paths = get_service_paths(cfg, service)
exists = await check_paths_exist(cfg, target_host, paths)
return [p for p, found in exists.items() if not found]
async def check_networks_for_migration(
cfg: Config,
service: str,
target_host: str,
) -> list[str]:
"""Check if Docker networks exist on target host. Returns list of missing networks."""
networks = parse_external_networks(cfg, service)
if not networks:
return []
exists = await check_networks_exist(cfg, target_host, networks)
return [n for n, found in exists.items() if not found]
async def preflight_check(
cfg: Config,
service: str,
target_host: str,
) -> tuple[list[str], list[str]]:
"""Run pre-flight checks for a service on target host.
Returns (missing_paths, missing_networks).
"""
missing_paths = await check_mounts_for_migration(cfg, service, target_host)
missing_networks = await check_networks_for_migration(cfg, service, target_host)
return missing_paths, missing_networks
def report_preflight_failures(
service: str,
target_host: str,
missing_paths: list[str],
missing_networks: list[str],
) -> None:
"""Report pre-flight check failures."""
err_console.print(
f"[cyan]\\[{service}][/] [red]✗[/] Cannot start on [magenta]{target_host}[/]:"
)
for path in missing_paths:
err_console.print(f" [red]✗[/] missing path: {path}")
for net in missing_networks:
err_console.print(f" [red]✗[/] missing network: {net}")
async def up_services(
cfg: Config,
services: list[str],
*,
raw: bool = False,
) -> list[CommandResult]:
"""Start services with automatic migration if host changed."""
results: list[CommandResult] = []
total = len(services)
for idx, service in enumerate(services, 1):
prefix = f"[dim][{idx}/{total}][/] [cyan]\\[{service}][/]"
target_host = cfg.services[service]
current_host = get_service_host(cfg, service)
# Pre-flight check: verify paths and networks exist on target
missing_paths, missing_networks = await preflight_check(cfg, service, target_host)
if missing_paths or missing_networks:
report_preflight_failures(service, target_host, missing_paths, missing_networks)
results.append(CommandResult(service=service, exit_code=1, success=False))
continue
# If service is deployed elsewhere, migrate it
if current_host and current_host != target_host:
if current_host in cfg.hosts:
console.print(
f"{prefix} Migrating from "
f"[magenta]{current_host}[/] → [magenta]{target_host}[/]..."
)
down_result = await run_compose_on_host(cfg, service, current_host, "down", raw=raw)
if raw:
print() # Ensure newline after raw output
if not down_result.success:
results.append(down_result)
continue
else:
err_console.print(
f"{prefix} [yellow]![/] was on "
f"[magenta]{current_host}[/] (not in config), skipping down"
)
# Start on target host
console.print(f"{prefix} Starting on [magenta]{target_host}[/]...")
up_result = await run_compose(cfg, service, "up -d", raw=raw)
if raw:
print() # Ensure newline after raw output (progress bars end with \r)
results.append(up_result)
# Update state on success
if up_result.success:
set_service_host(cfg, service, target_host)
return results
async def discover_running_services(cfg: Config) -> dict[str, str]:
"""Discover which services are running on which hosts.
Returns a dict mapping service names to host names for running services.
"""
discovered: dict[str, str] = {}
for service, assigned_host in cfg.services.items():
# Check assigned host first (most common case)
if await check_service_running(cfg, service, assigned_host):
discovered[service] = assigned_host
continue
# Check other hosts in case service was migrated but state is stale
for host_name in cfg.hosts:
if host_name == assigned_host:
continue
if await check_service_running(cfg, service, host_name):
discovered[service] = host_name
break
return discovered
async def check_host_compatibility(
cfg: Config,
service: str,
) -> dict[str, tuple[int, int, list[str]]]:
"""Check which hosts can run a service based on mount paths.
Returns dict of host_name -> (found_count, total_count, missing_paths).
"""
paths = get_service_paths(cfg, service)
results: dict[str, tuple[int, int, list[str]]] = {}
for host_name in cfg.hosts:
exists = await check_paths_exist(cfg, host_name, paths)
found = sum(1 for v in exists.values() if v)
missing = [p for p, v in exists.items() if not v]
results[host_name] = (found, len(paths), missing)
return results
async def check_mounts_on_configured_hosts(
cfg: Config,
services: list[str],
) -> list[tuple[str, str, str]]:
"""Check mount paths exist on configured hosts.
Returns list of (service, host, missing_path) tuples.
"""
missing: list[tuple[str, str, str]] = []
for service in services:
host_name = cfg.services[service]
paths = get_service_paths(cfg, service)
exists = await check_paths_exist(cfg, host_name, paths)
for path, found in exists.items():
if not found:
missing.append((service, host_name, path))
return missing
async def check_networks_on_configured_hosts(
cfg: Config,
services: list[str],
) -> list[tuple[str, str, str]]:
"""Check Docker networks exist on configured hosts.
Returns list of (service, host, missing_network) tuples.
"""
missing: list[tuple[str, str, str]] = []
for service in services:
host_name = cfg.services[service]
networks = parse_external_networks(cfg, service)
if not networks:
continue
exists = await check_networks_exist(cfg, host_name, networks)
for net, found in exists.items():
if not found:
missing.append((service, host_name, net))
return missing


@@ -1,208 +0,0 @@
"""Command execution via SSH or locally."""
from __future__ import annotations
import asyncio
import sys
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
import asyncssh
if TYPE_CHECKING:
from .config import Config, Host
LOCAL_ADDRESSES = frozenset({"local", "localhost", "127.0.0.1", "::1"})
@dataclass
class CommandResult:
"""Result of a command execution."""
service: str
exit_code: int
success: bool
stdout: str = ""
stderr: str = ""
def _is_local(host: Host) -> bool:
"""Check if host should run locally (no SSH)."""
return host.address.lower() in LOCAL_ADDRESSES
async def _run_local_command(
command: str,
service: str,
*,
stream: bool = True,
) -> CommandResult:
"""Run a command locally with streaming output."""
try:
proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
if stream and proc.stdout and proc.stderr:
async def read_stream(
reader: asyncio.StreamReader,
prefix: str,
*,
is_stderr: bool = False,
) -> None:
output = sys.stderr if is_stderr else sys.stdout
while True:
line = await reader.readline()
if not line:
break
print(f"[{prefix}] {line.decode()}", end="", file=output, flush=True)
await asyncio.gather(
read_stream(proc.stdout, service),
read_stream(proc.stderr, service, is_stderr=True),
)
stdout_data = b""
stderr_data = b""
if not stream:
stdout_data, stderr_data = await proc.communicate()
else:
await proc.wait()
return CommandResult(
service=service,
exit_code=proc.returncode or 0,
success=proc.returncode == 0,
stdout=stdout_data.decode() if stdout_data else "",
stderr=stderr_data.decode() if stderr_data else "",
)
except OSError as e:
print(f"[{service}] Local error: {e}", file=sys.stderr)
return CommandResult(service=service, exit_code=1, success=False)
async def _run_ssh_command(
host: Host,
command: str,
service: str,
*,
stream: bool = True,
) -> CommandResult:
"""Run a command on a remote host via SSH with streaming output."""
proc: asyncssh.SSHClientProcess[Any]
try:
async with (
asyncssh.connect(
host.address,
port=host.port,
username=host.user,
known_hosts=None,
) as conn,
conn.create_process(command) as proc,
):
if stream:
async def read_stream(
reader: Any,
prefix: str,
*,
is_stderr: bool = False,
) -> None:
output = sys.stderr if is_stderr else sys.stdout
async for line in reader:
print(f"[{prefix}] {line}", end="", file=output, flush=True)
await asyncio.gather(
read_stream(proc.stdout, service),
read_stream(proc.stderr, service, is_stderr=True),
)
stdout_data = ""
stderr_data = ""
if not stream:
stdout_data = await proc.stdout.read()
stderr_data = await proc.stderr.read()
await proc.wait()
return CommandResult(
service=service,
exit_code=proc.exit_status or 0,
success=proc.exit_status == 0,
stdout=stdout_data,
stderr=stderr_data,
)
except (OSError, asyncssh.Error) as e:
print(f"[{service}] SSH error: {e}", file=sys.stderr)
return CommandResult(service=service, exit_code=1, success=False)
async def run_command(
host: Host,
command: str,
service: str,
*,
stream: bool = True,
) -> CommandResult:
"""Run a command on a host (locally or via SSH)."""
if _is_local(host):
return await _run_local_command(command, service, stream=stream)
return await _run_ssh_command(host, command, service, stream=stream)
async def run_compose(
config: Config,
service: str,
compose_cmd: str,
*,
stream: bool = True,
) -> CommandResult:
"""Run a docker compose command for a service."""
host = config.get_host(service)
compose_path = config.get_compose_path(service)
command = f"docker compose -f {compose_path} {compose_cmd}"
return await run_command(host, command, service, stream=stream)
async def run_on_services(
config: Config,
services: list[str],
compose_cmd: str,
*,
stream: bool = True,
) -> list[CommandResult]:
"""Run a docker compose command on multiple services in parallel."""
tasks = [run_compose(config, service, compose_cmd, stream=stream) for service in services]
return await asyncio.gather(*tasks)
async def run_sequential_commands(
config: Config,
service: str,
commands: list[str],
*,
stream: bool = True,
) -> CommandResult:
"""Run multiple compose commands sequentially for a service."""
for cmd in commands:
result = await run_compose(config, service, cmd, stream=stream)
if not result.success:
return result
return CommandResult(service=service, exit_code=0, success=True)
async def run_sequential_on_services(
config: Config,
services: list[str],
commands: list[str],
*,
stream: bool = True,
) -> list[CommandResult]:
"""Run sequential commands on multiple services in parallel."""
tasks = [
run_sequential_commands(config, service, commands, stream=stream) for service in services
]
return await asyncio.gather(*tasks)

src/compose_farm/state.py

@@ -0,0 +1,69 @@
"""State tracking for deployed services."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
import yaml
if TYPE_CHECKING:
from .config import Config
def load_state(config: Config) -> dict[str, str]:
"""Load the current deployment state.
Returns a dict mapping service names to host names.
"""
state_path = config.get_state_path()
if not state_path.exists():
return {}
with state_path.open() as f:
data: dict[str, Any] = yaml.safe_load(f) or {}
deployed: dict[str, str] = data.get("deployed", {})
return deployed
def _sorted_dict(d: dict[str, str]) -> dict[str, str]:
"""Return a dictionary sorted by keys."""
return dict(sorted(d.items(), key=lambda item: item[0]))
def save_state(config: Config, deployed: dict[str, str]) -> None:
"""Save the deployment state."""
state_path = config.get_state_path()
with state_path.open("w") as f:
yaml.safe_dump({"deployed": _sorted_dict(deployed)}, f, sort_keys=False)
def get_service_host(config: Config, service: str) -> str | None:
"""Get the host where a service is currently deployed."""
state = load_state(config)
return state.get(service)
def set_service_host(config: Config, service: str, host: str) -> None:
"""Record that a service is deployed on a host."""
state = load_state(config)
state[service] = host
save_state(config, state)
def remove_service(config: Config, service: str) -> None:
"""Remove a service from the state (after down)."""
state = load_state(config)
state.pop(service, None)
save_state(config, state)
def get_services_needing_migration(config: Config) -> list[str]:
"""Get services where current host differs from configured host."""
state = load_state(config)
needs_migration = []
for service, configured_host in config.services.items():
current_host = state.get(service)
if current_host and current_host != configured_host:
needs_migration.append(service)
return needs_migration
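The migration check above reduces to comparing two dicts; a standalone sketch with hypothetical service and host names:

```python
# Same logic as get_services_needing_migration, on plain dicts:
# a service needs migration when it is deployed on a host that
# differs from its configured host.
def services_needing_migration(
    configured: dict[str, str], deployed: dict[str, str]
) -> list[str]:
    return [
        service
        for service, configured_host in configured.items()
        if deployed.get(service) and deployed[service] != configured_host
    ]


moves = services_needing_migration(
    {"plex": "nas01", "grafana": "vm02"},
    {"plex": "nas01", "grafana": "vm01"},
)
```

Services absent from the deployed dict are skipped, matching the `if current_host and ...` guard above.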


@@ -8,28 +8,21 @@ use host-published ports for cross-host reachability.
from __future__ import annotations
import os
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
import yaml
from .compose import (
PortMapping,
get_ports_for_service,
load_compose_services,
normalize_labels,
)
from .executor import LOCAL_ADDRESSES
if TYPE_CHECKING:
from pathlib import Path
from .config import Config
@dataclass(frozen=True)
class PortMapping:
"""Port mapping for a compose service."""
target: int
published: int | None
protocol: str | None = None
@dataclass
class TraefikServiceSource:
"""Source information to build an upstream for a Traefik service."""
@@ -44,128 +37,8 @@ class TraefikServiceSource:
LIST_VALUE_KEYS = {"entrypoints", "middlewares"}
SINGLE_PART = 1
PUBLISHED_TARGET_PARTS = 2
HOST_PUBLISHED_PARTS = 3
MIN_ROUTER_PARTS = 3
MIN_SERVICE_LABEL_PARTS = 6
_VAR_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-(.*?))?\}")
def _load_env(compose_path: Path) -> dict[str, str]:
"""Load environment variables for compose interpolation."""
env: dict[str, str] = {}
env_path = compose_path.parent / ".env"
if env_path.exists():
for line in env_path.read_text().splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#") or "=" not in stripped:
continue
key, value = stripped.split("=", 1)
key = key.strip()
value = value.strip()
if (value.startswith('"') and value.endswith('"')) or (
value.startswith("'") and value.endswith("'")
):
value = value[1:-1]
env[key] = value
env.update({k: v for k, v in os.environ.items() if isinstance(v, str)})
return env
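The `.env` parsing rules in `_load_env` (skip blanks and comments, split on the first `=`, strip one layer of matching quotes) can be sketched on a string instead of a file; contents are hypothetical:

```python
# Standalone sketch of the .env parsing in _load_env, reading from a
# string rather than compose_path.parent / ".env".
def parse_env_text(text: str) -> dict[str, str]:
    env: dict[str, str] = {}
    for line in text.splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#") or "=" not in stripped:
            continue
        key, value = stripped.split("=", 1)
        key, value = key.strip(), value.strip()
        # Strip one layer of matching single or double quotes
        if (value.startswith('"') and value.endswith('"')) or (
            value.startswith("'") and value.endswith("'")
        ):
            value = value[1:-1]
        env[key] = value
    return env


env = parse_env_text('# comment\nPORT=8080\nNAME="media server"\n')
```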
def _interpolate(value: str, env: dict[str, str]) -> str:
"""Perform a minimal `${VAR}`/`${VAR:-default}` interpolation."""
def replace(match: re.Match[str]) -> str:
var = match.group(1)
default = match.group(2)
resolved = env.get(var)
if resolved:
return resolved
return default or ""
return _VAR_PATTERN.sub(replace, value)
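The interpolation above supports both `${VAR}` and `${VAR:-default}`; a runnable re-statement with the pattern copied verbatim (input string and env are hypothetical):

```python
import re

# Same pattern as _VAR_PATTERN above; group 1 is the variable name,
# group 2 the optional ":-" default.
_VAR_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-(.*?))?\}")


def interpolate(value: str, env: dict[str, str]) -> str:
    def replace(match: re.Match[str]) -> str:
        resolved = env.get(match.group(1))
        if resolved:
            return resolved
        return match.group(2) or ""

    return _VAR_PATTERN.sub(replace, value)


result = interpolate("${HOST:-0.0.0.0}:${PORT:-8080}", {"PORT": "9000"})
```

Note the truthiness check means an empty env value also falls through to the default, matching compose's `${VAR:-default}` (empty-or-unset) semantics.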
def _normalize_labels(raw: Any, env: dict[str, str]) -> dict[str, str]:
if raw is None:
return {}
if isinstance(raw, dict):
return {
_interpolate(str(k), env): _interpolate(str(v), env)
for k, v in raw.items()
if k is not None
}
if isinstance(raw, list):
labels: dict[str, str] = {}
for item in raw:
if not isinstance(item, str) or "=" not in item:
continue
key_raw, value_raw = item.split("=", 1)
key = _interpolate(key_raw.strip(), env)
value = _interpolate(value_raw.strip(), env)
labels[key] = value
return labels
return {}
def _parse_ports(raw: Any, env: dict[str, str]) -> list[PortMapping]: # noqa: PLR0912
if raw is None:
return []
mappings: list[PortMapping] = []
items = raw if isinstance(raw, list) else [raw]
for item in items:
if isinstance(item, str):
interpolated = _interpolate(item, env)
port_spec, _, protocol = interpolated.partition("/")
parts = port_spec.split(":")
published: int | None = None
target: int | None = None
if len(parts) == SINGLE_PART and parts[0].isdigit():
target = int(parts[0])
elif len(parts) == PUBLISHED_TARGET_PARTS and parts[0].isdigit() and parts[1].isdigit():
published = int(parts[0])
target = int(parts[1])
elif len(parts) == HOST_PUBLISHED_PARTS and parts[-2].isdigit() and parts[-1].isdigit():
published = int(parts[-2])
target = int(parts[-1])
if target is not None:
mappings.append(
PortMapping(target=target, published=published, protocol=protocol or None)
)
elif isinstance(item, dict):
target_raw = item.get("target")
if isinstance(target_raw, str):
target_raw = _interpolate(target_raw, env)
if target_raw is None:
continue
try:
target_val = int(str(target_raw))
except (TypeError, ValueError):
continue
published_raw = item.get("published")
if isinstance(published_raw, str):
published_raw = _interpolate(published_raw, env)
published_val: int | None
try:
published_val = int(str(published_raw)) if published_raw is not None else None
except (TypeError, ValueError):
published_val = None
protocol_val = item.get("protocol")
mappings.append(
PortMapping(
target=target_val,
published=published_val,
protocol=str(protocol_val) if protocol_val else None,
)
)
return mappings
def _parse_value(key: str, raw_value: str) -> Any:
@@ -263,20 +136,6 @@ def _resolve_published_port(source: TraefikServiceSource) -> tuple[int | None, s
)
def _load_stack(config: Config, stack: str) -> tuple[dict[str, Any], dict[str, str], str]:
compose_path = config.get_compose_path(stack)
if not compose_path.exists():
message = f"[{stack}] Compose file not found: {compose_path}"
raise FileNotFoundError(message)
env = _load_env(compose_path)
compose_data = yaml.safe_load(compose_path.read_text()) or {}
raw_services = compose_data.get("services", {})
if not isinstance(raw_services, dict):
return {}, env, config.get_host(stack).address
return raw_services, env, config.get_host(stack).address
def _finalize_http_services(
dynamic: dict[str, Any],
sources: dict[str, TraefikServiceSource],
@@ -289,8 +148,8 @@ def _finalize_http_services(
if published_port is None:
warnings.append(
f"[{source.stack}/{source.compose_service}] "
f"No host-published port found for Traefik service '{traefik_service}'. "
"Traefik will require L3 reachability to container IPs."
f"No published port found for Traefik service '{traefik_service}'. "
"Add a ports: mapping (e.g., '8080:8080') for cross-host routing."
)
continue
@@ -402,20 +261,21 @@ def _process_service_labels(
stack: str,
compose_service: str,
definition: dict[str, Any],
all_services: dict[str, Any],
host_address: str,
env: dict[str, str],
dynamic: dict[str, Any],
sources: dict[str, TraefikServiceSource],
warnings: list[str],
) -> None:
labels = _normalize_labels(definition.get("labels"), env)
labels = normalize_labels(definition.get("labels"), env)
if not labels:
return
enable_raw = labels.get("traefik.enable")
if enable_raw is not None and _parse_value("enable", enable_raw) is False:
return
ports = _parse_ports(definition.get("ports"), env)
ports = get_ports_for_service(definition, all_services, env)
routers: dict[str, bool] = {}
service_names: set[str] = set()
@@ -450,17 +310,41 @@ def _process_service_labels(
def generate_traefik_config(
config: Config,
services: list[str],
*,
check_all: bool = False,
) -> tuple[dict[str, Any], list[str]]:
"""Generate Traefik dynamic config from compose labels.
Args:
config: The compose-farm config.
services: List of service names to process.
check_all: If True, check all services for warnings (ignore host filtering).
Used by the check command to validate all traefik labels.
Returns (config_dict, warnings).
"""
dynamic: dict[str, Any] = {}
warnings: list[str] = []
sources: dict[str, TraefikServiceSource] = {}
# Determine Traefik's host from service assignment
traefik_host = None
if config.traefik_service and not check_all:
traefik_host = config.services.get(config.traefik_service)
for stack in services:
raw_services, env, host_address = _load_stack(config, stack)
raw_services, env, host_address = load_compose_services(config, stack)
stack_host = config.services.get(stack)
# Skip services on Traefik's host - docker provider handles them directly
# (unless check_all is True, for validation purposes)
if not check_all:
if host_address.lower() in LOCAL_ADDRESSES:
continue
if traefik_host and stack_host == traefik_host:
continue
for compose_service, definition in raw_services.items():
if not isinstance(definition, dict):
continue
@@ -468,6 +352,7 @@ def generate_traefik_config(
stack,
compose_service,
definition,
raw_services,
host_address,
env,
dynamic,

tests/test_cli_logs.py

@@ -0,0 +1,207 @@
"""Tests for CLI logs command."""
from collections.abc import Coroutine
from pathlib import Path
from typing import Any
from unittest.mock import patch
from compose_farm.cli import logs
from compose_farm.config import Config, Host
from compose_farm.executor import CommandResult
def _make_config(tmp_path: Path) -> Config:
"""Create a minimal config for testing."""
compose_dir = tmp_path / "compose"
compose_dir.mkdir()
for svc in ("svc1", "svc2", "svc3"):
svc_dir = compose_dir / svc
svc_dir.mkdir()
(svc_dir / "docker-compose.yml").write_text("services: {}\n")
return Config(
compose_dir=compose_dir,
hosts={"local": Host(address="localhost"), "remote": Host(address="192.168.1.10")},
services={"svc1": "local", "svc2": "local", "svc3": "remote"},
)
def _make_result(service: str) -> CommandResult:
"""Create a successful command result."""
return CommandResult(service=service, exit_code=0, success=True, stdout="", stderr="")
def _mock_run_async_factory(
services: list[str],
) -> tuple[Any, list[CommandResult]]:
"""Create a mock _run_async that returns results for given services."""
results = [_make_result(s) for s in services]
def mock_run_async(_coro: Coroutine[Any, Any, Any]) -> list[CommandResult]:
return results
return mock_run_async, results
class TestLogsContextualDefault:
"""Tests for logs --tail contextual default behavior."""
def test_logs_all_services_defaults_to_20(self, tmp_path: Path) -> None:
"""When --all is specified, default tail should be 20."""
cfg = _make_config(tmp_path)
mock_run_async, _ = _mock_run_async_factory(["svc1", "svc2", "svc3"])
with (
patch("compose_farm.cli._load_config_or_exit", return_value=cfg),
patch("compose_farm.cli._run_async", side_effect=mock_run_async),
patch("compose_farm.cli.run_on_services") as mock_run,
):
mock_run.return_value = None
logs(services=None, all_services=True, host=None, follow=False, tail=None, config=None)
mock_run.assert_called_once()
call_args = mock_run.call_args
assert call_args[0][2] == "logs --tail 20"
def test_logs_single_service_defaults_to_100(self, tmp_path: Path) -> None:
"""When specific services are specified, default tail should be 100."""
cfg = _make_config(tmp_path)
mock_run_async, _ = _mock_run_async_factory(["svc1"])
with (
patch("compose_farm.cli._load_config_or_exit", return_value=cfg),
patch("compose_farm.cli._run_async", side_effect=mock_run_async),
patch("compose_farm.cli.run_on_services") as mock_run,
):
logs(
services=["svc1"],
all_services=False,
host=None,
follow=False,
tail=None,
config=None,
)
mock_run.assert_called_once()
call_args = mock_run.call_args
assert call_args[0][2] == "logs --tail 100"
def test_logs_explicit_tail_overrides_default(self, tmp_path: Path) -> None:
"""When --tail is explicitly provided, it should override the default."""
cfg = _make_config(tmp_path)
mock_run_async, _ = _mock_run_async_factory(["svc1", "svc2", "svc3"])
with (
patch("compose_farm.cli._load_config_or_exit", return_value=cfg),
patch("compose_farm.cli._run_async", side_effect=mock_run_async),
patch("compose_farm.cli.run_on_services") as mock_run,
):
logs(
services=None,
all_services=True,
host=None,
follow=False,
tail=50,
config=None,
)
mock_run.assert_called_once()
call_args = mock_run.call_args
assert call_args[0][2] == "logs --tail 50"
def test_logs_follow_appends_flag(self, tmp_path: Path) -> None:
"""When --follow is specified, -f should be appended to command."""
cfg = _make_config(tmp_path)
mock_run_async, _ = _mock_run_async_factory(["svc1"])
with (
patch("compose_farm.cli._load_config_or_exit", return_value=cfg),
patch("compose_farm.cli._run_async", side_effect=mock_run_async),
patch("compose_farm.cli.run_on_services") as mock_run,
):
logs(
services=["svc1"],
all_services=False,
host=None,
follow=True,
tail=None,
config=None,
)
mock_run.assert_called_once()
call_args = mock_run.call_args
assert call_args[0][2] == "logs --tail 100 -f"
class TestLogsHostFilter:
"""Tests for logs --host filter behavior."""
def test_logs_host_filter_selects_services_on_host(self, tmp_path: Path) -> None:
"""When --host is specified, only services on that host are included."""
cfg = _make_config(tmp_path)
mock_run_async, _ = _mock_run_async_factory(["svc1", "svc2"])
with (
patch("compose_farm.cli._load_config_or_exit", return_value=cfg),
patch("compose_farm.cli._run_async", side_effect=mock_run_async),
patch("compose_farm.cli.run_on_services") as mock_run,
):
logs(
services=None,
all_services=False,
host="local",
follow=False,
tail=None,
config=None,
)
mock_run.assert_called_once()
call_args = mock_run.call_args
# svc1 and svc2 are on "local", svc3 is on "remote"
assert set(call_args[0][1]) == {"svc1", "svc2"}
def test_logs_host_filter_defaults_to_20_lines(self, tmp_path: Path) -> None:
"""When --host is specified, default tail should be 20 (multiple services)."""
cfg = _make_config(tmp_path)
mock_run_async, _ = _mock_run_async_factory(["svc1", "svc2"])
with (
patch("compose_farm.cli._load_config_or_exit", return_value=cfg),
patch("compose_farm.cli._run_async", side_effect=mock_run_async),
patch("compose_farm.cli.run_on_services") as mock_run,
):
logs(
services=None,
all_services=False,
host="local",
follow=False,
tail=None,
config=None,
)
mock_run.assert_called_once()
call_args = mock_run.call_args
assert call_args[0][2] == "logs --tail 20"
def test_logs_all_and_host_mutually_exclusive(self, tmp_path: Path) -> None:
"""Using --all and --host together should error."""
import pytest
import typer
cfg = _make_config(tmp_path)
with (
patch("compose_farm.cli._load_config_or_exit", return_value=cfg),
pytest.raises(typer.Exit) as exc_info,
):
logs(
services=None,
all_services=True,
host="local",
follow=False,
tail=None,
config=None,
)
assert exc_info.value.exit_code == 1


@@ -75,7 +75,8 @@ class TestConfig:
services={"plex": "nas01"},
)
path = config.get_compose_path("plex")
assert path == Path("/opt/compose/plex/docker-compose.yml")
# Defaults to compose.yaml when no file exists
assert path == Path("/opt/compose/plex/compose.yaml")
class TestLoadConfig:

tests/test_executor.py

@@ -0,0 +1,241 @@
"""Tests for executor module."""
import sys
from pathlib import Path
import pytest
from compose_farm.config import Config, Host
from compose_farm.executor import (
CommandResult,
_is_local,
_run_local_command,
check_networks_exist,
check_paths_exist,
run_command,
run_compose,
run_on_services,
)
# These tests run actual shell commands that only work on Linux
linux_only = pytest.mark.skipif(sys.platform != "linux", reason="Linux-only shell commands")
class TestIsLocal:
"""Tests for _is_local function."""
@pytest.mark.parametrize(
"address",
["local", "localhost", "127.0.0.1", "::1", "LOCAL", "LOCALHOST"],
)
def test_local_addresses(self, address: str) -> None:
host = Host(address=address)
assert _is_local(host) is True
@pytest.mark.parametrize(
"address",
["192.168.1.10", "nas01.local", "10.0.0.1", "example.com"],
)
def test_remote_addresses(self, address: str) -> None:
host = Host(address=address)
assert _is_local(host) is False
class TestRunLocalCommand:
"""Tests for local command execution."""
async def test_run_local_command_success(self) -> None:
result = await _run_local_command("echo hello", "test-service")
assert result.success is True
assert result.exit_code == 0
assert result.service == "test-service"
async def test_run_local_command_failure(self) -> None:
result = await _run_local_command("exit 1", "test-service")
assert result.success is False
assert result.exit_code == 1
async def test_run_local_command_not_found(self) -> None:
result = await _run_local_command("nonexistent_command_xyz", "test-service")
assert result.success is False
assert result.exit_code != 0
async def test_run_local_command_captures_output(self) -> None:
result = await _run_local_command("echo hello", "test-service", stream=False)
assert "hello" in result.stdout
class TestRunCommand:
"""Tests for run_command dispatcher."""
async def test_run_command_local(self) -> None:
host = Host(address="localhost")
result = await run_command(host, "echo test", "test-service")
assert result.success is True
async def test_run_command_result_structure(self) -> None:
host = Host(address="local")
result = await run_command(host, "true", "my-service")
assert isinstance(result, CommandResult)
assert result.service == "my-service"
assert result.exit_code == 0
assert result.success is True
class TestRunCompose:
"""Tests for compose command execution."""
async def test_run_compose_builds_correct_command(self, tmp_path: Path) -> None:
# Create a minimal compose file
compose_dir = tmp_path / "compose"
service_dir = compose_dir / "test-service"
service_dir.mkdir(parents=True)
compose_file = service_dir / "docker-compose.yml"
compose_file.write_text("services: {}")
config = Config(
compose_dir=compose_dir,
hosts={"local": Host(address="localhost")},
services={"test-service": "local"},
)
# This will fail because docker compose isn't running,
# but we can verify the command structure works
result = await run_compose(config, "test-service", "config", stream=False)
# Command may fail due to no docker, but structure is correct
assert result.service == "test-service"
class TestRunOnServices:
"""Tests for parallel service execution."""
async def test_run_on_services_parallel(self) -> None:
config = Config(
compose_dir=Path("/tmp"),
hosts={"local": Host(address="localhost")},
services={"svc1": "local", "svc2": "local"},
)
# Use a simple command that will work without docker
# We'll test the parallelism structure
results = await run_on_services(config, ["svc1", "svc2"], "version", stream=False)
assert len(results) == 2
assert results[0].service == "svc1"
assert results[1].service == "svc2"
@linux_only
class TestCheckPathsExist:
"""Tests for check_paths_exist function (uses 'test -e' shell command)."""
async def test_check_existing_paths(self, tmp_path: Path) -> None:
"""Check paths that exist."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
services={},
)
# Create test paths
(tmp_path / "dir1").mkdir()
(tmp_path / "file1").touch()
result = await check_paths_exist(
config, "local", [str(tmp_path / "dir1"), str(tmp_path / "file1")]
)
assert result[str(tmp_path / "dir1")] is True
assert result[str(tmp_path / "file1")] is True
async def test_check_missing_paths(self, tmp_path: Path) -> None:
"""Check paths that don't exist."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
services={},
)
result = await check_paths_exist(
config, "local", [str(tmp_path / "missing1"), str(tmp_path / "missing2")]
)
assert result[str(tmp_path / "missing1")] is False
assert result[str(tmp_path / "missing2")] is False
async def test_check_mixed_paths(self, tmp_path: Path) -> None:
"""Check mix of existing and missing paths."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
services={},
)
(tmp_path / "exists").mkdir()
result = await check_paths_exist(
config, "local", [str(tmp_path / "exists"), str(tmp_path / "missing")]
)
assert result[str(tmp_path / "exists")] is True
assert result[str(tmp_path / "missing")] is False
async def test_check_empty_paths(self, tmp_path: Path) -> None:
"""Empty path list returns empty dict."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
services={},
)
result = await check_paths_exist(config, "local", [])
assert result == {}
@linux_only
class TestCheckNetworksExist:
"""Tests for check_networks_exist function (requires Docker)."""
async def test_check_bridge_network_exists(self, tmp_path: Path) -> None:
"""The 'bridge' network always exists on Docker hosts."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
services={},
)
result = await check_networks_exist(config, "local", ["bridge"])
assert result["bridge"] is True
async def test_check_nonexistent_network(self, tmp_path: Path) -> None:
"""Check a network that doesn't exist."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
services={},
)
result = await check_networks_exist(config, "local", ["nonexistent_network_xyz_123"])
assert result["nonexistent_network_xyz_123"] is False
async def test_check_mixed_networks(self, tmp_path: Path) -> None:
"""Check mix of existing and non-existing networks."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
services={},
)
result = await check_networks_exist(
config, "local", ["bridge", "nonexistent_network_xyz_123"]
)
assert result["bridge"] is True
assert result["nonexistent_network_xyz_123"] is False
async def test_check_empty_networks(self, tmp_path: Path) -> None:
"""Empty network list returns empty dict."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
services={},
)
result = await check_networks_exist(config, "local", [])
assert result == {}
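The `_is_local` cases above pin down a small contract: a fixed set of local addresses, matched case-insensitively. A minimal sketch that satisfies those tests (the `Host` dataclass here is a stand-in for the project's pydantic model):

```python
from dataclasses import dataclass

# Addresses treated as the local machine; comparison is case-insensitive,
# so "LOCAL" and "LOCALHOST" also count as local.
LOCAL_ADDRESSES = {"local", "localhost", "127.0.0.1", "::1"}


@dataclass
class Host:
    address: str


def is_local(host: Host) -> bool:
    return host.address.lower() in LOCAL_ADDRESSES
```

Anything not in the set, such as `nas01.local` or an IP on the LAN, is treated as remote and goes over SSH.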


@@ -8,8 +8,8 @@ from pathlib import Path
import pytest
from compose_farm.config import Config, Host
+ from compose_farm.executor import CommandResult
from compose_farm.logs import _parse_images_output, snapshot_services
- from compose_farm.ssh import CommandResult
def test_parse_images_output_handles_list_and_lines() -> None:


@@ -1,118 +0,0 @@
"""Tests for ssh module."""
from pathlib import Path
import pytest
from compose_farm.config import Config, Host
from compose_farm.ssh import (
CommandResult,
_is_local,
_run_local_command,
run_command,
run_compose,
run_on_services,
)
class TestIsLocal:
"""Tests for _is_local function."""
@pytest.mark.parametrize(
"address",
["local", "localhost", "127.0.0.1", "::1", "LOCAL", "LOCALHOST"],
)
def test_local_addresses(self, address: str) -> None:
host = Host(address=address)
assert _is_local(host) is True
@pytest.mark.parametrize(
"address",
["192.168.1.10", "nas01.local", "10.0.0.1", "example.com"],
)
def test_remote_addresses(self, address: str) -> None:
host = Host(address=address)
assert _is_local(host) is False
class TestRunLocalCommand:
"""Tests for local command execution."""
async def test_run_local_command_success(self) -> None:
result = await _run_local_command("echo hello", "test-service")
assert result.success is True
assert result.exit_code == 0
assert result.service == "test-service"
async def test_run_local_command_failure(self) -> None:
result = await _run_local_command("exit 1", "test-service")
assert result.success is False
assert result.exit_code == 1
async def test_run_local_command_not_found(self) -> None:
result = await _run_local_command("nonexistent_command_xyz", "test-service")
assert result.success is False
assert result.exit_code != 0
async def test_run_local_command_captures_output(self) -> None:
result = await _run_local_command("echo hello", "test-service", stream=False)
assert "hello" in result.stdout
class TestRunCommand:
"""Tests for run_command dispatcher."""
async def test_run_command_local(self) -> None:
host = Host(address="localhost")
result = await run_command(host, "echo test", "test-service")
assert result.success is True
async def test_run_command_result_structure(self) -> None:
host = Host(address="local")
result = await run_command(host, "true", "my-service")
assert isinstance(result, CommandResult)
assert result.service == "my-service"
assert result.exit_code == 0
assert result.success is True
class TestRunCompose:
"""Tests for compose command execution."""
async def test_run_compose_builds_correct_command(self, tmp_path: Path) -> None:
# Create a minimal compose file
compose_dir = tmp_path / "compose"
service_dir = compose_dir / "test-service"
service_dir.mkdir(parents=True)
compose_file = service_dir / "docker-compose.yml"
compose_file.write_text("services: {}")
config = Config(
compose_dir=compose_dir,
hosts={"local": Host(address="localhost")},
services={"test-service": "local"},
)
# This will fail because docker compose isn't running,
# but we can verify the command structure works
result = await run_compose(config, "test-service", "config", stream=False)
# Command may fail due to no docker, but structure is correct
assert result.service == "test-service"
class TestRunOnServices:
"""Tests for parallel service execution."""
async def test_run_on_services_parallel(self) -> None:
config = Config(
compose_dir=Path("/tmp"),
hosts={"local": Host(address="localhost")},
services={"svc1": "local", "svc2": "local"},
)
# Use a simple command that will work without docker
# We'll test the parallelism structure
results = await run_on_services(config, ["svc1", "svc2"], "version", stream=False)
assert len(results) == 2
assert results[0].service == "svc1"
assert results[1].service == "svc2"

tests/test_state.py Normal file

@@ -0,0 +1,132 @@
"""Tests for state module."""
from pathlib import Path
import pytest
from compose_farm.config import Config, Host
from compose_farm.state import (
get_service_host,
load_state,
remove_service,
save_state,
set_service_host,
)
@pytest.fixture
def config(tmp_path: Path) -> Config:
"""Create a config with a temporary config path for state storage."""
config_path = tmp_path / "compose-farm.yaml"
config_path.write_text("") # Create empty file
return Config(
compose_dir=tmp_path / "compose",
hosts={"nas01": Host(address="192.168.1.10")},
services={"plex": "nas01"},
config_path=config_path,
)
class TestLoadState:
"""Tests for load_state function."""
def test_load_state_empty(self, config: Config) -> None:
"""Returns empty dict when state file doesn't exist."""
result = load_state(config)
assert result == {}
def test_load_state_with_data(self, config: Config) -> None:
"""Loads existing state from file."""
state_file = config.get_state_path()
state_file.write_text("deployed:\n plex: nas01\n jellyfin: nas02\n")
result = load_state(config)
assert result == {"plex": "nas01", "jellyfin": "nas02"}
def test_load_state_empty_file(self, config: Config) -> None:
"""Returns empty dict for empty file."""
state_file = config.get_state_path()
state_file.write_text("")
result = load_state(config)
assert result == {}
class TestSaveState:
"""Tests for save_state function."""
def test_save_state(self, config: Config) -> None:
"""Saves state to file."""
save_state(config, {"plex": "nas01", "jellyfin": "nas02"})
state_file = config.get_state_path()
assert state_file.exists()
content = state_file.read_text()
assert "plex: nas01" in content
assert "jellyfin: nas02" in content
class TestGetServiceHost:
"""Tests for get_service_host function."""
def test_get_existing_service(self, config: Config) -> None:
"""Returns host for existing service."""
state_file = config.get_state_path()
state_file.write_text("deployed:\n plex: nas01\n")
host = get_service_host(config, "plex")
assert host == "nas01"
def test_get_nonexistent_service(self, config: Config) -> None:
"""Returns None for service not in state."""
state_file = config.get_state_path()
state_file.write_text("deployed:\n plex: nas01\n")
host = get_service_host(config, "unknown")
assert host is None
class TestSetServiceHost:
"""Tests for set_service_host function."""
def test_set_new_service(self, config: Config) -> None:
"""Adds new service to state."""
set_service_host(config, "plex", "nas01")
result = load_state(config)
assert result["plex"] == "nas01"
def test_update_existing_service(self, config: Config) -> None:
"""Updates host for existing service."""
state_file = config.get_state_path()
state_file.write_text("deployed:\n plex: nas01\n")
set_service_host(config, "plex", "nas02")
result = load_state(config)
assert result["plex"] == "nas02"
class TestRemoveService:
"""Tests for remove_service function."""
def test_remove_existing_service(self, config: Config) -> None:
"""Removes service from state."""
state_file = config.get_state_path()
state_file.write_text("deployed:\n plex: nas01\n jellyfin: nas02\n")
remove_service(config, "plex")
result = load_state(config)
assert "plex" not in result
assert result["jellyfin"] == "nas02"
def test_remove_nonexistent_service(self, config: Config) -> None:
"""Removing nonexistent service doesn't error."""
state_file = config.get_state_path()
state_file.write_text("deployed:\n plex: nas01\n")
remove_service(config, "unknown") # Should not raise
result = load_state(config)
assert result["plex"] == "nas01"
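The state tests encode a simple round-trip: a YAML file with a top-level `deployed` mapping of service to host, where a missing or empty file yields an empty mapping. A hand-rolled sketch of that persistence (the real module presumably uses PyYAML; this version only handles the flat mapping the tests exercise):

```python
from pathlib import Path


def save_state(path: Path, deployed: dict[str, str]) -> None:
    # Entries are written alphabetically, matching the "sort state file
    # entries" behavior mentioned in the commit log.
    lines = ["deployed:"] + [f"  {svc}: {host}" for svc, host in sorted(deployed.items())]
    path.write_text("\n".join(lines) + "\n")


def load_state(path: Path) -> dict[str, str]:
    # Missing or empty files yield an empty mapping, as the tests expect.
    deployed: dict[str, str] = {}
    if not path.exists():
        return deployed
    for line in path.read_text().splitlines():
        if line.startswith("  ") and ":" in line:
            svc, _, host = line.strip().partition(":")
            deployed[svc] = host.strip()
    return deployed
```

`set_service_host` and `remove_service` then reduce to load, mutate the dict, save.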

tests/test_sync.py Normal file

@@ -0,0 +1,175 @@
"""Tests for sync command and related functions."""
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, patch
import pytest
from compose_farm import cli as cli_module
from compose_farm import executor as executor_module
from compose_farm import operations as operations_module
from compose_farm import state as state_module
from compose_farm.config import Config, Host
from compose_farm.executor import CommandResult, check_service_running
@pytest.fixture
def mock_config(tmp_path: Path) -> Config:
"""Create a mock config for testing."""
compose_dir = tmp_path / "stacks"
compose_dir.mkdir()
# Create service directories with compose files
for service in ["plex", "jellyfin", "sonarr"]:
svc_dir = compose_dir / service
svc_dir.mkdir()
(svc_dir / "compose.yaml").write_text(f"# {service} compose file\n")
return Config(
compose_dir=compose_dir,
hosts={
"nas01": Host(address="192.168.1.10", user="admin", port=22),
"nas02": Host(address="192.168.1.11", user="admin", port=22),
},
services={
"plex": "nas01",
"jellyfin": "nas01",
"sonarr": "nas02",
},
)
@pytest.fixture
def state_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
"""Create a temporary state directory and patch _get_state_path."""
state_path = tmp_path / ".config" / "compose-farm"
state_path.mkdir(parents=True)
def mock_get_state_path() -> Path:
return state_path / "state.yaml"
monkeypatch.setattr(state_module, "_get_state_path", mock_get_state_path)
return state_path
class TestCheckServiceRunning:
"""Tests for check_service_running function."""
@pytest.mark.asyncio
async def test_service_running(self, mock_config: Config) -> None:
"""Returns True when service has running containers."""
with patch.object(executor_module, "run_command", new_callable=AsyncMock) as mock_run:
mock_run.return_value = CommandResult(
service="plex",
exit_code=0,
success=True,
stdout="abc123\ndef456\n",
)
result = await check_service_running(mock_config, "plex", "nas01")
assert result is True
@pytest.mark.asyncio
async def test_service_not_running(self, mock_config: Config) -> None:
"""Returns False when service has no running containers."""
with patch.object(executor_module, "run_command", new_callable=AsyncMock) as mock_run:
mock_run.return_value = CommandResult(
service="plex",
exit_code=0,
success=True,
stdout="",
)
result = await check_service_running(mock_config, "plex", "nas01")
assert result is False
@pytest.mark.asyncio
async def test_command_failed(self, mock_config: Config) -> None:
"""Returns False when command fails."""
with patch.object(executor_module, "run_command", new_callable=AsyncMock) as mock_run:
mock_run.return_value = CommandResult(
service="plex",
exit_code=1,
success=False,
)
result = await check_service_running(mock_config, "plex", "nas01")
assert result is False
class TestDiscoverRunningServices:
"""Tests for discover_running_services function."""
@pytest.mark.asyncio
async def test_discovers_on_assigned_host(self, mock_config: Config) -> None:
"""Discovers service running on its assigned host."""
with patch.object(
operations_module, "check_service_running", new_callable=AsyncMock
) as mock_check:
# plex running on nas01, jellyfin not running, sonarr on nas02
async def check_side_effect(_cfg: Any, service: str, host: str) -> bool:
return (service == "plex" and host == "nas01") or (
service == "sonarr" and host == "nas02"
)
mock_check.side_effect = check_side_effect
result = await operations_module.discover_running_services(mock_config)
assert result == {"plex": "nas01", "sonarr": "nas02"}
@pytest.mark.asyncio
async def test_discovers_on_different_host(self, mock_config: Config) -> None:
"""Discovers service running on non-assigned host (after migration)."""
with patch.object(
operations_module, "check_service_running", new_callable=AsyncMock
) as mock_check:
# plex migrated to nas02
async def check_side_effect(_cfg: Any, service: str, host: str) -> bool:
return service == "plex" and host == "nas02"
mock_check.side_effect = check_side_effect
result = await operations_module.discover_running_services(mock_config)
assert result == {"plex": "nas02"}
class TestReportSyncChanges:
"""Tests for _report_sync_changes function."""
def test_reports_added(self, capsys: pytest.CaptureFixture[str]) -> None:
"""Reports newly discovered services."""
cli_module._report_sync_changes(
added=["plex", "jellyfin"],
removed=[],
changed=[],
discovered={"plex": "nas01", "jellyfin": "nas02"},
current_state={},
)
captured = capsys.readouterr()
assert "New services found (2)" in captured.out
assert "+ plex on nas01" in captured.out
assert "+ jellyfin on nas02" in captured.out
def test_reports_removed(self, capsys: pytest.CaptureFixture[str]) -> None:
"""Reports services that are no longer running."""
cli_module._report_sync_changes(
added=[],
removed=["sonarr"],
changed=[],
discovered={},
current_state={"sonarr": "nas01"},
)
captured = capsys.readouterr()
assert "Services no longer running (1)" in captured.out
assert "- sonarr (was on nas01)" in captured.out
def test_reports_changed(self, capsys: pytest.CaptureFixture[str]) -> None:
"""Reports services that moved to a different host."""
cli_module._report_sync_changes(
added=[],
removed=[],
changed=[("plex", "nas01", "nas02")],
discovered={"plex": "nas02"},
current_state={"plex": "nas01"},
)
captured = capsys.readouterr()
assert "Services on different hosts (1)" in captured.out
assert "~ plex: nas01 → nas02" in captured.out
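The `_report_sync_changes` assertions fix the output shape; a sketch that produces it, using plain `print` where the CLI presumably goes through Rich (argument names follow the test calls):

```python
def report_sync_changes(
    added: list[str],
    removed: list[str],
    changed: list[tuple[str, str, str]],
    discovered: dict[str, str],
    current_state: dict[str, str],
) -> None:
    if added:
        print(f"New services found ({len(added)}):")
        for svc in added:
            print(f"  + {svc} on {discovered[svc]}")
    if removed:
        print(f"Services no longer running ({len(removed)}):")
        for svc in removed:
            print(f"  - {svc} (was on {current_state[svc]})")
    if changed:
        print(f"Services on different hosts ({len(changed)}):")
        for svc, old, new in changed:
            print(f"  ~ {svc}: {old} → {new}")
```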


@@ -4,6 +4,7 @@ from pathlib import Path
import yaml
+ from compose_farm.compose import parse_external_networks
from compose_farm.config import Config, Host
from compose_farm.traefik import generate_traefik_config
@@ -76,7 +77,7 @@ def test_generate_traefik_config_without_published_port_warns(tmp_path: Path) ->
dynamic, warnings = generate_traefik_config(cfg, ["app"])
assert dynamic["http"]["routers"]["app"]["rule"] == "Host(`app.lab.mydomain.org`)"
- assert any("No host-published port found" in warning for warning in warnings)
+ assert any("No published port found" in warning for warning in warnings)
def test_generate_interpolates_env_and_infers_router_service(tmp_path: Path) -> None:
@@ -193,3 +194,145 @@ def test_generate_skips_services_with_enable_false(tmp_path: Path) -> None:
assert dynamic == {}
assert warnings == []
def test_generate_follows_network_mode_service_for_ports(tmp_path: Path) -> None:
"""Services using network_mode: service:X should use ports from service X."""
cfg = Config(
compose_dir=tmp_path,
hosts={"nas01": Host(address="192.168.1.10")},
services={"vpn-stack": "nas01"},
)
compose_path = tmp_path / "vpn-stack" / "docker-compose.yml"
_write_compose(
compose_path,
{
"services": {
"vpn": {
"image": "gluetun",
"ports": ["5080:5080", "9696:9696"],
},
"qbittorrent": {
"image": "qbittorrent",
"network_mode": "service:vpn",
"labels": [
"traefik.enable=true",
"traefik.http.routers.torrent.rule=Host(`torrent.example.com`)",
"traefik.http.services.torrent.loadbalancer.server.port=5080",
],
},
"prowlarr": {
"image": "prowlarr",
"network_mode": "service:vpn",
"labels": [
"traefik.enable=true",
"traefik.http.routers.prowlarr.rule=Host(`prowlarr.example.com`)",
"traefik.http.services.prowlarr.loadbalancer.server.port=9696",
],
},
}
},
)
dynamic, warnings = generate_traefik_config(cfg, ["vpn-stack"])
assert warnings == []
# Both services should get their ports from the vpn service
torrent_servers = dynamic["http"]["services"]["torrent"]["loadbalancer"]["servers"]
assert torrent_servers == [{"url": "http://192.168.1.10:5080"}]
prowlarr_servers = dynamic["http"]["services"]["prowlarr"]["loadbalancer"]["servers"]
assert prowlarr_servers == [{"url": "http://192.168.1.10:9696"}]
def test_parse_external_networks_single(tmp_path: Path) -> None:
"""Extract a single external network from compose file."""
cfg = Config(
compose_dir=tmp_path,
hosts={"host1": Host(address="192.168.1.10")},
services={"app": "host1"},
)
compose_path = tmp_path / "app" / "compose.yaml"
_write_compose(
compose_path,
{
"services": {"app": {"image": "nginx"}},
"networks": {"mynetwork": {"external": True}},
},
)
networks = parse_external_networks(cfg, "app")
assert networks == ["mynetwork"]
def test_parse_external_networks_multiple(tmp_path: Path) -> None:
"""Extract multiple external networks from compose file."""
cfg = Config(
compose_dir=tmp_path,
hosts={"host1": Host(address="192.168.1.10")},
services={"app": "host1"},
)
compose_path = tmp_path / "app" / "compose.yaml"
_write_compose(
compose_path,
{
"services": {"app": {"image": "nginx"}},
"networks": {
"frontend": {"external": True},
"backend": {"external": True},
"internal": {"driver": "bridge"}, # not external
},
},
)
networks = parse_external_networks(cfg, "app")
assert set(networks) == {"frontend", "backend"}
def test_parse_external_networks_none(tmp_path: Path) -> None:
"""No external networks returns empty list."""
cfg = Config(
compose_dir=tmp_path,
hosts={"host1": Host(address="192.168.1.10")},
services={"app": "host1"},
)
compose_path = tmp_path / "app" / "compose.yaml"
_write_compose(
compose_path,
{
"services": {"app": {"image": "nginx"}},
"networks": {"internal": {"driver": "bridge"}},
},
)
networks = parse_external_networks(cfg, "app")
assert networks == []
def test_parse_external_networks_no_networks_section(tmp_path: Path) -> None:
"""No networks section returns empty list."""
cfg = Config(
compose_dir=tmp_path,
hosts={"host1": Host(address="192.168.1.10")},
services={"app": "host1"},
)
compose_path = tmp_path / "app" / "compose.yaml"
_write_compose(
compose_path,
{"services": {"app": {"image": "nginx"}}},
)
networks = parse_external_networks(cfg, "app")
assert networks == []
def test_parse_external_networks_missing_compose(tmp_path: Path) -> None:
"""Missing compose file returns empty list."""
cfg = Config(
compose_dir=tmp_path,
hosts={"host1": Host(address="192.168.1.10")},
services={"app": "host1"},
)
# Don't create compose file
networks = parse_external_networks(cfg, "app")
assert networks == []
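The `parse_external_networks` tests above imply a small filter over the compose file's top-level `networks` section; a sketch operating on an already-parsed mapping (the project's function additionally locates and reads the service's compose file, which is omitted here):

```python
def external_networks(compose: dict) -> list[str]:
    # A network counts as external when its definition sets external: true;
    # networks with a driver, or no networks section at all, are
    # compose-managed and excluded.
    networks = compose.get("networks") or {}
    return [
        name
        for name, definition in networks.items()
        if isinstance(definition, dict) and definition.get("external")
    ]
```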

uv.lock generated

@@ -131,6 +131,7 @@ dependencies = [
{ name = "asyncssh" },
{ name = "pydantic" },
{ name = "pyyaml" },
+ { name = "rich" },
{ name = "typer" },
]
@@ -151,6 +152,7 @@ requires-dist = [
{ name = "asyncssh", specifier = ">=2.14.0" },
{ name = "pydantic", specifier = ">=2.0.0" },
{ name = "pyyaml", specifier = ">=6.0" },
+ { name = "rich", specifier = ">=13.0.0" },
{ name = "typer", specifier = ">=0.9.0" },
]