Compare commits

...

29 Commits

Author SHA1 Message Date
Bas Nijholt
da61436fbb Use native ssh for raw mode, asyncssh for streaming
- Raw mode uses subprocess with `ssh -t` for proper TTY handling
- Progress bars now render correctly on remote hosts
- asyncssh still used for non-raw parallel streaming with prefixes
- Remove redundant header prints (operations.py handles them)
2025-12-14 15:12:48 -08:00
Bas Nijholt
b6025af0c8 Fix newline after raw output to prevent line mixing 2025-12-14 14:49:33 -08:00
Bas Nijholt
ab914677c4 Add progress counter [n/total] to up command 2025-12-14 14:48:48 -08:00
Bas Nijholt
c0b421f812 Add --migrate flag to up command
Automatically detects services where state differs from config
and migrates only those. Usage: cf up --migrate or cf up -m
2025-12-14 14:47:43 -08:00
Bas Nijholt
2a446c800f Always use raw output for up command
- Print service header before raw output (local and SSH)
- up command always uses raw=True since migrations are sequential
- Gives clean progress bar output without per-line prefixes
2025-12-14 14:44:53 -08:00
Bas Nijholt
dc541c0298 test: Skip shell-dependent tests on Windows/Mac 2025-12-14 14:28:31 -08:00
Bas Nijholt
4d9b8b5ba4 docs: Add TrueNAS NFS crossmnt workaround
Documents how to access child ZFS datasets over NFS by injecting
the crossmnt option into /etc/exports. Includes Python script and
setup instructions for cron-based persistence.
2025-12-14 14:11:10 -08:00
Bas Nijholt
566a07d3a4 Refactor: separate concerns into dedicated modules
- Extract compose.py from traefik.py for generic compose parsing
  (env loading, interpolation, ports, volumes, networks)
- Rename ssh.py to executor.py for clarity
- Extract operations.py from cli.py for business logic
  (up_services, discover_running_services, preflight checks)
- Update CLAUDE.md with new architecture diagram
- Add docs/dev/future-improvements.md for low-priority items

CLI is now a thin layer that delegates to operations module.
All 70 tests pass.
2025-12-14 12:49:24 -08:00
Bas Nijholt
921ce6f13a Add raw output mode for single-service operations
When operating on a single service, pass output directly to
stdout/stderr instead of prefixing each line with [service].
This enables proper handling of \r progress bars during
docker pull, up, etc.
2025-12-14 12:15:36 -08:00
Bas Nijholt
708e09a8cc Show target host when starting services 2025-12-14 12:09:07 -08:00
Bas Nijholt
04154b84f6 Add tests for network and path checking
- test_traefik: Tests for parse_external_networks()
- test_ssh: Tests for check_paths_exist() and check_networks_exist()
2025-12-14 12:08:35 -08:00
Bas Nijholt
2bc9b09e58 Add Docker network validation and init-network command
- check: Validates external networks exist on configured hosts
- up: Pre-flight check blocks if networks missing on target host
- init-network: Creates Docker network with consistent subnet/gateway
  across hosts (default: mynetwork 172.20.0.0/16)

Networks defined as `external: true` in compose files are now
checked before starting or migrating services.
2025-12-14 12:06:36 -08:00
Bas Nijholt
16d517dcd0 docs: Update README and CLAUDE.md for redesigned check command 2025-12-14 10:56:04 -08:00
Bas Nijholt
5e8d09b010 Redesign check command: unified validation + host compatibility
Merged check-mounts into check command. Now provides:
- Config validation (compose files exist)
- Traefik label validation
- Mount path validation (SSH-based)
- Host compatibility matrix when checking specific services

Usage:
  cf check              # Full validation of all services
  cf check --local      # Skip SSH mount checks (fast)
  cf check jellyfin     # Check service + show which hosts can run it

Removed standalone check-mounts command (merged into check).
2025-12-14 10:43:34 -08:00
Bas Nijholt
6fc3535449 Add pre-flight mount check before migration
When migrating a service to a new host, check that all required volume
mount paths exist on the target host BEFORE running down on the old host.
This prevents failed migrations where the service is stopped but can't
start on the new host due to missing NFS mounts.
2025-12-14 10:30:56 -08:00
Bas Nijholt
9158dba0ce Add check-mounts command to verify NFS paths exist
New command to verify volume mount paths exist on target hosts before
migration. Parses bind mounts from compose files and SSHs to hosts to
check each path exists.

- check_paths_exist() in ssh.py: batch check multiple paths efficiently
- parse_host_volumes() in traefik.py: extract bind mount paths from compose
- check-mounts command in cli.py: groups by host, reports missing paths

Usage: cf check-mounts plex jellyfin
       cf check-mounts --all
2025-12-14 10:25:26 -08:00
Bas Nijholt
7b2c431ca3 fix: Change whoami example port to 18082 to avoid conflicts 2025-12-14 09:46:20 -08:00
Bas Nijholt
9deb460cfc Add Traefik example to examples directory
- traefik/docker-compose.yml: Traefik with docker and file providers
- whoami/docker-compose.yml: Test service with Traefik labels
- Updated compose-farm.yaml with traefik_file auto-regeneration
- Updated README.md with Traefik usage instructions
2025-12-14 09:44:03 -08:00
Bas Nijholt
2ce6f2473b docs: Add Traefik config options to example 2025-12-14 01:19:13 -08:00
Bas Nijholt
04d8444168 docs: Use consistent server-1/server-2 naming in example config 2025-12-14 01:18:50 -08:00
Bas Nijholt
b539c4ba76 docs: Update CLAUDE.md with all modules and commands 2025-12-14 01:17:30 -08:00
Bas Nijholt
473bc089c7 docs: Use consistent server-1/server-2 naming throughout 2025-12-14 01:15:46 -08:00
Bas Nijholt
50f405eb77 docs: Use uv tool install for CLI tools 2025-12-14 01:14:12 -08:00
Bas Nijholt
fd0d3bcbcf docs: Use clearer host names in NFS example 2025-12-14 01:13:58 -08:00
Bas Nijholt
f2e8ab0387 docs: Recommend uv for installation 2025-12-14 01:13:24 -08:00
Bas Nijholt
dfbf2748c7 docs: Reorganize README for better flow 2025-12-14 01:12:09 -08:00
Bas Nijholt
57b0ba5916 CSS for logo 2025-12-14 00:59:59 -08:00
Bas Nijholt
e668fb0faf Add logo 2025-12-14 00:58:58 -08:00
Bas Nijholt
2702203cb5 fix: Handle non-string address in getaddrinfo result 2025-12-14 00:55:11 -08:00
24 changed files with 1780 additions and 516 deletions

CLAUDE.md

@@ -10,19 +10,26 @@
```
compose_farm/
├── config.py # Pydantic models, YAML loading
├── ssh.py # asyncssh execution, streaming
└── cli.py # Typer commands
├── cli.py # Typer commands (thin layer, delegates to operations)
├── config.py # Pydantic models, YAML loading
├── compose.py # Compose file parsing (.env, ports, volumes, networks)
├── executor.py # SSH/local command execution, streaming output
├── operations.py # Business logic (up, migrate, discover, preflight checks)
├── state.py # Deployment state tracking (which service on which host)
├── logs.py # Image digest snapshots (dockerfarm-log.toml)
└── traefik.py # Traefik file-provider config generation from labels
```
## Key Design Decisions
1. **asyncssh over Paramiko/Fabric**: Native async support, built-in streaming
2. **Parallel by default**: Multiple services run concurrently via `asyncio.gather`
3. **Streaming output**: Real-time stdout/stderr with `[service]` prefix
3. **Streaming output**: Real-time stdout/stderr with `[service]` prefix using Rich
4. **SSH key auth only**: Uses ssh-agent, no password handling (YAGNI)
5. **NFS assumption**: Compose files at same path on all hosts
6. **Local execution**: When host is `localhost`/`local`, skip SSH and run locally
6. **Local IP auto-detection**: Skips SSH when target host matches local machine's IP
7. **State tracking**: Tracks where services are deployed for auto-migration
8. **Pre-flight checks**: Verifies NFS mounts and Docker networks exist before starting/migrating
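A minimal sketch of decisions 2-3 (one task per service, line-prefixed streaming); the real code in `executor.py` uses asyncssh and Rich rather than plain `print`:

```python
import asyncio

async def run(service: str, cmd: str) -> int:
    # Stream each output line with a [service] prefix.
    proc = await asyncio.create_subprocess_shell(
        cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
    )
    assert proc.stdout is not None
    async for line in proc.stdout:
        print(f"[{service}] {line.decode().rstrip()}")
    return await proc.wait()

async def main() -> None:
    # Parallel by default: asyncio.gather runs all services concurrently.
    await asyncio.gather(
        run("plex", "docker compose -f /opt/compose/plex/docker-compose.yml up -d"),
        run("jellyfin", "docker compose -f /opt/compose/jellyfin/docker-compose.yml up -d"),
    )

asyncio.run(main())
```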
## Communication Notes
@@ -36,12 +43,18 @@ compose_farm/
## Commands Quick Reference
| Command | Docker Compose Equivalent |
|---------|--------------------------|
| `up` | `docker compose up -d` |
| `down` | `docker compose down` |
| `pull` | `docker compose pull` |
| `restart` | `down` + `up -d` |
CLI available as `cf` or `compose-farm`.
| Command | Description |
|---------|-------------|
| `up` | Start services (`docker compose up -d`), auto-migrates if host changed |
| `down` | Stop services (`docker compose down`) |
| `pull` | Pull latest images |
| `restart` | `down` + `up -d` |
| `update` | `pull` + `down` + `up -d` |
| `logs` | `docker compose logs` |
| `ps` | `docker compose ps` |
| `logs` | Show service logs |
| `ps` | Show status of all services |
| `sync` | Discover running services, update state, capture image digests |
| `check` | Validate config, traefik labels, mounts, networks; show host compatibility |
| `init-network` | Create Docker network on hosts with consistent subnet/gateway |
| `traefik-file` | Generate Traefik file-provider config from compose labels |

README.md

@@ -1,5 +1,7 @@
# Compose Farm
<img src="http://files.nijho.lt/compose-farm.png" align="right" style="width: 300px;" />
A minimal CLI tool to run Docker Compose commands across multiple hosts via SSH.
> [!NOTE]
@@ -9,7 +11,8 @@ A minimal CLI tool to run Docker Compose commands across multiple hosts via SSH.
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
- [Why Compose Farm?](#why-compose-farm)
- [Key Assumption: Shared Storage](#key-assumption-shared-storage)
- [How It Works](#how-it-works)
- [Requirements](#requirements)
- [Limitations & Best Practices](#limitations--best-practices)
- [What breaks when you move a service](#what-breaks-when-you-move-a-service)
- [Best practices](#best-practices)
@@ -19,8 +22,6 @@ A minimal CLI tool to run Docker Compose commands across multiple hosts via SSH.
- [Usage](#usage)
- [Auto-Migration](#auto-migration)
- [Traefik Multihost Ingress (File Provider)](#traefik-multihost-ingress-file-provider)
- [Requirements](#requirements)
- [How It Works](#how-it-works)
- [License](#license)
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
@@ -34,19 +35,35 @@ I run 100+ Docker Compose stacks on an LXC container that frequently runs out of
**Compose Farm is intentionally simple**: one YAML config mapping services to hosts, and a CLI that runs `docker compose` commands over SSH. That's it.
## Key Assumption: Shared Storage
## How It Works
Compose Farm assumes **all your compose files are accessible at the same path on all hosts**. This is typically achieved via:
1. You run `cf up plex`
2. Compose Farm looks up which host runs `plex` (e.g., `server-1`)
3. It SSHs to `server-1` (or runs locally if `localhost`)
4. It executes `docker compose -f /opt/compose/plex/docker-compose.yml up -d`
5. Output is streamed back with `[plex]` prefix
That's it. No orchestration, no service discovery, no magic.
## Requirements
- Python 3.11+ (we recommend [uv](https://docs.astral.sh/uv/) for installation)
- SSH key-based authentication to your hosts (uses ssh-agent)
- Docker and Docker Compose installed on all target hosts
- **Shared storage**: All compose files must be accessible at the same path on all hosts
- **Docker networks**: External networks must exist on all hosts (use `cf init-network` to create)
Compose Farm assumes your compose files are accessible at the same path on all hosts. This is typically achieved via:
- **NFS mount** (e.g., `/opt/compose` mounted from a NAS)
- **Synced folders** (e.g., Syncthing, rsync)
- **Shared filesystem** (e.g., GlusterFS, Ceph)
```
# Example: NFS mount on all hosts
nas:/volume1/compose → /opt/compose (on nas01)
nas:/volume1/compose → /opt/compose (on nas02)
nas:/volume1/compose → /opt/compose (on nas03)
# Example: NFS mount on all Docker hosts
nas:/volume1/compose → /opt/compose (on server-1)
nas:/volume1/compose → /opt/compose (on server-2)
nas:/volume1/compose → /opt/compose (on server-3)
```
Compose Farm simply runs `docker compose -f /opt/compose/{service}/docker-compose.yml` on the appropriate host—it doesn't copy or sync files.
@@ -85,9 +102,9 @@ If you need containers on different hosts to communicate seamlessly, you need Do
## Installation
```bash
pip install compose-farm
uv tool install compose-farm
# or
uv pip install compose-farm
pip install compose-farm
```
## Configuration
@@ -98,18 +115,18 @@ Create `~/.config/compose-farm/compose-farm.yaml` (or `./compose-farm.yaml` in y
compose_dir: /opt/compose # Must be the same path on all hosts
hosts:
nas01:
server-1:
address: 192.168.1.10
user: docker
nas02:
server-2:
address: 192.168.1.11
# user defaults to current user
local: localhost # Run locally without SSH
services:
plex: nas01
jellyfin: nas02
sonarr: nas01
plex: server-1
jellyfin: server-2
sonarr: server-1
radarr: local # Runs on the machine where you invoke compose-farm
```
@@ -140,8 +157,14 @@ cf update --all
cf sync # updates state.yaml and dockerfarm-log.toml
cf sync --dry-run # preview without writing
# Check config vs disk (find missing services, validate traefik labels)
cf check
# Validate config, traefik labels, mounts, and networks
cf check # full validation (includes SSH checks)
cf check --local # fast validation (skip SSH)
cf check jellyfin # check service + show which hosts can run it
# Create Docker network on new hosts (before migrating services)
cf init-network nuc hp # create mynetwork on specific hosts
cf init-network # create on all hosts
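# (each host runs, roughly: docker network create --driver bridge
#  --subnet 172.20.0.0/16 --gateway 172.20.0.1 mynetwork)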
# View logs
cf logs plex
@@ -154,23 +177,24 @@ cf ps
### Auto-Migration
When you change a service's host assignment in config and run `up`, Compose Farm automatically:
1. Runs `down` on the old host
2. Runs `up -d` on the new host
3. Updates state tracking
1. Checks that required mounts and networks exist on the new host (aborts if missing)
2. Runs `down` on the old host
3. Runs `up -d` on the new host
4. Updates state tracking
```yaml
# Before: plex runs on nas01
# Before: plex runs on server-1
services:
plex: nas01
plex: server-1
# After: change to nas02, then run `cf up plex`
# After: change to server-2, then run `cf up plex`
services:
plex: nas02 # Compose Farm will migrate automatically
plex: server-2 # Compose Farm will migrate automatically
```
## Traefik Multihost Ingress (File Provider)
If you run a single Traefik instance on one frontdoor host and want it to route to
If you run a single Traefik instance on one "frontdoor" host and want it to route to
Compose Farm services on other hosts, Compose Farm can generate a Traefik file-provider
fragment from your existing compose labels.
@@ -179,11 +203,11 @@ fragment from your existing compose labels.
- Your `docker-compose.yml` remains the source of truth. Put normal `traefik.*` labels on
the container you want exposed.
- Labels and port specs may use `${VAR}` / `${VAR:-default}`; Compose Farm resolves these
using the stacks `.env` file and your current environment, just like Docker Compose.
using the stack's `.env` file and your current environment, just like Docker Compose.
- Publish a host port for that container (via `ports:`). The generator prefers
host-published ports so Traefik can reach the service across hosts; if none are found,
it warns and youd need L3 reachability to container IPs.
- If a router label doesnt specify `traefik.http.routers.<name>.service` and theres only
it warns and you'd need L3 reachability to container IPs.
- If a router label doesn't specify `traefik.http.routers.<name>.service` and there's only
one Traefik service defined on that container, Compose Farm wires the router to it.
- `compose-farm.yaml` stays unchanged: just `hosts` and `services: service → host`.
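For intuition: a router with rule ``Host(`whoami.localhost`)`` and published port 18082 on `server-2` (192.168.1.11) maps to a fragment of roughly this shape (illustrative, not the tool's verbatim output):

```yaml
http:
  routers:
    whoami:
      rule: Host(`whoami.localhost`)
      entryPoints:
        - web
      service: whoami
  services:
    whoami:
      loadBalancer:
        servers:
          - url: http://192.168.1.11:18082
```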
@@ -235,8 +259,8 @@ traefik_service: traefik # skip services on same host (docker provider handles
hosts:
# ...
services:
traefik: nas01 # Traefik runs here
plex: nas02 # Services on other hosts get file-provider entries
traefik: server-1 # Traefik runs here
plex: server-2 # Services on other hosts get file-provider entries
# ...
```
@@ -268,23 +292,6 @@ Update your Traefik config to use directory watching instead of a single file:
- --providers.file.watch=true
```
## Requirements
- Python 3.11+
- SSH key-based authentication to your hosts (uses ssh-agent)
- Docker and Docker Compose installed on all target hosts
- **Shared storage**: All compose files at the same path on all hosts (NFS, Syncthing, etc.)
## How It Works
1. You run `cf up plex`
2. Compose Farm looks up which host runs `plex` (e.g., `nas01`)
3. It SSHs to `nas01` (or runs locally if `localhost`)
4. It executes `docker compose -f /opt/compose/plex/docker-compose.yml up -d`
5. Output is streamed back with `[plex]` prefix
That's it. No orchestration, no service discovery, no magic.
## License
MIT

View File

@@ -3,23 +3,28 @@
compose_dir: /opt/compose
# Optional: Auto-regenerate Traefik file-provider config after up/down/restart/update
traefik_file: /opt/traefik/dynamic.d/compose-farm.yml
traefik_service: traefik # Skip services on same host (docker provider handles them)
hosts:
# Full form with all options
nas01:
server-1:
address: 192.168.1.10
user: docker
port: 22
# Short form (just address, user defaults to current user)
nas02: 192.168.1.11
server-2: 192.168.1.11
# Local execution (no SSH)
local: localhost
services:
# Map service names to hosts
# Compose file expected at: {compose_dir}/{service}/docker-compose.yml
plex: nas01
jellyfin: nas02
sonarr: nas01
radarr: nas02
# Compose file expected at: {compose_dir}/{service}/compose.yaml
traefik: server-1 # Traefik runs here
plex: server-2 # Services on other hosts get file-provider entries
jellyfin: server-2
sonarr: server-1
radarr: local

docs/dev/future-improvements.md (new file)

@@ -0,0 +1,128 @@
# Future Improvements
Low-priority improvements identified during code review. These are not currently causing issues but could be addressed if they become pain points.
## 1. State Module Efficiency (LOW)
**Current:** Every state operation reads and writes the entire file.
```python
def set_service_host(config, service, host):
state = load_state(config) # Read file
state[service] = host
save_state(config, state) # Write file
```
**Impact:** With 87 services, this is fine. With 1000+, it would be slow.
**Potential fix:** Add batch operations:
```python
def update_state(config, updates: dict[str, str | None]) -> None:
"""Batch update: set services to hosts, None means remove."""
state = load_state(config)
for service, host in updates.items():
if host is None:
state.pop(service, None)
else:
state[service] = host
save_state(config, state)
```
**When to do:** Only if state operations become noticeably slow.
---
## 2. Remote-Aware Compose Path Resolution (LOW)
**Current:** `config.get_compose_path()` checks if files exist on the local filesystem:
```python
def get_compose_path(self, service: str) -> Path:
for filename in ("compose.yaml", "compose.yml", ...):
candidate = service_dir / filename
if candidate.exists(): # Local check!
return candidate
```
**Why this works:** NFS/shared storage means local = remote.
**Why it could break:** If running compose-farm from a machine without the NFS mount, it returns `compose.yaml` (the default) even if `docker-compose.yml` exists on the remote host.
**Potential fix:** Query the remote host for file existence, or accept this limitation and document it.
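A rough sketch of the remote-query option, reusing `check_paths_exist` from the executor (the candidate filename order and the `config.compose_dir` attribute are assumptions):

```python
from pathlib import Path

from compose_farm.config import Config
from compose_farm.executor import check_paths_exist

CANDIDATES = ("compose.yaml", "compose.yml", "docker-compose.yml", "docker-compose.yaml")

async def get_compose_path_remote(config: Config, service: str) -> Path:
    """Ask the service's assigned host which compose filename actually exists."""
    host_name = config.services[service]
    paths = [str(Path(config.compose_dir) / service / name) for name in CANDIDATES]
    exists = await check_paths_exist(config, host_name, paths)
    for path, found in exists.items():
        if found:
            return Path(path)
    return Path(paths[0])  # fall back to the current default
```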
**When to do:** Only if users need to run compose-farm from non-NFS machines.
---
## 3. Add Integration Tests for CLI Commands (MEDIUM)
**Current:** No integration tests for the actual CLI commands. Tests cover the underlying functions but not the Typer commands themselves.
**Potential fix:** Add integration tests using `CliRunner` from Typer:
```python
from typer.testing import CliRunner
from compose_farm.cli import app
runner = CliRunner()
def test_check_command_validates_config():
result = runner.invoke(app, ["check", "--local"])
assert result.exit_code == 0
```
**When to do:** When CLI behavior becomes complex enough to warrant dedicated testing.
---
## 4. Add Tests for operations.py (MEDIUM)
**Current:** Operations module has 30% coverage. Most logic is tested indirectly through test_sync.py.
**Potential fix:** Add dedicated tests for:
- `up_services()` with migration scenarios
- `preflight_check()`
- `check_host_compatibility()`
**When to do:** When adding new operations or modifying migration logic.
---
## 5. Consider Structured Logging (LOW)
**Current:** Operations print directly to console using Rich. This couples the operations module to the Rich library.
**Potential fix:** Use Python's logging module with a custom Rich handler:
```python
import logging
logger = logging.getLogger(__name__)
# In operations:
logger.info("Migrating %s from %s to %s", service, old_host, new_host)
# In cli.py - configure Rich handler:
from rich.logging import RichHandler
logging.basicConfig(handlers=[RichHandler()])
```
**Benefits:**
- Operations become testable without capturing stdout
- Logs can be redirected to files
- Log levels provide filtering
**When to do:** Only if console output coupling becomes a problem for testing or extensibility.
---
## Design Decisions to Keep
These patterns are working well and should be preserved:
1. **asyncio + asyncssh** - Solid async foundation
2. **Pydantic models** - Clean validation
3. **Rich for output** - Good UX
4. **Test structure** - Good coverage
5. **Module separation** - cli/operations/executor/compose pattern
6. **KISS principle** - Don't over-engineer

docs/truenas-nested-nfs.md (new file)

@@ -0,0 +1,169 @@
# TrueNAS NFS: Accessing Child ZFS Datasets
When NFS-exporting a parent ZFS dataset on TrueNAS, child datasets appear as **empty directories** to NFS clients. This document explains the problem and provides a workaround.
## The Problem
TrueNAS structures storage as ZFS datasets. A common pattern is:
```
tank/data <- parent dataset (NFS exported)
tank/data/app1 <- child dataset
tank/data/app2 <- child dataset
```
When you create an NFS share for `tank/data`, clients mount it and see the `app1/` and `app2/` directories—but they're empty. This happens because each ZFS dataset is a separate filesystem, and NFS doesn't traverse into child filesystems by default.
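For reference, the TrueNAS-generated entry in `/etc/exports` looks roughly like this before the fix (client IPs illustrative); note the missing `crossmnt`:
```
"/mnt/tank/data"\
    192.168.1.10(sec=sys,rw,no_subtree_check)\
    192.168.1.11(sec=sys,rw,no_subtree_check)
```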
## The Solution: `crossmnt`
The NFS `crossmnt` export option tells the server to allow clients to traverse into child filesystems. However, TrueNAS doesn't expose this option in the UI.
### Workaround Script
This Python script injects `crossmnt` into `/etc/exports`:
```python
#!/usr/bin/env python3
"""
Add crossmnt to TrueNAS NFS exports for child dataset visibility.
Usage: fix-nfs-crossmnt.py /mnt/pool/dataset
Setup:
1. scp fix-nfs-crossmnt.py root@truenas.local:/root/
2. chmod +x /root/fix-nfs-crossmnt.py
3. Test: /root/fix-nfs-crossmnt.py /mnt/pool/dataset
4. Add cron job: TrueNAS UI > System > Advanced > Cron Jobs
Command: /root/fix-nfs-crossmnt.py /mnt/pool/dataset
Schedule: */5 * * * *
"""
import re
import subprocess
import sys
from pathlib import Path
EXPORTS_FILE = Path("/etc/exports")
def main():
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} /mnt/pool/dataset", file=sys.stderr)
return 1
export_path = sys.argv[1]
content = EXPORTS_FILE.read_text()
if f'"{export_path}"' not in content:
print(f"ERROR: {export_path} not found in {EXPORTS_FILE}", file=sys.stderr)
return 1
lines = content.splitlines()
result = []
in_block = False
modified = False
for line in lines:
if f'"{export_path}"' in line:
in_block = True
elif line.startswith('"'):
in_block = False
if in_block and line[:1] in (" ", "\t") and "crossmnt" not in line:
line = re.sub(r"\)(\\\s*)?$", r",crossmnt)\1", line)
modified = True
result.append(line)
if not modified:
return 0 # Already applied
EXPORTS_FILE.write_text("\n".join(result) + "\n")
subprocess.run(["exportfs", "-ra"], check=True)
print(f"Added crossmnt to {export_path}")
return 0
if __name__ == "__main__":
sys.exit(main())
```
## Setup Instructions
### 1. Copy the script to TrueNAS
```bash
scp fix-nfs-crossmnt.py root@truenas.local:/root/
ssh root@truenas.local chmod +x /root/fix-nfs-crossmnt.py
```
### 2. Test manually
```bash
ssh root@truenas.local
# Run the script
/root/fix-nfs-crossmnt.py /mnt/tank/data
# Verify crossmnt was added
cat /etc/exports
```
You should see `,crossmnt` added to the client options:
```
"/mnt/tank/data"\
192.168.1.10(sec=sys,rw,no_subtree_check,crossmnt)\
192.168.1.11(sec=sys,rw,no_subtree_check,crossmnt)
```
### 3. Verify on NFS client
```bash
# Before: empty directory
ls /mnt/data/app1/
# (nothing)
# After: actual contents visible
ls /mnt/data/app1/
# config.yaml data/ logs/
```
### 4. Make it persistent
TrueNAS regenerates `/etc/exports` when you modify NFS shares in the UI. To survive this, set up a cron job:
1. Go to **TrueNAS UI → System → Advanced → Cron Jobs → Add**
2. Configure:
- **Description:** Fix NFS crossmnt
- **Command:** `/root/fix-nfs-crossmnt.py /mnt/tank/data`
- **Run As User:** root
- **Schedule:** `*/5 * * * *` (every 5 minutes)
- **Enabled:** checked
3. Save
The script is idempotent—it only modifies the file if `crossmnt` is missing, and skips the write entirely if already applied.
## How It Works
1. Parses `/etc/exports` to find the specified export block
2. Adds `,crossmnt` before the closing `)` on each client line
3. Writes the file only if changes were made
4. Runs `exportfs -ra` to reload the NFS configuration
## Why Not Use SMB Instead?
SMB handles child datasets seamlessly, but:
- NFS is simpler for Linux-to-Linux with matching UIDs
- SMB requires more complex permission mapping for Docker volumes
- Many existing setups already use NFS
## Related Links
- [TrueNAS Forum: Add crossmnt option to NFS exports](https://forums.truenas.com/t/add-crossmnt-option-to-nfs-exports/10573)
- [exports(5) man page](https://man7.org/linux/man-pages/man5/exports.5.html) - see `crossmnt` option
## Tested On
- TrueNAS SCALE 24.10

examples/README.md

@@ -32,11 +32,51 @@ compose-farm down nginx
compose-farm update --all
```
## Traefik Example
Start Traefik and a sample service with Traefik labels:
```bash
cd examples
# Start Traefik (reverse proxy with dashboard)
compose-farm up traefik
# Start whoami (test service with Traefik labels)
compose-farm up whoami
# Access the services
curl -H "Host: whoami.localhost" http://localhost # whoami via Traefik
curl http://localhost:8081 # Traefik dashboard
curl http://localhost:18082 # whoami direct
# Generate Traefik file-provider config (for multi-host setups)
compose-farm traefik-file --all
# Stop everything
compose-farm down --all
```
The `whoami/docker-compose.yml` shows the standard Traefik label pattern:
```yaml
labels:
- traefik.enable=true
- traefik.http.routers.whoami.rule=Host(`whoami.localhost`)
- traefik.http.routers.whoami.entrypoints=web
- traefik.http.services.whoami.loadbalancer.server.port=80
```
## Services
- **hello**: Simple hello-world container (exits immediately)
- **nginx**: Nginx web server on port 8080
| Service | Description | Ports |
|---------|-------------|-------|
| hello | Hello-world container (exits immediately) | - |
| nginx | Nginx web server | 8080 |
| traefik | Traefik reverse proxy with dashboard | 80, 8081 |
| whoami | Test service with Traefik labels | 18082 |
## Config
The `compose-farm.yaml` in this directory configures both services to run locally (no SSH).
The `compose-farm.yaml` in this directory configures all services to run locally (no SSH).
It also demonstrates the `traefik_file` option for auto-regenerating Traefik file-provider config.

View File

@@ -0,0 +1 @@
deployed: {}

examples/compose-farm.yaml

@@ -3,9 +3,15 @@
compose_dir: .
# Auto-regenerate Traefik file-provider config after up/down/restart/update
traefik_file: ./traefik/dynamic.d/compose-farm.yml
traefik_service: traefik # Skip services on same host (docker provider handles them)
hosts:
local: localhost
services:
hello: local
nginx: local
traefik: local
whoami: local

examples/traefik/docker-compose.yml (new file)

@@ -0,0 +1,17 @@
services:
traefik:
image: traefik:v3.2
container_name: traefik
command:
- --api.insecure=true
- --providers.docker=true
- --providers.docker.exposedbydefault=false
- --providers.file.directory=/dynamic.d
- --providers.file.watch=true
- --entrypoints.web.address=:80
ports:
- "80:80"
- "8081:8080" # Traefik dashboard
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
- ./dynamic.d:/dynamic.d

View File

View File

@@ -0,0 +1 @@
{}

examples/whoami/docker-compose.yml (new file)

@@ -0,0 +1,11 @@
services:
whoami:
image: traefik/whoami
container_name: whoami
ports:
- "18082:80"
labels:
- traefik.enable=true
- traefik.http.routers.whoami.rule=Host(`whoami.localhost`)
- traefik.http.routers.whoami.entrypoints=web
- traefik.http.services.whoami.loadbalancer.server.port=80

src/compose_farm/cli.py

@@ -12,16 +12,16 @@ from rich.console import Console
from . import __version__
from .config import Config, load_config
from .executor import CommandResult, run_command, run_on_services, run_sequential_on_services
from .logs import snapshot_services
from .ssh import (
CommandResult,
check_service_running,
run_compose,
run_compose_on_host,
run_on_services,
run_sequential_on_services,
from .operations import (
check_host_compatibility,
check_mounts_on_configured_hosts,
check_networks_on_configured_hosts,
discover_running_services,
up_services,
)
from .state import get_service_host, load_state, remove_service, save_state, set_service_host
from .state import get_services_needing_migration, load_state, remove_service, save_state
from .traefik import generate_traefik_config
if TYPE_CHECKING:
@@ -49,10 +49,19 @@ def _maybe_regenerate_traefik(cfg: Config) -> None:
try:
dynamic, warnings = generate_traefik_config(cfg, list(cfg.services.keys()))
cfg.traefik_file.parent.mkdir(parents=True, exist_ok=True)
cfg.traefik_file.write_text(yaml.safe_dump(dynamic, sort_keys=False))
console.print() # Ensure we're on a new line after streaming output
console.print(f"[green]✓[/] Traefik config updated: {cfg.traefik_file}")
new_content = yaml.safe_dump(dynamic, sort_keys=False)
# Check if content changed
old_content = ""
if cfg.traefik_file.exists():
old_content = cfg.traefik_file.read_text()
if new_content != old_content:
cfg.traefik_file.parent.mkdir(parents=True, exist_ok=True)
cfg.traefik_file.write_text(new_content)
console.print() # Ensure we're on a new line after streaming output
console.print(f"[green]✓[/] Traefik config updated: {cfg.traefik_file}")
for warning in warnings:
err_console.print(f"[yellow]![/] {warning}")
except (FileNotFoundError, ValueError) as exc:
@@ -139,55 +148,30 @@ LogPathOption = Annotated[
typer.Option("--log-path", "-l", help="Path to Dockerfarm TOML log"),
]
async def _up_with_migration(
cfg: Config,
services: list[str],
) -> list[CommandResult]:
"""Start services with automatic migration if host changed."""
results: list[CommandResult] = []
for service in services:
target_host = cfg.services[service]
current_host = get_service_host(cfg, service)
# If service is deployed elsewhere, migrate it
if current_host and current_host != target_host:
if current_host in cfg.hosts:
console.print(
f"[cyan]\\[{service}][/] Migrating from "
f"[magenta]{current_host}[/] → [magenta]{target_host}[/]..."
)
down_result = await run_compose_on_host(cfg, service, current_host, "down")
if not down_result.success:
results.append(down_result)
continue
else:
err_console.print(
f"[cyan]\\[{service}][/] [yellow]![/] was on "
f"[magenta]{current_host}[/] (not in config), skipping down"
)
# Start on target host
up_result = await run_compose(cfg, service, "up -d")
results.append(up_result)
# Update state on success
if up_result.success:
set_service_host(cfg, service, target_host)
return results
MISSING_PATH_PREVIEW_LIMIT = 2
@app.command(rich_help_panel="Lifecycle")
def up(
services: ServicesArg = None,
all_services: AllOption = False,
migrate: Annotated[
bool, typer.Option("--migrate", "-m", help="Only services needing migration")
] = False,
config: ConfigOption = None,
) -> None:
"""Start services (docker compose up -d). Auto-migrates if host changed."""
svc_list, cfg = _get_services(services or [], all_services, config)
results = _run_async(_up_with_migration(cfg, svc_list))
if migrate:
cfg = _load_config_or_exit(config)
svc_list = get_services_needing_migration(cfg)
if not svc_list:
console.print("[green]✓[/] No services need migration")
return
console.print(f"[cyan]Migrating {len(svc_list)} service(s):[/] {', '.join(svc_list)}")
else:
svc_list, cfg = _get_services(services or [], all_services, config)
# Always use raw output - migrations are sequential anyway
results = _run_async(up_services(cfg, svc_list, raw=True))
_maybe_regenerate_traefik(cfg)
_report_results(results)
@@ -200,7 +184,8 @@ def down(
) -> None:
"""Stop services (docker compose down)."""
svc_list, cfg = _get_services(services or [], all_services, config)
results = _run_async(run_on_services(cfg, svc_list, "down"))
raw = len(svc_list) == 1
results = _run_async(run_on_services(cfg, svc_list, "down", raw=raw))
# Remove from state on success
for result in results:
@@ -219,7 +204,8 @@ def pull(
) -> None:
"""Pull latest images (docker compose pull)."""
svc_list, cfg = _get_services(services or [], all_services, config)
results = _run_async(run_on_services(cfg, svc_list, "pull"))
raw = len(svc_list) == 1
results = _run_async(run_on_services(cfg, svc_list, "pull", raw=raw))
_report_results(results)
@@ -231,7 +217,8 @@ def restart(
) -> None:
"""Restart services (down + up)."""
svc_list, cfg = _get_services(services or [], all_services, config)
results = _run_async(run_sequential_on_services(cfg, svc_list, ["down", "up -d"]))
raw = len(svc_list) == 1
results = _run_async(run_sequential_on_services(cfg, svc_list, ["down", "up -d"], raw=raw))
_maybe_regenerate_traefik(cfg)
_report_results(results)
@@ -244,7 +231,10 @@ def update(
) -> None:
"""Update services (pull + down + up)."""
svc_list, cfg = _get_services(services or [], all_services, config)
results = _run_async(run_sequential_on_services(cfg, svc_list, ["pull", "down", "up -d"]))
raw = len(svc_list) == 1
results = _run_async(
run_sequential_on_services(cfg, svc_list, ["pull", "down", "up -d"], raw=raw)
)
_maybe_regenerate_traefik(cfg)
_report_results(results)
@@ -311,30 +301,6 @@ def traefik_file(
err_console.print(f"[yellow]![/] {warning}")
async def _discover_running_services(cfg: Config) -> dict[str, str]:
"""Discover which services are running on which hosts.
Returns a dict mapping service names to host names for running services.
"""
discovered: dict[str, str] = {}
for service, assigned_host in cfg.services.items():
# Check assigned host first (most common case)
if await check_service_running(cfg, service, assigned_host):
discovered[service] = assigned_host
continue
# Check other hosts in case service was migrated but state is stale
for host_name in cfg.hosts:
if host_name == assigned_host:
continue
if await check_service_running(cfg, service, host_name):
discovered[service] = host_name
break
return discovered
def _report_sync_changes(
added: list[str],
removed: list[str],
@@ -383,7 +349,7 @@ def sync(
current_state = load_state(cfg)
console.print("Discovering running services...")
discovered = _run_async(_discover_running_services(cfg))
discovered = _run_async(discover_running_services(cfg))
# Calculate changes
added = [s for s in discovered if s not in current_state]
@@ -420,48 +386,232 @@ def sync(
err_console.print(f"[yellow]![/] {exc}")
@app.command(rich_help_panel="Configuration")
def check(
config: ConfigOption = None,
) -> None:
"""Check for compose directories not in config (and vice versa)."""
cfg = _load_config_or_exit(config)
def _report_config_status(cfg: Config) -> bool:
"""Check and report config vs disk status. Returns True if errors found."""
configured = set(cfg.services.keys())
on_disk = cfg.discover_compose_dirs()
missing_from_config = sorted(on_disk - configured)
missing_from_disk = sorted(configured - on_disk)
if missing_from_config:
console.print(f"\n[yellow]Not in config[/] ({len(missing_from_config)}):")
console.print(f"\n[yellow]On disk but not in config[/] ({len(missing_from_config)}):")
for name in missing_from_config:
console.print(f" [yellow]+[/] [cyan]{name}[/]")
if missing_from_disk:
console.print(f"\n[red]No compose file found[/] ({len(missing_from_disk)}):")
console.print(f"\n[red]In config but no compose file[/] ({len(missing_from_disk)}):")
for name in missing_from_disk:
console.print(f" [red]-[/] [cyan]{name}[/]")
if not missing_from_config and not missing_from_disk:
console.print("[green]✓[/] All compose directories are in config.")
elif missing_from_config:
console.print(f"\n[dim]To add missing services, append to {cfg.config_path}:[/]")
for name in missing_from_config:
console.print(f"[dim] {name}: docker-debian[/]")
console.print("[green]✓[/] Config matches disk")
# Check traefik labels have matching ports
return bool(missing_from_disk)
def _report_traefik_status(cfg: Config, services: list[str]) -> None:
"""Check and report traefik label status."""
try:
_, traefik_warnings = generate_traefik_config(
cfg, list(cfg.services.keys()), check_all=True
)
if traefik_warnings:
console.print(f"\n[yellow]Traefik issues[/] ({len(traefik_warnings)}):")
for warning in traefik_warnings:
console.print(f" [yellow]![/] {warning}")
elif not missing_from_config and not missing_from_disk:
console.print("[green]✓[/] All traefik services have published ports.")
_, warnings = generate_traefik_config(cfg, services, check_all=True)
except (FileNotFoundError, ValueError):
pass # Skip traefik check if config can't be loaded
return
if warnings:
console.print(f"\n[yellow]Traefik issues[/] ({len(warnings)}):")
for warning in warnings:
console.print(f" [yellow]![/] {warning}")
else:
console.print("[green]✓[/] Traefik labels valid")
def _report_mount_errors(mount_errors: list[tuple[str, str, str]]) -> None:
"""Report mount errors grouped by service."""
by_service: dict[str, list[tuple[str, str]]] = {}
for svc, host, path in mount_errors:
by_service.setdefault(svc, []).append((host, path))
console.print(f"\n[red]Missing mounts[/] ({len(mount_errors)}):")
for svc, items in sorted(by_service.items()):
host = items[0][0]
paths = [p for _, p in items]
console.print(f" [cyan]{svc}[/] on [magenta]{host}[/]:")
for path in paths:
console.print(f" [red]✗[/] {path}")
def _report_network_errors(network_errors: list[tuple[str, str, str]]) -> None:
"""Report network errors grouped by service."""
by_service: dict[str, list[tuple[str, str]]] = {}
for svc, host, net in network_errors:
by_service.setdefault(svc, []).append((host, net))
console.print(f"\n[red]Missing networks[/] ({len(network_errors)}):")
for svc, items in sorted(by_service.items()):
host = items[0][0]
networks = [n for _, n in items]
console.print(f" [cyan]{svc}[/] on [magenta]{host}[/]:")
for net in networks:
console.print(f" [red]✗[/] {net}")
def _report_host_compatibility(
compat: dict[str, tuple[int, int, list[str]]],
current_host: str,
) -> None:
"""Report host compatibility for a service."""
for host_name, (found, total, missing) in sorted(compat.items()):
is_current = host_name == current_host
marker = " [dim](assigned)[/]" if is_current else ""
if found == total:
console.print(f" [green]✓[/] [magenta]{host_name}[/] {found}/{total}{marker}")
else:
preview = ", ".join(missing[:MISSING_PATH_PREVIEW_LIMIT])
if len(missing) > MISSING_PATH_PREVIEW_LIMIT:
preview += f", +{len(missing) - MISSING_PATH_PREVIEW_LIMIT} more"
console.print(
f" [red]✗[/] [magenta]{host_name}[/] {found}/{total} "
f"[dim](missing: {preview})[/]{marker}"
)
@app.command(rich_help_panel="Configuration")
def check(
services: ServicesArg = None,
local: Annotated[
bool,
typer.Option("--local", help="Skip SSH-based checks (faster)"),
] = False,
config: ConfigOption = None,
) -> None:
"""Validate configuration, traefik labels, mounts, and networks.
Without arguments: validates all services against configured hosts.
With service arguments: validates specific services and shows host compatibility.
Use --local to skip SSH-based checks for faster validation.
"""
cfg = _load_config_or_exit(config)
# Determine which services to check and whether to show host compatibility
if services:
svc_list = list(services)
invalid = [s for s in svc_list if s not in cfg.services]
if invalid:
for svc in invalid:
err_console.print(f"[red]✗[/] Service '{svc}' not found in config")
raise typer.Exit(1)
show_host_compat = True
else:
svc_list = list(cfg.services.keys())
show_host_compat = False
# Run checks
has_errors = _report_config_status(cfg)
_report_traefik_status(cfg, svc_list)
if not local:
console.print("\nChecking mounts and networks...")
mount_errors = _run_async(check_mounts_on_configured_hosts(cfg, svc_list))
network_errors = _run_async(check_networks_on_configured_hosts(cfg, svc_list))
if mount_errors:
_report_mount_errors(mount_errors)
has_errors = True
if network_errors:
_report_network_errors(network_errors)
has_errors = True
if not mount_errors and not network_errors:
console.print("[green]✓[/] All mounts and networks exist")
if show_host_compat:
for service in svc_list:
console.print(f"\n[bold]Host compatibility for[/] [cyan]{service}[/]:")
compat = _run_async(check_host_compatibility(cfg, service))
_report_host_compatibility(compat, cfg.services[service])
if has_errors:
raise typer.Exit(1)
# Default network settings for cross-host Docker networking
DEFAULT_NETWORK_NAME = "mynetwork"
DEFAULT_NETWORK_SUBNET = "172.20.0.0/16"
DEFAULT_NETWORK_GATEWAY = "172.20.0.1"
@app.command("init-network", rich_help_panel="Configuration")
def init_network(
hosts: Annotated[
list[str] | None,
typer.Argument(help="Hosts to create network on (default: all)"),
] = None,
network: Annotated[
str,
typer.Option("--network", "-n", help="Network name"),
] = DEFAULT_NETWORK_NAME,
subnet: Annotated[
str,
typer.Option("--subnet", "-s", help="Network subnet"),
] = DEFAULT_NETWORK_SUBNET,
gateway: Annotated[
str,
typer.Option("--gateway", "-g", help="Network gateway"),
] = DEFAULT_NETWORK_GATEWAY,
config: ConfigOption = None,
) -> None:
"""Create Docker network on hosts with consistent settings.
Creates an external Docker network that services can use for cross-host
communication. Uses the same subnet/gateway on all hosts to ensure
consistent networking.
"""
cfg = _load_config_or_exit(config)
target_hosts = list(hosts) if hosts else list(cfg.hosts.keys())
invalid = [h for h in target_hosts if h not in cfg.hosts]
if invalid:
for h in invalid:
err_console.print(f"[red]✗[/] Host '{h}' not found in config")
raise typer.Exit(1)
async def create_network_on_host(host_name: str) -> CommandResult:
host = cfg.hosts[host_name]
# Check if network already exists
check_cmd = f"docker network inspect '{network}' >/dev/null 2>&1"
check_result = await run_command(host, check_cmd, host_name, stream=False)
if check_result.success:
console.print(f"[cyan]\\[{host_name}][/] Network '{network}' already exists")
return CommandResult(service=host_name, exit_code=0, success=True)
# Create the network
create_cmd = (
f"docker network create "
f"--driver bridge "
f"--subnet '{subnet}' "
f"--gateway '{gateway}' "
f"'{network}'"
)
result = await run_command(host, create_cmd, host_name, stream=False)
if result.success:
console.print(f"[cyan]\\[{host_name}][/] [green]✓[/] Created network '{network}'")
else:
err_console.print(
f"[cyan]\\[{host_name}][/] [red]✗[/] Failed to create network: "
f"{result.stderr.strip()}"
)
return result
async def run_all() -> list[CommandResult]:
return await asyncio.gather(*[create_network_on_host(h) for h in target_hosts])
results = _run_async(run_all())
failed = [r for r in results if not r.success]
if failed:
raise typer.Exit(1)
if __name__ == "__main__":

src/compose_farm/compose.py (new file)

@@ -0,0 +1,282 @@
"""Compose file parsing utilities.
Handles .env loading, variable interpolation, port/volume/network extraction.
"""
from __future__ import annotations
import os
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
import yaml
if TYPE_CHECKING:
from pathlib import Path
from .config import Config
# Port parsing constants
SINGLE_PART = 1
PUBLISHED_TARGET_PARTS = 2
HOST_PUBLISHED_PARTS = 3
MIN_VOLUME_PARTS = 2
_VAR_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-(.*?))?\}")
@dataclass(frozen=True)
class PortMapping:
"""Port mapping for a compose service."""
target: int
published: int | None
def load_env(compose_path: Path) -> dict[str, str]:
"""Load environment variables for compose interpolation.
Reads from .env file in the same directory as compose file,
then overlays current environment variables.
"""
env: dict[str, str] = {}
env_path = compose_path.parent / ".env"
if env_path.exists():
for line in env_path.read_text().splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#") or "=" not in stripped:
continue
key, value = stripped.split("=", 1)
key = key.strip()
value = value.strip()
if (value.startswith('"') and value.endswith('"')) or (
value.startswith("'") and value.endswith("'")
):
value = value[1:-1]
env[key] = value
env.update({k: v for k, v in os.environ.items() if isinstance(v, str)})
return env
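# e.g. an .env line `PORT="8080"` yields {"PORT": "8080"}; os.environ values
# override any keys defined in the .env file.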
def interpolate(value: str, env: dict[str, str]) -> str:
"""Perform ${VAR} and ${VAR:-default} interpolation."""
def replace(match: re.Match[str]) -> str:
var = match.group(1)
default = match.group(2)
resolved = env.get(var)
if resolved:
return resolved
return default or ""
return _VAR_PATTERN.sub(replace, value)
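# e.g. interpolate("${PORT:-8080}:80", {}) -> "8080:80"
#      interpolate("${HOST}", {"HOST": "nas"}) -> "nas"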
def parse_ports(raw: Any, env: dict[str, str]) -> list[PortMapping]: # noqa: PLR0912
"""Parse port specifications from compose file.
Handles string formats like "8080", "8080:80", "0.0.0.0:8080:80",
and dict formats with target/published keys.
"""
if raw is None:
return []
mappings: list[PortMapping] = []
items = raw if isinstance(raw, list) else [raw]
for item in items:
if isinstance(item, str):
interpolated = interpolate(item, env)
port_spec, _, _ = interpolated.partition("/")
parts = port_spec.split(":")
published: int | None = None
target: int | None = None
if len(parts) == SINGLE_PART and parts[0].isdigit():
target = int(parts[0])
elif len(parts) == PUBLISHED_TARGET_PARTS and parts[0].isdigit() and parts[1].isdigit():
published = int(parts[0])
target = int(parts[1])
elif len(parts) == HOST_PUBLISHED_PARTS and parts[-2].isdigit() and parts[-1].isdigit():
published = int(parts[-2])
target = int(parts[-1])
if target is not None:
mappings.append(PortMapping(target=target, published=published))
elif isinstance(item, dict):
target_raw = item.get("target")
if isinstance(target_raw, str):
target_raw = interpolate(target_raw, env)
if target_raw is None:
continue
try:
target_val = int(str(target_raw))
except (TypeError, ValueError):
continue
published_raw = item.get("published")
if isinstance(published_raw, str):
published_raw = interpolate(published_raw, env)
published_val: int | None
try:
published_val = int(str(published_raw)) if published_raw is not None else None
except (TypeError, ValueError):
published_val = None
mappings.append(PortMapping(target=target_val, published=published_val))
return mappings
def _resolve_host_path(host_path: str, compose_dir: Path) -> str | None:
"""Resolve a host path from volume mount, returning None for named volumes."""
if host_path.startswith("/"):
return host_path
if host_path.startswith(("./", "../")):
return str((compose_dir / host_path).resolve())
return None # Named volume
def _parse_volume_item(
item: str | dict[str, Any],
env: dict[str, str],
compose_dir: Path,
) -> str | None:
"""Parse a single volume item and return host path if it's a bind mount."""
if isinstance(item, str):
interpolated = interpolate(item, env)
parts = interpolated.split(":")
if len(parts) >= MIN_VOLUME_PARTS:
return _resolve_host_path(parts[0], compose_dir)
elif isinstance(item, dict) and item.get("type") == "bind":
source = item.get("source")
if source:
interpolated = interpolate(str(source), env)
return _resolve_host_path(interpolated, compose_dir)
return None
def parse_host_volumes(config: Config, service: str) -> list[str]:
"""Extract host bind mount paths from a service's compose file.
Returns a list of absolute host paths used as volume mounts.
Skips named volumes and resolves relative paths.
"""
compose_path = config.get_compose_path(service)
if not compose_path.exists():
return []
env = load_env(compose_path)
compose_data = yaml.safe_load(compose_path.read_text()) or {}
raw_services = compose_data.get("services", {})
if not isinstance(raw_services, dict):
return []
paths: list[str] = []
compose_dir = compose_path.parent
for definition in raw_services.values():
if not isinstance(definition, dict):
continue
volumes = definition.get("volumes")
if not volumes:
continue
items = volumes if isinstance(volumes, list) else [volumes]
for item in items:
host_path = _parse_volume_item(item, env, compose_dir)
if host_path:
paths.append(host_path)
# Return unique paths, preserving order
seen: set[str] = set()
unique: list[str] = []
for p in paths:
if p not in seen:
seen.add(p)
unique.append(p)
return unique
def parse_external_networks(config: Config, service: str) -> list[str]:
"""Extract external network names from a service's compose file.
Returns a list of network names marked as external: true.
"""
compose_path = config.get_compose_path(service)
if not compose_path.exists():
return []
compose_data = yaml.safe_load(compose_path.read_text()) or {}
networks = compose_data.get("networks", {})
if not isinstance(networks, dict):
return []
external_networks: list[str] = []
for name, definition in networks.items():
if isinstance(definition, dict) and definition.get("external") is True:
external_networks.append(name)
return external_networks
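# e.g. a compose file declaring:
#   networks:
#     mynetwork:
#       external: true
# yields ["mynetwork"]; networks without `external: true` are ignored.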
def load_compose_services(
config: Config,
stack: str,
) -> tuple[dict[str, Any], dict[str, str], str]:
"""Load services from a compose file with environment interpolation.
Returns (services_dict, env_dict, host_address).
"""
compose_path = config.get_compose_path(stack)
if not compose_path.exists():
message = f"[{stack}] Compose file not found: {compose_path}"
raise FileNotFoundError(message)
env = load_env(compose_path)
compose_data = yaml.safe_load(compose_path.read_text()) or {}
raw_services = compose_data.get("services", {})
if not isinstance(raw_services, dict):
return {}, env, config.get_host(stack).address
return raw_services, env, config.get_host(stack).address
def normalize_labels(raw: Any, env: dict[str, str]) -> dict[str, str]:
"""Normalize labels from list or dict format, with interpolation."""
if raw is None:
return {}
if isinstance(raw, dict):
return {
interpolate(str(k), env): interpolate(str(v), env)
for k, v in raw.items()
if k is not None
}
if isinstance(raw, list):
labels: dict[str, str] = {}
for item in raw:
if not isinstance(item, str) or "=" not in item:
continue
key_raw, value_raw = item.split("=", 1)
key = interpolate(key_raw.strip(), env)
value = interpolate(value_raw.strip(), env)
labels[key] = value
return labels
return {}
def get_ports_for_service(
definition: dict[str, Any],
all_services: dict[str, Any],
env: dict[str, str],
) -> list[PortMapping]:
"""Get ports for a service, following network_mode: service:X if present."""
network_mode = definition.get("network_mode", "")
if isinstance(network_mode, str) and network_mode.startswith("service:"):
# Service uses another service's network - get ports from that service
ref_service = network_mode[len("service:") :]
if ref_service in all_services:
ref_def = all_services[ref_service]
if isinstance(ref_def, dict):
return parse_ports(ref_def.get("ports"), env)
return parse_ports(definition.get("ports"), env)

src/compose_farm/executor.py (renamed from ssh.py)

@@ -4,6 +4,7 @@ from __future__ import annotations
import asyncio
import socket
import subprocess
from dataclasses import dataclass
from functools import lru_cache
from typing import TYPE_CHECKING, Any
@@ -19,6 +20,7 @@ _console = Console(highlight=False)
_err_console = Console(stderr=True, highlight=False)
LOCAL_ADDRESSES = frozenset({"local", "localhost", "127.0.0.1", "::1"})
_DEFAULT_SSH_PORT = 22
@lru_cache(maxsize=1)
@@ -29,7 +31,9 @@ def _get_local_ips() -> frozenset[str]:
hostname = socket.gethostname()
# Get all addresses for hostname
for info in socket.getaddrinfo(hostname, None):
ips.add(info[4][0])
addr = info[4][0]
if isinstance(addr, str):
ips.add(addr)
# Also try getting the default outbound IP
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 80))
@@ -64,9 +68,24 @@ async def _run_local_command(
service: str,
*,
stream: bool = True,
raw: bool = False,
) -> CommandResult:
"""Run a command locally with streaming output."""
try:
if raw:
# Run with inherited stdout/stderr for proper \r handling
proc = await asyncio.create_subprocess_shell(
command,
stdout=None, # Inherit
stderr=None, # Inherit
)
await proc.wait()
return CommandResult(
service=service,
exit_code=proc.returncode or 0,
success=proc.returncode == 0,
)
proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
@@ -120,51 +139,64 @@ async def _run_ssh_command(
service: str,
*,
stream: bool = True,
raw: bool = False,
) -> CommandResult:
"""Run a command on a remote host via SSH with streaming output."""
if raw:
# Use native ssh with TTY for proper progress bar rendering
ssh_args = ["ssh", "-t"]
if host.port != _DEFAULT_SSH_PORT:
ssh_args.extend(["-p", str(host.port)])
ssh_args.extend([f"{host.user}@{host.address}", command])
# Run in thread to avoid blocking the event loop
result = await asyncio.to_thread(subprocess.run, ssh_args, check=False)
return CommandResult(
service=service,
exit_code=result.returncode,
success=result.returncode == 0,
)
proc: asyncssh.SSHClientProcess[Any]
try:
async with (
asyncssh.connect(
host.address,
port=host.port,
username=host.user,
known_hosts=None,
) as conn,
conn.create_process(command) as proc,
):
if stream:
async with asyncssh.connect( # noqa: SIM117 - conn needed before create_process
host.address,
port=host.port,
username=host.user,
known_hosts=None,
) as conn:
async with conn.create_process(command) as proc:
if stream:
async def read_stream(
reader: Any,
prefix: str,
*,
is_stderr: bool = False,
) -> None:
console = _err_console if is_stderr else _console
async for line in reader:
if line.strip(): # Skip empty lines
console.print(f"[cyan]\\[{prefix}][/] {escape(line)}", end="")
async def read_stream(
reader: Any,
prefix: str,
*,
is_stderr: bool = False,
) -> None:
console = _err_console if is_stderr else _console
async for line in reader:
if line.strip(): # Skip empty lines
console.print(f"[cyan]\\[{prefix}][/] {escape(line)}", end="")
await asyncio.gather(
read_stream(proc.stdout, service),
read_stream(proc.stderr, service, is_stderr=True),
await asyncio.gather(
read_stream(proc.stdout, service),
read_stream(proc.stderr, service, is_stderr=True),
)
stdout_data = ""
stderr_data = ""
if not stream:
stdout_data = await proc.stdout.read()
stderr_data = await proc.stderr.read()
await proc.wait()
return CommandResult(
service=service,
exit_code=proc.exit_status or 0,
success=proc.exit_status == 0,
stdout=stdout_data,
stderr=stderr_data,
)
stdout_data = ""
stderr_data = ""
if not stream:
stdout_data = await proc.stdout.read()
stderr_data = await proc.stderr.read()
await proc.wait()
return CommandResult(
service=service,
exit_code=proc.exit_status or 0,
success=proc.exit_status == 0,
stdout=stdout_data,
stderr=stderr_data,
)
except (OSError, asyncssh.Error) as e:
_err_console.print(f"[cyan]\\[{service}][/] [red]SSH error:[/] {e}")
return CommandResult(service=service, exit_code=1, success=False)
@@ -176,11 +208,12 @@ async def run_command(
service: str,
*,
stream: bool = True,
raw: bool = False,
) -> CommandResult:
"""Run a command on a host (locally or via SSH)."""
if _is_local(host):
return await _run_local_command(command, service, stream=stream)
return await _run_ssh_command(host, command, service, stream=stream)
return await _run_local_command(command, service, stream=stream, raw=raw)
return await _run_ssh_command(host, command, service, stream=stream, raw=raw)
async def run_compose(
@@ -189,13 +222,14 @@ async def run_compose(
compose_cmd: str,
*,
stream: bool = True,
raw: bool = False,
) -> CommandResult:
"""Run a docker compose command for a service."""
host = config.get_host(service)
compose_path = config.get_compose_path(service)
command = f"docker compose -f {compose_path} {compose_cmd}"
return await run_command(host, command, service, stream=stream)
return await run_command(host, command, service, stream=stream, raw=raw)
async def run_compose_on_host(
@@ -205,6 +239,7 @@ async def run_compose_on_host(
compose_cmd: str,
*,
stream: bool = True,
raw: bool = False,
) -> CommandResult:
"""Run a docker compose command for a service on a specific host.
@@ -214,7 +249,7 @@ async def run_compose_on_host(
compose_path = config.get_compose_path(service)
command = f"docker compose -f {compose_path} {compose_cmd}"
return await run_command(host, command, service, stream=stream)
return await run_command(host, command, service, stream=stream, raw=raw)
async def run_on_services(
@@ -223,9 +258,15 @@ async def run_on_services(
compose_cmd: str,
*,
stream: bool = True,
raw: bool = False,
) -> list[CommandResult]:
"""Run a docker compose command on multiple services in parallel."""
tasks = [run_compose(config, service, compose_cmd, stream=stream) for service in services]
"""Run a docker compose command on multiple services in parallel.
Note: raw=True only makes sense for single-service operations.
"""
tasks = [
run_compose(config, service, compose_cmd, stream=stream, raw=raw) for service in services
]
return await asyncio.gather(*tasks)
@@ -235,10 +276,11 @@ async def run_sequential_commands(
commands: list[str],
*,
stream: bool = True,
raw: bool = False,
) -> CommandResult:
"""Run multiple compose commands sequentially for a service."""
for cmd in commands:
result = await run_compose(config, service, cmd, stream=stream)
result = await run_compose(config, service, cmd, stream=stream, raw=raw)
if not result.success:
return result
return CommandResult(service=service, exit_code=0, success=True)
@@ -250,10 +292,15 @@ async def run_sequential_on_services(
commands: list[str],
*,
stream: bool = True,
raw: bool = False,
) -> list[CommandResult]:
"""Run sequential commands on multiple services in parallel."""
"""Run sequential commands on multiple services in parallel.
Note: raw=True only makes sense for single-service operations.
"""
tasks = [
run_sequential_commands(config, service, commands, stream=stream) for service in services
run_sequential_commands(config, service, commands, stream=stream, raw=raw)
for service in services
]
return await asyncio.gather(*tasks)
@@ -273,3 +320,76 @@ async def check_service_running(
# If command succeeded and has output, containers are running
return result.success and bool(result.stdout.strip())
async def check_paths_exist(
config: Config,
host_name: str,
paths: list[str],
) -> dict[str, bool]:
"""Check if multiple paths exist on a specific host.
Returns a dict mapping path -> exists.
"""
if not paths:
return {}
host = config.hosts[host_name]
# Build one command that tests every path in a single invocation,
# reporting each result as Y:<path> or N:<path>
checks = []
for p in paths:
# Escape single quotes in path
escaped = p.replace("'", "'\\''")
checks.append(f"test -e '{escaped}' && echo 'Y:{escaped}' || echo 'N:{escaped}'")
command = "; ".join(checks)
result = await run_command(host, command, "mount-check", stream=False)
exists: dict[str, bool] = dict.fromkeys(paths, False)
for raw_line in result.stdout.splitlines():
line = raw_line.strip()
if line.startswith("Y:"):
exists[line[2:]] = True
elif line.startswith("N:"):
exists[line[2:]] = False
return exists
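# Illustrative shape of the composed command for paths ["/mnt/a", "/mnt/b"]
# (hypothetical paths):
#   test -e '/mnt/a' && echo 'Y:/mnt/a' || echo 'N:/mnt/a'; test -e '/mnt/b' && echo 'Y:/mnt/b' || echo 'N:/mnt/b'
# A single round-trip covers every path, and the Y:/N: prefixes keep the output
# unambiguous to parse even if the shell emits unrelated lines.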
async def check_networks_exist(
config: Config,
host_name: str,
networks: list[str],
) -> dict[str, bool]:
"""Check if Docker networks exist on a specific host.
Returns a dict mapping network_name -> exists.
"""
if not networks:
return {}
host = config.hosts[host_name]
# Check each network via docker network inspect
checks = []
for net in networks:
escaped = net.replace("'", "'\\''")
checks.append(
f"docker network inspect '{escaped}' >/dev/null 2>&1 "
f"&& echo 'Y:{escaped}' || echo 'N:{escaped}'"
)
command = "; ".join(checks)
result = await run_command(host, command, "network-check", stream=False)
exists: dict[str, bool] = dict.fromkeys(networks, False)
for raw_line in result.stdout.splitlines():
line = raw_line.strip()
if line.startswith("Y:"):
exists[line[2:]] = True
elif line.startswith("N:"):
exists[line[2:]] = False
return exists
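# Same pattern for networks; for ["frontend", "backend"] (hypothetical names)
# the composed command is:
#   docker network inspect 'frontend' >/dev/null 2>&1 && echo 'Y:frontend' || echo 'N:frontend'; ...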

View File

@@ -9,13 +9,13 @@ from datetime import UTC, datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any
from .ssh import run_compose
from .executor import run_compose
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Iterable
from .config import Config
from .ssh import CommandResult
from .executor import CommandResult
DEFAULT_LOG_PATH = Path.home() / ".config" / "compose-farm" / "dockerfarm-log.toml"

View File

@@ -0,0 +1,234 @@
"""High-level operations for compose-farm.
Contains the business logic for up, down, sync, check, and migration operations.
CLI commands are thin wrappers around these functions.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from rich.console import Console
from .compose import parse_external_networks, parse_host_volumes
from .executor import (
CommandResult,
check_networks_exist,
check_paths_exist,
check_service_running,
run_compose,
run_compose_on_host,
)
from .state import get_service_host, set_service_host
if TYPE_CHECKING:
from .config import Config
console = Console(highlight=False)
err_console = Console(stderr=True, highlight=False)
def get_service_paths(cfg: Config, service: str) -> list[str]:
"""Get all required paths for a service (compose_dir + volumes)."""
paths = [str(cfg.compose_dir)]
paths.extend(parse_host_volumes(cfg, service))
return paths
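# Illustrative result (hypothetical paths): with compose_dir=/opt/compose and a
# bind mount /mnt/media declared in the service's compose file, this returns
# ["/opt/compose", "/mnt/media"].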
async def check_mounts_for_migration(
cfg: Config,
service: str,
target_host: str,
) -> list[str]:
"""Check if mount paths exist on target host. Returns list of missing paths."""
paths = get_service_paths(cfg, service)
exists = await check_paths_exist(cfg, target_host, paths)
return [p for p, found in exists.items() if not found]
async def check_networks_for_migration(
cfg: Config,
service: str,
target_host: str,
) -> list[str]:
"""Check if Docker networks exist on target host. Returns list of missing networks."""
networks = parse_external_networks(cfg, service)
if not networks:
return []
exists = await check_networks_exist(cfg, target_host, networks)
return [n for n, found in exists.items() if not found]
async def preflight_check(
cfg: Config,
service: str,
target_host: str,
) -> tuple[list[str], list[str]]:
"""Run pre-flight checks for a service on target host.
Returns (missing_paths, missing_networks).
"""
missing_paths = await check_mounts_for_migration(cfg, service, target_host)
missing_networks = await check_networks_for_migration(cfg, service, target_host)
return missing_paths, missing_networks
def report_preflight_failures(
service: str,
target_host: str,
missing_paths: list[str],
missing_networks: list[str],
) -> None:
"""Report pre-flight check failures."""
err_console.print(
f"[cyan]\\[{service}][/] [red]✗[/] Cannot start on [magenta]{target_host}[/]:"
)
for path in missing_paths:
err_console.print(f" [red]✗[/] missing path: {path}")
for net in missing_networks:
err_console.print(f" [red]✗[/] missing network: {net}")
async def up_services(
cfg: Config,
services: list[str],
*,
raw: bool = False,
) -> list[CommandResult]:
"""Start services with automatic migration if host changed."""
results: list[CommandResult] = []
total = len(services)
for idx, service in enumerate(services, 1):
prefix = f"[dim][{idx}/{total}][/] [cyan]\\[{service}][/]"
target_host = cfg.services[service]
current_host = get_service_host(cfg, service)
# Pre-flight check: verify paths and networks exist on target
missing_paths, missing_networks = await preflight_check(cfg, service, target_host)
if missing_paths or missing_networks:
report_preflight_failures(service, target_host, missing_paths, missing_networks)
results.append(CommandResult(service=service, exit_code=1, success=False))
continue
# If service is deployed elsewhere, migrate it
if current_host and current_host != target_host:
if current_host in cfg.hosts:
console.print(
f"{prefix} Migrating from "
f"[magenta]{current_host}[/] → [magenta]{target_host}[/]..."
)
down_result = await run_compose_on_host(cfg, service, current_host, "down", raw=raw)
if raw:
print() # Ensure newline after raw output
if not down_result.success:
results.append(down_result)
continue
else:
err_console.print(
f"{prefix} [yellow]![/] was on "
f"[magenta]{current_host}[/] (not in config), skipping down"
)
# Start on target host
console.print(f"{prefix} Starting on [magenta]{target_host}[/]...")
up_result = await run_compose(cfg, service, "up -d", raw=raw)
if raw:
print() # Ensure newline after raw output (progress bars end with \r)
results.append(up_result)
# Update state on success
if up_result.success:
set_service_host(cfg, service, target_host)
return results
async def discover_running_services(cfg: Config) -> dict[str, str]:
"""Discover which services are running on which hosts.
Returns a dict mapping service names to host names for running services.
"""
discovered: dict[str, str] = {}
for service, assigned_host in cfg.services.items():
# Check assigned host first (most common case)
if await check_service_running(cfg, service, assigned_host):
discovered[service] = assigned_host
continue
# Check other hosts in case service was migrated but state is stale
for host_name in cfg.hosts:
if host_name == assigned_host:
continue
if await check_service_running(cfg, service, host_name):
discovered[service] = host_name
break
return discovered
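# Illustrative outcome (hosts as in the tests below): if plex is configured for
# nas01 but actually running on nas02, the assigned-host check misses it and
# the fallback scan returns {"plex": "nas02"}.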
async def check_host_compatibility(
cfg: Config,
service: str,
) -> dict[str, tuple[int, int, list[str]]]:
"""Check which hosts can run a service based on mount paths.
Returns dict of host_name -> (found_count, total_count, missing_paths).
"""
paths = get_service_paths(cfg, service)
results: dict[str, tuple[int, int, list[str]]] = {}
for host_name in cfg.hosts:
exists = await check_paths_exist(cfg, host_name, paths)
found = sum(1 for v in exists.values() if v)
missing = [p for p, v in exists.items() if not v]
results[host_name] = (found, len(paths), missing)
return results
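# Illustrative return value for a service needing two paths (hypothetical hosts):
#   {"nas01": (2, 2, []), "nas02": (1, 2, ["/mnt/media"])}
# i.e. nas01 has every mount while nas02 is missing /mnt/media.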
async def check_mounts_on_configured_hosts(
cfg: Config,
services: list[str],
) -> list[tuple[str, str, str]]:
"""Check mount paths exist on configured hosts.
Returns list of (service, host, missing_path) tuples.
"""
missing: list[tuple[str, str, str]] = []
for service in services:
host_name = cfg.services[service]
paths = get_service_paths(cfg, service)
exists = await check_paths_exist(cfg, host_name, paths)
for path, found in exists.items():
if not found:
missing.append((service, host_name, path))
return missing
async def check_networks_on_configured_hosts(
cfg: Config,
services: list[str],
) -> list[tuple[str, str, str]]:
"""Check Docker networks exist on configured hosts.
Returns list of (service, host, missing_network) tuples.
"""
missing: list[tuple[str, str, str]] = []
for service in services:
host_name = cfg.services[service]
networks = parse_external_networks(cfg, service)
if not networks:
continue
exists = await check_networks_exist(cfg, host_name, networks)
for net, found in exists.items():
if not found:
missing.append((service, host_name, net))
return missing

View File

@@ -51,3 +51,14 @@ def remove_service(config: Config, service: str) -> None:
state = load_state(config)
state.pop(service, None)
save_state(config, state)
def get_services_needing_migration(config: Config) -> list[str]:
"""Get services where current host differs from configured host."""
state = load_state(config)
needs_migration = []
for service, configured_host in config.services.items():
current_host = state.get(service)
if current_host and current_host != configured_host:
needs_migration.append(service)
return needs_migration
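# Illustrative case (hypothetical names): with config.services == {"plex": "nas02"}
# while the state file still records plex on nas01, this returns ["plex"].
# Services with no recorded host are skipped, since there is nothing to move.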

View File

@@ -8,29 +8,21 @@ use host-published ports for cross-host reachability.
from __future__ import annotations
import os
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
import yaml
from .ssh import LOCAL_ADDRESSES
from .compose import (
PortMapping,
get_ports_for_service,
load_compose_services,
normalize_labels,
)
from .executor import LOCAL_ADDRESSES
if TYPE_CHECKING:
from pathlib import Path
from .config import Config
@dataclass(frozen=True)
class PortMapping:
"""Port mapping for a compose service."""
target: int
published: int | None
@dataclass
class TraefikServiceSource:
"""Source information to build an upstream for a Traefik service."""
@@ -45,119 +37,8 @@ class TraefikServiceSource:
LIST_VALUE_KEYS = {"entrypoints", "middlewares"}
SINGLE_PART = 1
PUBLISHED_TARGET_PARTS = 2
HOST_PUBLISHED_PARTS = 3
MIN_ROUTER_PARTS = 3
MIN_SERVICE_LABEL_PARTS = 6
_VAR_PATTERN = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-(.*?))?\}")
def _load_env(compose_path: Path) -> dict[str, str]:
"""Load environment variables for compose interpolation."""
env: dict[str, str] = {}
env_path = compose_path.parent / ".env"
if env_path.exists():
for line in env_path.read_text().splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#") or "=" not in stripped:
continue
key, value = stripped.split("=", 1)
key = key.strip()
value = value.strip()
if (value.startswith('"') and value.endswith('"')) or (
value.startswith("'") and value.endswith("'")
):
value = value[1:-1]
env[key] = value
env.update({k: v for k, v in os.environ.items() if isinstance(v, str)})
return env
def _interpolate(value: str, env: dict[str, str]) -> str:
"""Perform a minimal `${VAR}`/`${VAR:-default}` interpolation."""
def replace(match: re.Match[str]) -> str:
var = match.group(1)
default = match.group(2)
resolved = env.get(var)
if resolved:
return resolved
return default or ""
return _VAR_PATTERN.sub(replace, value)
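# Behaviour sketch for the supported forms:
#   _interpolate("${PORT}", {"PORT": "8080"})         -> "8080"
#   _interpolate("${PORT:-80}", {})                   -> "80"
#   _interpolate("${EMPTY:-fallback}", {"EMPTY": ""}) -> "fallback" (empty falls back)
#   _interpolate("${MISSING}", {})                    -> ""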
def _normalize_labels(raw: Any, env: dict[str, str]) -> dict[str, str]:
if raw is None:
return {}
if isinstance(raw, dict):
return {
_interpolate(str(k), env): _interpolate(str(v), env)
for k, v in raw.items()
if k is not None
}
if isinstance(raw, list):
labels: dict[str, str] = {}
for item in raw:
if not isinstance(item, str) or "=" not in item:
continue
key_raw, value_raw = item.split("=", 1)
key = _interpolate(key_raw.strip(), env)
value = _interpolate(value_raw.strip(), env)
labels[key] = value
return labels
return {}
def _parse_ports(raw: Any, env: dict[str, str]) -> list[PortMapping]: # noqa: PLR0912
if raw is None:
return []
mappings: list[PortMapping] = []
items = raw if isinstance(raw, list) else [raw]
for item in items:
if isinstance(item, str):
interpolated = _interpolate(item, env)
port_spec, _, _ = interpolated.partition("/")
parts = port_spec.split(":")
published: int | None = None
target: int | None = None
if len(parts) == SINGLE_PART and parts[0].isdigit():
target = int(parts[0])
elif len(parts) == PUBLISHED_TARGET_PARTS and parts[0].isdigit() and parts[1].isdigit():
published = int(parts[0])
target = int(parts[1])
elif len(parts) == HOST_PUBLISHED_PARTS and parts[-2].isdigit() and parts[-1].isdigit():
published = int(parts[-2])
target = int(parts[-1])
if target is not None:
mappings.append(PortMapping(target=target, published=published))
elif isinstance(item, dict):
target_raw = item.get("target")
if isinstance(target_raw, str):
target_raw = _interpolate(target_raw, env)
if target_raw is None:
continue
try:
target_val = int(str(target_raw))
except (TypeError, ValueError):
continue
published_raw = item.get("published")
if isinstance(published_raw, str):
published_raw = _interpolate(published_raw, env)
published_val: int | None
try:
published_val = int(str(published_raw)) if published_raw is not None else None
except (TypeError, ValueError):
published_val = None
mappings.append(PortMapping(target=target_val, published=published_val))
return mappings
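# Examples of accepted specs (each yields one PortMapping):
#   "8080"                            -> target=8080, published=None
#   "8080:80"                         -> target=80, published=8080
#   "127.0.0.1:8080:80"               -> target=80, published=8080
#   {"target": 80, "published": 8080} -> target=80, published=8080
# Any "/tcp" or "/udp" suffix is stripped before splitting on ":".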
def _parse_value(key: str, raw_value: str) -> Any:
@@ -255,20 +136,6 @@ def _resolve_published_port(source: TraefikServiceSource) -> tuple[int | None, s
)
def _load_stack(config: Config, stack: str) -> tuple[dict[str, Any], dict[str, str], str]:
compose_path = config.get_compose_path(stack)
if not compose_path.exists():
message = f"[{stack}] Compose file not found: {compose_path}"
raise FileNotFoundError(message)
env = _load_env(compose_path)
compose_data = yaml.safe_load(compose_path.read_text()) or {}
raw_services = compose_data.get("services", {})
if not isinstance(raw_services, dict):
return {}, env, config.get_host(stack).address
return raw_services, env, config.get_host(stack).address
def _finalize_http_services(
dynamic: dict[str, Any],
sources: dict[str, TraefikServiceSource],
@@ -390,23 +257,6 @@ def _process_service_label(
source.scheme = str(_parse_value(key_without_prefix, label_value))
def _get_ports_for_service(
definition: dict[str, Any],
all_services: dict[str, Any],
env: dict[str, str],
) -> list[PortMapping]:
"""Get ports for a service, following network_mode: service:X if present."""
network_mode = definition.get("network_mode", "")
if isinstance(network_mode, str) and network_mode.startswith("service:"):
# Service uses another service's network - get ports from that service
ref_service = network_mode[len("service:") :]
if ref_service in all_services:
ref_def = all_services[ref_service]
if isinstance(ref_def, dict):
return _parse_ports(ref_def.get("ports"), env)
return _parse_ports(definition.get("ports"), env)
def _process_service_labels(
stack: str,
compose_service: str,
@@ -418,14 +268,14 @@ def _process_service_labels(
sources: dict[str, TraefikServiceSource],
warnings: list[str],
) -> None:
labels = _normalize_labels(definition.get("labels"), env)
labels = normalize_labels(definition.get("labels"), env)
if not labels:
return
enable_raw = labels.get("traefik.enable")
if enable_raw is not None and _parse_value("enable", enable_raw) is False:
return
ports = _get_ports_for_service(definition, all_services, env)
ports = get_ports_for_service(definition, all_services, env)
routers: dict[str, bool] = {}
service_names: set[str] = set()
@@ -484,7 +334,7 @@ def generate_traefik_config(
traefik_host = config.services.get(config.traefik_service)
for stack in services:
raw_services, env, host_address = _load_stack(config, stack)
raw_services, env, host_address = load_compose_services(config, stack)
stack_host = config.services.get(stack)
# Skip services on Traefik's host - docker provider handles them directly

tests/test_executor.py Normal file (241 lines added)
View File

@@ -0,0 +1,241 @@
"""Tests for executor module."""
import sys
from pathlib import Path
import pytest
from compose_farm.config import Config, Host
from compose_farm.executor import (
CommandResult,
_is_local,
_run_local_command,
check_networks_exist,
check_paths_exist,
run_command,
run_compose,
run_on_services,
)
# These tests run actual shell commands that only work on Linux
linux_only = pytest.mark.skipif(sys.platform != "linux", reason="Linux-only shell commands")
class TestIsLocal:
"""Tests for _is_local function."""
@pytest.mark.parametrize(
"address",
["local", "localhost", "127.0.0.1", "::1", "LOCAL", "LOCALHOST"],
)
def test_local_addresses(self, address: str) -> None:
host = Host(address=address)
assert _is_local(host) is True
@pytest.mark.parametrize(
"address",
["192.168.1.10", "nas01.local", "10.0.0.1", "example.com"],
)
def test_remote_addresses(self, address: str) -> None:
host = Host(address=address)
assert _is_local(host) is False
class TestRunLocalCommand:
"""Tests for local command execution."""
async def test_run_local_command_success(self) -> None:
result = await _run_local_command("echo hello", "test-service")
assert result.success is True
assert result.exit_code == 0
assert result.service == "test-service"
async def test_run_local_command_failure(self) -> None:
result = await _run_local_command("exit 1", "test-service")
assert result.success is False
assert result.exit_code == 1
async def test_run_local_command_not_found(self) -> None:
result = await _run_local_command("nonexistent_command_xyz", "test-service")
assert result.success is False
assert result.exit_code != 0
async def test_run_local_command_captures_output(self) -> None:
result = await _run_local_command("echo hello", "test-service", stream=False)
assert "hello" in result.stdout
class TestRunCommand:
"""Tests for run_command dispatcher."""
async def test_run_command_local(self) -> None:
host = Host(address="localhost")
result = await run_command(host, "echo test", "test-service")
assert result.success is True
async def test_run_command_result_structure(self) -> None:
host = Host(address="local")
result = await run_command(host, "true", "my-service")
assert isinstance(result, CommandResult)
assert result.service == "my-service"
assert result.exit_code == 0
assert result.success is True
class TestRunCompose:
"""Tests for compose command execution."""
async def test_run_compose_builds_correct_command(self, tmp_path: Path) -> None:
# Create a minimal compose file
compose_dir = tmp_path / "compose"
service_dir = compose_dir / "test-service"
service_dir.mkdir(parents=True)
compose_file = service_dir / "docker-compose.yml"
compose_file.write_text("services: {}")
config = Config(
compose_dir=compose_dir,
hosts={"local": Host(address="localhost")},
services={"test-service": "local"},
)
# This may fail if Docker isn't available,
# but we can verify the command structure works
result = await run_compose(config, "test-service", "config", stream=False)
# Command may fail due to no docker, but structure is correct
assert result.service == "test-service"
class TestRunOnServices:
"""Tests for parallel service execution."""
async def test_run_on_services_parallel(self) -> None:
config = Config(
compose_dir=Path("/tmp"),
hosts={"local": Host(address="localhost")},
services={"svc1": "local", "svc2": "local"},
)
# Use "version", which needs no running containers;
# we only assert the parallel fan-out structure
results = await run_on_services(config, ["svc1", "svc2"], "version", stream=False)
assert len(results) == 2
assert results[0].service == "svc1"
assert results[1].service == "svc2"
@linux_only
class TestCheckPathsExist:
"""Tests for check_paths_exist function (uses 'test -e' shell command)."""
async def test_check_existing_paths(self, tmp_path: Path) -> None:
"""Check paths that exist."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
services={},
)
# Create test paths
(tmp_path / "dir1").mkdir()
(tmp_path / "file1").touch()
result = await check_paths_exist(
config, "local", [str(tmp_path / "dir1"), str(tmp_path / "file1")]
)
assert result[str(tmp_path / "dir1")] is True
assert result[str(tmp_path / "file1")] is True
async def test_check_missing_paths(self, tmp_path: Path) -> None:
"""Check paths that don't exist."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
services={},
)
result = await check_paths_exist(
config, "local", [str(tmp_path / "missing1"), str(tmp_path / "missing2")]
)
assert result[str(tmp_path / "missing1")] is False
assert result[str(tmp_path / "missing2")] is False
async def test_check_mixed_paths(self, tmp_path: Path) -> None:
"""Check mix of existing and missing paths."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
services={},
)
(tmp_path / "exists").mkdir()
result = await check_paths_exist(
config, "local", [str(tmp_path / "exists"), str(tmp_path / "missing")]
)
assert result[str(tmp_path / "exists")] is True
assert result[str(tmp_path / "missing")] is False
async def test_check_empty_paths(self, tmp_path: Path) -> None:
"""Empty path list returns empty dict."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
services={},
)
result = await check_paths_exist(config, "local", [])
assert result == {}
@linux_only
class TestCheckNetworksExist:
"""Tests for check_networks_exist function (requires Docker)."""
async def test_check_bridge_network_exists(self, tmp_path: Path) -> None:
"""The 'bridge' network always exists on Docker hosts."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
services={},
)
result = await check_networks_exist(config, "local", ["bridge"])
assert result["bridge"] is True
async def test_check_nonexistent_network(self, tmp_path: Path) -> None:
"""Check a network that doesn't exist."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
services={},
)
result = await check_networks_exist(config, "local", ["nonexistent_network_xyz_123"])
assert result["nonexistent_network_xyz_123"] is False
async def test_check_mixed_networks(self, tmp_path: Path) -> None:
"""Check mix of existing and non-existing networks."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
services={},
)
result = await check_networks_exist(
config, "local", ["bridge", "nonexistent_network_xyz_123"]
)
assert result["bridge"] is True
assert result["nonexistent_network_xyz_123"] is False
async def test_check_empty_networks(self, tmp_path: Path) -> None:
"""Empty network list returns empty dict."""
config = Config(
compose_dir=tmp_path,
hosts={"local": Host(address="localhost")},
services={},
)
result = await check_networks_exist(config, "local", [])
assert result == {}

View File

@@ -8,8 +8,8 @@ from pathlib import Path
import pytest
from compose_farm.config import Config, Host
from compose_farm.executor import CommandResult
from compose_farm.logs import _parse_images_output, snapshot_services
from compose_farm.ssh import CommandResult
def test_parse_images_output_handles_list_and_lines() -> None:

View File

@@ -1,118 +0,0 @@
"""Tests for ssh module."""
from pathlib import Path
import pytest
from compose_farm.config import Config, Host
from compose_farm.ssh import (
CommandResult,
_is_local,
_run_local_command,
run_command,
run_compose,
run_on_services,
)
class TestIsLocal:
"""Tests for _is_local function."""
@pytest.mark.parametrize(
"address",
["local", "localhost", "127.0.0.1", "::1", "LOCAL", "LOCALHOST"],
)
def test_local_addresses(self, address: str) -> None:
host = Host(address=address)
assert _is_local(host) is True
@pytest.mark.parametrize(
"address",
["192.168.1.10", "nas01.local", "10.0.0.1", "example.com"],
)
def test_remote_addresses(self, address: str) -> None:
host = Host(address=address)
assert _is_local(host) is False
class TestRunLocalCommand:
"""Tests for local command execution."""
async def test_run_local_command_success(self) -> None:
result = await _run_local_command("echo hello", "test-service")
assert result.success is True
assert result.exit_code == 0
assert result.service == "test-service"
async def test_run_local_command_failure(self) -> None:
result = await _run_local_command("exit 1", "test-service")
assert result.success is False
assert result.exit_code == 1
async def test_run_local_command_not_found(self) -> None:
result = await _run_local_command("nonexistent_command_xyz", "test-service")
assert result.success is False
assert result.exit_code != 0
async def test_run_local_command_captures_output(self) -> None:
result = await _run_local_command("echo hello", "test-service", stream=False)
assert "hello" in result.stdout
class TestRunCommand:
"""Tests for run_command dispatcher."""
async def test_run_command_local(self) -> None:
host = Host(address="localhost")
result = await run_command(host, "echo test", "test-service")
assert result.success is True
async def test_run_command_result_structure(self) -> None:
host = Host(address="local")
result = await run_command(host, "true", "my-service")
assert isinstance(result, CommandResult)
assert result.service == "my-service"
assert result.exit_code == 0
assert result.success is True
class TestRunCompose:
"""Tests for compose command execution."""
async def test_run_compose_builds_correct_command(self, tmp_path: Path) -> None:
# Create a minimal compose file
compose_dir = tmp_path / "compose"
service_dir = compose_dir / "test-service"
service_dir.mkdir(parents=True)
compose_file = service_dir / "docker-compose.yml"
compose_file.write_text("services: {}")
config = Config(
compose_dir=compose_dir,
hosts={"local": Host(address="localhost")},
services={"test-service": "local"},
)
# This will fail because docker compose isn't running,
# but we can verify the command structure works
result = await run_compose(config, "test-service", "config", stream=False)
# Command may fail due to no docker, but structure is correct
assert result.service == "test-service"
class TestRunOnServices:
"""Tests for parallel service execution."""
async def test_run_on_services_parallel(self) -> None:
config = Config(
compose_dir=Path("/tmp"),
hosts={"local": Host(address="localhost")},
services={"svc1": "local", "svc2": "local"},
)
# Use a simple command that will work without docker
# We'll test the parallelism structure
results = await run_on_services(config, ["svc1", "svc2"], "version", stream=False)
assert len(results) == 2
assert results[0].service == "svc1"
assert results[1].service == "svc2"

View File

@@ -7,10 +7,11 @@ from unittest.mock import AsyncMock, patch
import pytest
from compose_farm import cli as cli_module
from compose_farm import ssh as ssh_module
from compose_farm import executor as executor_module
from compose_farm import operations as operations_module
from compose_farm import state as state_module
from compose_farm.config import Config, Host
from compose_farm.ssh import CommandResult, check_service_running
from compose_farm.executor import CommandResult, check_service_running
@pytest.fixture
@@ -58,7 +59,7 @@ class TestCheckServiceRunning:
@pytest.mark.asyncio
async def test_service_running(self, mock_config: Config) -> None:
"""Returns True when service has running containers."""
with patch.object(ssh_module, "run_command", new_callable=AsyncMock) as mock_run:
with patch.object(executor_module, "run_command", new_callable=AsyncMock) as mock_run:
mock_run.return_value = CommandResult(
service="plex",
exit_code=0,
@@ -71,7 +72,7 @@ class TestCheckServiceRunning:
@pytest.mark.asyncio
async def test_service_not_running(self, mock_config: Config) -> None:
"""Returns False when service has no running containers."""
with patch.object(ssh_module, "run_command", new_callable=AsyncMock) as mock_run:
with patch.object(executor_module, "run_command", new_callable=AsyncMock) as mock_run:
mock_run.return_value = CommandResult(
service="plex",
exit_code=0,
@@ -84,7 +85,7 @@ class TestCheckServiceRunning:
@pytest.mark.asyncio
async def test_command_failed(self, mock_config: Config) -> None:
"""Returns False when command fails."""
with patch.object(ssh_module, "run_command", new_callable=AsyncMock) as mock_run:
with patch.object(executor_module, "run_command", new_callable=AsyncMock) as mock_run:
mock_run.return_value = CommandResult(
service="plex",
exit_code=1,
@@ -95,13 +96,13 @@ class TestCheckServiceRunning:
class TestDiscoverRunningServices:
"""Tests for _discover_running_services function."""
"""Tests for discover_running_services function."""
@pytest.mark.asyncio
async def test_discovers_on_assigned_host(self, mock_config: Config) -> None:
"""Discovers service running on its assigned host."""
with patch.object(
cli_module, "check_service_running", new_callable=AsyncMock
operations_module, "check_service_running", new_callable=AsyncMock
) as mock_check:
# plex running on nas01, jellyfin not running, sonarr on nas02
async def check_side_effect(_cfg: Any, service: str, host: str) -> bool:
@@ -111,14 +112,14 @@ class TestDiscoverRunningServices:
mock_check.side_effect = check_side_effect
result = await cli_module._discover_running_services(mock_config)
result = await operations_module.discover_running_services(mock_config)
assert result == {"plex": "nas01", "sonarr": "nas02"}
@pytest.mark.asyncio
async def test_discovers_on_different_host(self, mock_config: Config) -> None:
"""Discovers service running on non-assigned host (after migration)."""
with patch.object(
cli_module, "check_service_running", new_callable=AsyncMock
operations_module, "check_service_running", new_callable=AsyncMock
) as mock_check:
# plex migrated to nas02
async def check_side_effect(_cfg: Any, service: str, host: str) -> bool:
@@ -126,7 +127,7 @@ class TestDiscoverRunningServices:
mock_check.side_effect = check_side_effect
result = await cli_module._discover_running_services(mock_config)
result = await operations_module.discover_running_services(mock_config)
assert result == {"plex": "nas02"}

View File

@@ -4,6 +4,7 @@ from pathlib import Path
import yaml
from compose_farm.compose import parse_external_networks
from compose_farm.config import Config, Host
from compose_farm.traefik import generate_traefik_config
@@ -241,3 +242,97 @@ def test_generate_follows_network_mode_service_for_ports(tmp_path: Path) -> None
assert torrent_servers == [{"url": "http://192.168.1.10:5080"}]
prowlarr_servers = dynamic["http"]["services"]["prowlarr"]["loadbalancer"]["servers"]
assert prowlarr_servers == [{"url": "http://192.168.1.10:9696"}]
def test_parse_external_networks_single(tmp_path: Path) -> None:
"""Extract a single external network from compose file."""
cfg = Config(
compose_dir=tmp_path,
hosts={"host1": Host(address="192.168.1.10")},
services={"app": "host1"},
)
compose_path = tmp_path / "app" / "compose.yaml"
_write_compose(
compose_path,
{
"services": {"app": {"image": "nginx"}},
"networks": {"mynetwork": {"external": True}},
},
)
networks = parse_external_networks(cfg, "app")
assert networks == ["mynetwork"]
def test_parse_external_networks_multiple(tmp_path: Path) -> None:
"""Extract multiple external networks from compose file."""
cfg = Config(
compose_dir=tmp_path,
hosts={"host1": Host(address="192.168.1.10")},
services={"app": "host1"},
)
compose_path = tmp_path / "app" / "compose.yaml"
_write_compose(
compose_path,
{
"services": {"app": {"image": "nginx"}},
"networks": {
"frontend": {"external": True},
"backend": {"external": True},
"internal": {"driver": "bridge"}, # not external
},
},
)
networks = parse_external_networks(cfg, "app")
assert set(networks) == {"frontend", "backend"}
def test_parse_external_networks_none(tmp_path: Path) -> None:
"""No external networks returns empty list."""
cfg = Config(
compose_dir=tmp_path,
hosts={"host1": Host(address="192.168.1.10")},
services={"app": "host1"},
)
compose_path = tmp_path / "app" / "compose.yaml"
_write_compose(
compose_path,
{
"services": {"app": {"image": "nginx"}},
"networks": {"internal": {"driver": "bridge"}},
},
)
networks = parse_external_networks(cfg, "app")
assert networks == []
def test_parse_external_networks_no_networks_section(tmp_path: Path) -> None:
"""No networks section returns empty list."""
cfg = Config(
compose_dir=tmp_path,
hosts={"host1": Host(address="192.168.1.10")},
services={"app": "host1"},
)
compose_path = tmp_path / "app" / "compose.yaml"
_write_compose(
compose_path,
{"services": {"app": {"image": "nginx"}}},
)
networks = parse_external_networks(cfg, "app")
assert networks == []
def test_parse_external_networks_missing_compose(tmp_path: Path) -> None:
"""Missing compose file returns empty list."""
cfg = Config(
compose_dir=tmp_path,
hosts={"host1": Host(address="192.168.1.10")},
services={"app": "host1"},
)
# Don't create compose file
networks = parse_external_networks(cfg, "app")
assert networks == []