Compare commits

...

4 Commits

Author SHA1 Message Date
Bas Nijholt
7ccb0734a2 refactor(web): consolidate JS patterns and use icon macros (#58) 2025-12-19 14:55:31 -08:00
Bas Nijholt
61a845fad8 test: add comprehensive browser tests for HTMX/JS functionality (#59) 2025-12-19 14:27:00 -08:00
Bas Nijholt
e7efae0153 refactor: remove dead code and reduce duplication (#57)
- Delete unused add_service_to_host/remove_service_from_host from state.py
  (42 lines of dead code never called anywhere)

- Extract _stream_output_lines helper in executor.py to deduplicate
  identical read_stream functions in _run_local_command and _run_ssh_command

- Simplify unique-list logic in compose.py using dict.fromkeys()
  instead of manual seen/unique set/list pattern

Total: -67 lines
2025-12-18 23:56:49 -08:00
Bas Nijholt
b4ebe15dd1 refactor: simplify codebase with reduced abstractions (#56)
- Remove dead code: `run_host_operation` in cli/common.py (never called)
- Inline `_report_*` helpers in lifecycle.py (each called once)
- Merge `validate_host` into `validate_hosts` with flexible str|list param
- Merge `_report_no_config_found` and `_report_config_path_not_exists`
  into single `_report_missing_config` function
- Simplify `_get_editor` from 18 lines to 6 using walrus operator
- Extract `COMPOSE_FILENAMES` constant to avoid duplication in config.py
- Extract `_stream_subprocess` helper to reduce duplication in streaming.py

Net reduction: ~130 lines of code with no functionality changes.
2025-12-18 23:45:34 -08:00
21 changed files with 1083 additions and 472 deletions

View File

@@ -53,6 +53,23 @@ Icons use [Lucide](https://lucide.dev/). Add new icons as macros in `web/templat
- **Imports at top level**: Never add imports inside functions unless they are explicitly marked with `# noqa: PLC0415` and a comment explaining it speeds up CLI startup. Heavy modules like `pydantic`, `yaml`, and `rich.table` are lazily imported to keep `cf --help` fast.
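The lazy-import rule can be illustrated with a minimal sketch; `json` stands in here for a genuinely heavy dependency like `pydantic`, and `parse_config` is a hypothetical example function:

```python
def parse_config(text: str) -> dict:
    # Deferred import: the module loads only when the function first runs,
    # so importing this file (and `cf --help`) stays fast.
    import json  # noqa: PLC0415  # stand-in for a heavy module like pydantic

    return json.loads(text)
```

Nothing heavy is loaded at import time; the cost is paid on the first call instead.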
## Testing
Run tests with `uv run pytest`. Browser tests require Chromium (system-installed or via `playwright install chromium`):
```bash
# Unit tests only (skip browser tests, can parallelize)
uv run pytest -m "not browser" -n auto
# Browser tests only (run sequentially, no coverage)
uv run pytest -m browser --no-cov
# All tests
uv run pytest --no-cov
```
Browser tests are marked with `@pytest.mark.browser`. They use Playwright to test HTMX behavior, JavaScript functionality (sidebar filter, command palette, terminals), and content stability during navigation. Run sequentially (no `-n`) to avoid resource contention.
## Communication Notes
- Clarify ambiguous wording (e.g., homophones like "right"/"write", "their"/"there").

View File

@@ -617,12 +617,14 @@ Full `--help` output for each command. See the [Usage](#usage) table above for a
This is the "reconcile" command that ensures running services match your
config file. It will:
-1. Stop orphaned services (in state but removed from config) 2. Migrate
-services on wrong host (host in state ≠ host in config) 3. Start missing
-services (in config but not in state)
-Use --dry-run to preview changes before applying. Use --no-orphans to only
-migrate/start without stopping orphaned services. Use --full to also run 'up'
-on all services (picks up compose/env changes).
+1. Stop orphaned services (in state but removed from config)
+2. Migrate services on wrong host (host in state ≠ host in config)
+3. Start missing services (in config but not in state)
+Use --dry-run to preview changes before applying.
+Use --no-orphans to only migrate/start without stopping orphaned services.
+Use --full to also run 'up' on all services (picks up compose/env changes).
╭─ Options ────────────────────────────────────────────────────────────────────╮
│ --dry-run -n Show what would change without executing │
@@ -696,9 +698,10 @@ Full `--help` output for each command. See the [Usage](#usage) table above for a
Update local state from running services.
-Discovers which services are running on which hosts, updates the state file,
-and captures image digests. This is a read operation - it updates your local
-state to match reality, not the other way around.
+Discovers which services are running on which hosts, updates the state
+file, and captures image digests. This is a read operation - it updates
+your local state to match reality, not the other way around.
Use 'cf apply' to make reality match your config (stop orphans, migrate).
╭─ Options ────────────────────────────────────────────────────────────────────╮
@@ -734,8 +737,10 @@ Full `--help` output for each command. See the [Usage](#usage) table above for a
Validate configuration, traefik labels, mounts, and networks.
-Without arguments: validates all services against configured hosts. With
-service arguments: validates specific services and shows host compatibility.
+Without arguments: validates all services against configured hosts.
+With service arguments: validates specific services and shows host
+compatibility.
Use --local to skip SSH-based checks for faster validation.
╭─ Arguments ──────────────────────────────────────────────────────────────────╮
@@ -774,8 +779,8 @@ Full `--help` output for each command. See the [Usage](#usage) table above for a
Create Docker network on hosts with consistent settings.
Creates an external Docker network that services can use for cross-host
-communication. Uses the same subnet/gateway on all hosts to ensure consistent
-networking.
+communication. Uses the same subnet/gateway on all hosts to ensure
+consistent networking.
╭─ Arguments ──────────────────────────────────────────────────────────────────╮
│ hosts [HOSTS]... Hosts to create network on (default: all) │
@@ -908,8 +913,9 @@ Full `--help` output for each command. See the [Usage](#usage) table above for a
Show status of services.
-Without arguments: shows all services (same as --all). With service names:
-shows only those services. With --host: shows services on that host.
+Without arguments: shows all services (same as --all).
+With service names: shows only those services.
+With --host: shows services on that host.
╭─ Arguments ──────────────────────────────────────────────────────────────────╮
│ services [SERVICES]... Services to operate on │

View File

@@ -145,6 +145,9 @@ addopts = [
"--no-cov-on-fail",
"-v",
]
markers = [
"browser: marks tests as browser tests (deselect with '-m \"not browser\"')",
]
[tool.coverage.run]
omit = []
@@ -176,4 +179,6 @@ dev = [
"httpx>=0.28.0",
# For browser tests (use system chromium via nix-shell -p chromium)
"pytest-playwright>=0.7.0",
# For parallel test execution
"pytest-xdist>=3.0.0",
]

View File

@@ -1,16 +0,0 @@
# Development shell with Chromium for browser tests
# Usage: nix-shell --run "uv run pytest tests/web/test_htmx_browser.py -v --no-cov"
{ pkgs ? import <nixpkgs> {} }:
pkgs.mkShell {
buildInputs = [
pkgs.chromium
];
shellHook = ''
echo "Chromium available at: $(which chromium)"
echo ""
echo "Run browser tests with:"
echo " uv run pytest tests/web/test_htmx_browser.py -v --no-cov"
'';
}

View File

@@ -169,7 +169,7 @@ def get_services(
config = load_config_or_exit(config_path)
if host is not None:
-validate_host(config, host)
+validate_hosts(config, host)
svc_list = [s for s in config.services if host in config.get_hosts(s)]
if not svc_list:
print_warning(f"No services configured for host [magenta]{host}[/]")
@@ -286,16 +286,10 @@ def validate_services(cfg: Config, services: list[str], *, hint: str | None = No
raise typer.Exit(1)
-def validate_host(cfg: Config, host: str) -> None:
-"""Validate that a host exists in config. Exits with error if not found."""
-if host not in cfg.hosts:
-print_error(MSG_HOST_NOT_FOUND.format(name=host))
-raise typer.Exit(1)
-def validate_hosts(cfg: Config, hosts: list[str]) -> None:
-"""Validate that all hosts exist in config. Exits with error if any not found."""
-invalid = [h for h in hosts if h not in cfg.hosts]
+def validate_hosts(cfg: Config, hosts: str | list[str]) -> None:
+"""Validate that host(s) exist in config. Exits with error if any not found."""
+host_list = [hosts] if isinstance(hosts, str) else hosts
+invalid = [h for h in host_list if h not in cfg.hosts]
if invalid:
for h in invalid:
print_error(MSG_HOST_NOT_FOUND.format(name=h))
@@ -304,7 +298,7 @@ def validate_hosts(cfg: Config, hosts: list[str]) -> None:
def validate_host_for_service(cfg: Config, service: str, host: str) -> None:
"""Validate that a host is valid for a service."""
-validate_host(cfg, host)
+validate_hosts(cfg, host)
allowed_hosts = cfg.get_hosts(service)
if host not in allowed_hosts:
print_error(
@@ -328,27 +322,3 @@ def validate_service_selection(
if methods > 1:
print_error("Use only one of: service names, [bold]--all[/], or [bold]--host[/]")
raise typer.Exit(1)
def run_host_operation(
cfg: Config,
svc_list: list[str],
host: str,
command: str,
action_verb: str,
state_callback: Callable[[Config, str, str], None],
) -> None:
"""Run an operation on a specific host for multiple services."""
from compose_farm.executor import run_compose_on_host # noqa: PLC0415
results: list[CommandResult] = []
for service in svc_list:
validate_host_for_service(cfg, service, host)
console.print(f"[cyan]\\[{service}][/] {action_verb} on [magenta]{host}[/]...")
result = run_async(run_compose_on_host(cfg, service, host, command, raw=True))
print() # Newline after raw output
results.append(result)
if result.success:
state_callback(cfg, service, host)
maybe_regenerate_traefik(cfg, results)
report_results(results)

View File

@@ -40,24 +40,12 @@ _RawOption = Annotated[
def _get_editor() -> str:
-"""Get the user's preferred editor.
-Checks $EDITOR, then $VISUAL, then falls back to platform defaults.
-"""
-for env_var in ("EDITOR", "VISUAL"):
-editor = os.environ.get(env_var)
-if editor:
-return editor
+"""Get the user's preferred editor ($EDITOR > $VISUAL > platform default)."""
+if editor := os.environ.get("EDITOR") or os.environ.get("VISUAL"):
+return editor
if platform.system() == "Windows":
return "notepad"
-# Try common editors on Unix-like systems
-for editor in ("nano", "vim", "vi"):
-if shutil.which(editor):
-return editor
-return "vi"
+return next((e for e in ("nano", "vim", "vi") if shutil.which(e)), "vi")
def _generate_template() -> str:
@@ -80,20 +68,16 @@ def _get_config_file(path: Path | None) -> Path | None:
return config_path.resolve() if config_path else None
-def _report_no_config_found() -> None:
-"""Report that no config file was found in search paths."""
-console.print("[yellow]No config file found.[/yellow]")
-console.print("\nSearched locations:")
-for p in config_search_paths():
-status = "[green]exists[/green]" if p.exists() else "[dim]not found[/dim]"
-console.print(f" - {p} ({status})")
-console.print("\nRun [bold cyan]cf config init[/bold cyan] to create one.")
-def _report_config_path_not_exists(config_file: Path) -> None:
-"""Report that an explicit config path doesn't exist."""
+def _report_missing_config(explicit_path: Path | None = None) -> None:
+"""Report that a config file was not found."""
console.print("[yellow]Config file not found.[/yellow]")
-console.print(f"\nProvided path does not exist: [cyan]{config_file}[/cyan]")
+if explicit_path:
+console.print(f"\nProvided path does not exist: [cyan]{explicit_path}[/cyan]")
+else:
+console.print("\nSearched locations:")
+for p in config_search_paths():
+status = "[green]exists[/green]" if p.exists() else "[dim]not found[/dim]"
+console.print(f" - {p} ({status})")
console.print("\nRun [bold cyan]cf config init[/bold cyan] to create one.")
@@ -140,11 +124,11 @@ def config_edit(
config_file = _get_config_file(path)
if config_file is None:
-_report_no_config_found()
+_report_missing_config()
raise typer.Exit(1)
if not config_file.exists():
-_report_config_path_not_exists(config_file)
+_report_missing_config(config_file)
raise typer.Exit(1)
editor = _get_editor()
@@ -180,11 +164,11 @@ def config_show(
config_file = _get_config_file(path)
if config_file is None:
-_report_no_config_found()
+_report_missing_config()
raise typer.Exit(0)
if not config_file.exists():
-_report_config_path_not_exists(config_file)
+_report_missing_config(config_file)
raise typer.Exit(1)
content = config_file.read_text(encoding="utf-8")
@@ -211,7 +195,7 @@ def config_path(
config_file = _get_config_file(path)
if config_file is None:
-_report_no_config_found()
+_report_missing_config()
raise typer.Exit(1)
# Just print the path for easy piping

View File

@@ -2,13 +2,10 @@
from __future__ import annotations
-from typing import TYPE_CHECKING, Annotated
+from typing import Annotated
import typer
-if TYPE_CHECKING:
-from compose_farm.config import Config
from compose_farm.cli.app import app
from compose_farm.cli.common import (
AllOption,
@@ -148,40 +145,8 @@ def update(
report_results(results)
def _report_pending_migrations(cfg: Config, migrations: list[str]) -> None:
"""Report services that need migration."""
console.print(f"[cyan]Services to migrate ({len(migrations)}):[/]")
for svc in migrations:
current = get_service_host(cfg, svc)
target = cfg.get_hosts(svc)[0]
console.print(f" [cyan]{svc}[/]: [magenta]{current}[/] → [magenta]{target}[/]")
def _report_pending_orphans(orphaned: dict[str, str | list[str]]) -> None:
"""Report orphaned services that will be stopped."""
console.print(f"[yellow]Orphaned services to stop ({len(orphaned)}):[/]")
for svc, hosts in orphaned.items():
console.print(f" [cyan]{svc}[/] on [magenta]{format_host(hosts)}[/]")
def _report_pending_starts(cfg: Config, missing: list[str]) -> None:
"""Report services that will be started."""
console.print(f"[green]Services to start ({len(missing)}):[/]")
for svc in missing:
target = format_host(cfg.get_hosts(svc))
console.print(f" [cyan]{svc}[/] on [magenta]{target}[/]")
def _report_pending_refresh(cfg: Config, to_refresh: list[str]) -> None:
"""Report services that will be refreshed."""
console.print(f"[blue]Services to refresh ({len(to_refresh)}):[/]")
for svc in to_refresh:
target = format_host(cfg.get_hosts(svc))
console.print(f" [cyan]{svc}[/] on [magenta]{target}[/]")
@app.command(rich_help_panel="Lifecycle")
-def apply(
+def apply(  # noqa: PLR0912 (multi-phase reconciliation needs these branches)
dry_run: Annotated[
bool,
typer.Option("--dry-run", "-n", help="Show what would change without executing"),
@@ -229,13 +194,23 @@ def apply(
# Report what will be done
if has_orphans:
-_report_pending_orphans(orphaned)
+console.print(f"[yellow]Orphaned services to stop ({len(orphaned)}):[/]")
+for svc, hosts in orphaned.items():
+console.print(f" [cyan]{svc}[/] on [magenta]{format_host(hosts)}[/]")
if has_migrations:
-_report_pending_migrations(cfg, migrations)
+console.print(f"[cyan]Services to migrate ({len(migrations)}):[/]")
+for svc in migrations:
+current = get_service_host(cfg, svc)
+target = cfg.get_hosts(svc)[0]
+console.print(f" [cyan]{svc}[/]: [magenta]{current}[/] → [magenta]{target}[/]")
if has_missing:
-_report_pending_starts(cfg, missing)
+console.print(f"[green]Services to start ({len(missing)}):[/]")
+for svc in missing:
+console.print(f" [cyan]{svc}[/] on [magenta]{format_host(cfg.get_hosts(svc))}[/]")
if has_refresh:
-_report_pending_refresh(cfg, to_refresh)
+console.print(f"[blue]Services to refresh ({len(to_refresh)}):[/]")
+for svc in to_refresh:
+console.print(f" [cyan]{svc}[/] on [magenta]{format_host(cfg.get_hosts(svc))}[/]")
if dry_run:
console.print(f"\n{MSG_DRY_RUN}")

View File

@@ -213,13 +213,7 @@ def parse_host_volumes(config: Config, service: str) -> list[str]:
paths.append(host_path)
# Return unique paths, preserving order
-seen: set[str] = set()
-unique: list[str] = []
-for p in paths:
-if p not in seen:
-seen.add(p)
-unique.append(p)
-return unique
+return list(dict.fromkeys(paths))
def parse_devices(config: Config, service: str) -> list[str]:
@@ -258,13 +252,7 @@ def parse_devices(config: Config, service: str) -> list[str]:
devices.append(host_path)
# Return unique devices, preserving order
-seen: set[str] = set()
-unique: list[str] = []
-for d in devices:
-if d not in seen:
-seen.add(d)
-unique.append(d)
-return unique
+return list(dict.fromkeys(devices))
def parse_external_networks(config: Config, service: str) -> list[str]:

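The commit replaces the manual seen-set/unique-list loop with `dict.fromkeys()`. A minimal sketch of why that one-liner is a drop-in replacement:

```python
paths = ["/data/media", "/etc/ssl", "/data/media", "/var/log", "/etc/ssl"]

# dict keys are unique and preserve insertion order (guaranteed since
# Python 3.7), so this one-liner is an order-preserving dedup.
unique = list(dict.fromkeys(paths))
```

Unlike `list(set(paths))`, this keeps the first occurrence of each path in its original position.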
View File

@@ -10,6 +10,9 @@ from pydantic import BaseModel, Field, model_validator
from .paths import config_search_paths, find_config_path
# Supported compose filenames, in priority order
COMPOSE_FILENAMES = ("compose.yaml", "compose.yml", "docker-compose.yml", "docker-compose.yaml")
class Host(BaseModel):
"""SSH host configuration."""
@@ -90,17 +93,9 @@ class Config(BaseModel):
return self.hosts[host_names[0]]
def get_compose_path(self, service: str) -> Path:
-"""Get compose file path for a service.
-Tries compose.yaml first, then docker-compose.yml.
-"""
+"""Get compose file path for a service (tries compose.yaml first)."""
service_dir = self.compose_dir / service
-for filename in (
-"compose.yaml",
-"compose.yml",
-"docker-compose.yml",
-"docker-compose.yaml",
-):
+for filename in COMPOSE_FILENAMES:
candidate = service_dir / filename
if candidate.exists():
return candidate
@@ -109,21 +104,12 @@ class Config(BaseModel):
def discover_compose_dirs(self) -> set[str]:
"""Find all directories in compose_dir that contain a compose file."""
-compose_filenames = {
-"compose.yaml",
-"compose.yml",
-"docker-compose.yml",
-"docker-compose.yaml",
-}
found: set[str] = set()
if not self.compose_dir.exists():
return found
for subdir in self.compose_dir.iterdir():
-if subdir.is_dir():
-for filename in compose_filenames:
-if (subdir / filename).exists():
-found.add(subdir.name)
-break
+if subdir.is_dir() and any((subdir / f).exists() for f in COMPOSE_FILENAMES):
+found.add(subdir.name)
return found

View File

@@ -23,6 +23,23 @@ LOCAL_ADDRESSES = frozenset({"local", "localhost", "127.0.0.1", "::1"})
_DEFAULT_SSH_PORT = 22
async def _stream_output_lines(
reader: Any,
prefix: str,
*,
is_stderr: bool = False,
) -> None:
"""Stream lines from a reader to console with a service prefix.
Works with both asyncio.StreamReader (bytes) and asyncssh readers (str).
"""
out = err_console if is_stderr else console
async for line in reader:
text = line.decode() if isinstance(line, bytes) else line
if text.strip():
out.print(f"[cyan]\\[{prefix}][/] {escape(text)}", end="")
def build_ssh_command(host: Host, command: str, *, tty: bool = False) -> list[str]:
"""Build SSH command args for executing a command on a remote host.
@@ -158,25 +175,9 @@ async def _run_local_command(
)
if stream and proc.stdout and proc.stderr:
-async def read_stream(
-reader: asyncio.StreamReader,
-prefix: str,
-*,
-is_stderr: bool = False,
-) -> None:
-out = err_console if is_stderr else console
-while True:
-line = await reader.readline()
-if not line:
-break
-text = line.decode()
-if text.strip(): # Skip empty lines
-out.print(f"[cyan]\\[{prefix}][/] {escape(text)}", end="")
await asyncio.gather(
-read_stream(proc.stdout, service),
-read_stream(proc.stderr, service, is_stderr=True),
+_stream_output_lines(proc.stdout, service),
+_stream_output_lines(proc.stderr, service, is_stderr=True),
)
stdout_data = b""
@@ -226,21 +227,9 @@ async def _run_ssh_command(
async with asyncssh.connect(**ssh_connect_kwargs(host)) as conn: # noqa: SIM117
async with conn.create_process(command) as proc:
if stream:
-async def read_stream(
-reader: Any,
-prefix: str,
-*,
-is_stderr: bool = False,
-) -> None:
-out = err_console if is_stderr else console
-async for line in reader:
-if line.strip(): # Skip empty lines
-out.print(f"[cyan]\\[{prefix}][/] {escape(line)}", end="")
await asyncio.gather(
-read_stream(proc.stdout, service),
-read_stream(proc.stderr, service, is_stderr=True),
+_stream_output_lines(proc.stdout, service),
+_stream_output_lines(proc.stderr, service, is_stderr=True),
)
stdout_data = ""

View File

@@ -115,50 +115,6 @@ def remove_service(config: Config, service: str) -> None:
state.pop(service, None)
def add_service_to_host(config: Config, service: str, host: str) -> None:
"""Add a specific host to a service's state.
For multi-host services, adds the host to the list if not present.
For single-host services, sets the host.
"""
with _modify_state(config) as state:
current = state.get(service)
if config.is_multi_host(service):
# Multi-host: add to list if not present
if isinstance(current, list):
if host not in current:
state[service] = [*current, host]
else:
state[service] = [host]
else:
# Single-host: just set it
state[service] = host
def remove_service_from_host(config: Config, service: str, host: str) -> None:
"""Remove a specific host from a service's state.
For multi-host services, removes just that host from the list.
For single-host services, removes the service entirely if host matches.
"""
with _modify_state(config) as state:
current = state.get(service)
if current is None:
return
if isinstance(current, list):
# Multi-host: remove this host from list
remaining = [h for h in current if h != host]
if remaining:
state[service] = remaining
else:
state.pop(service, None)
elif current == host:
# Single-host: remove if matches
state.pop(service, None)
def get_services_needing_migration(config: Config) -> list[str]:
"""Get services where current host differs from configured host.

View File

@@ -32,10 +32,17 @@ def _start_task(coro_factory: Callable[[str], Coroutine[Any, Any, None]]) -> str
return task_id
-async def _run_service_action(name: str, command: str) -> dict[str, Any]:
-"""Run a compose command for a service."""
-config = get_config()
+# Allowed service commands
+ALLOWED_COMMANDS = {"up", "down", "restart", "pull", "update", "logs"}
+@router.post("/service/{name}/{command}")
+async def service_action(name: str, command: str) -> dict[str, Any]:
+"""Run a compose command for a service (up, down, restart, pull, update, logs)."""
+if command not in ALLOWED_COMMANDS:
+raise HTTPException(status_code=404, detail=f"Unknown command '{command}'")
+config = get_config()
if name not in config.services:
raise HTTPException(status_code=404, detail=f"Service '{name}' not found")
@@ -43,42 +50,6 @@ async def _run_service_action(name: str, command: str) -> dict[str, Any]:
return {"task_id": task_id, "service": name, "command": command}
@router.post("/service/{name}/up")
async def up_service(name: str) -> dict[str, Any]:
"""Start a service."""
return await _run_service_action(name, "up")
@router.post("/service/{name}/down")
async def down_service(name: str) -> dict[str, Any]:
"""Stop a service."""
return await _run_service_action(name, "down")
@router.post("/service/{name}/restart")
async def restart_service(name: str) -> dict[str, Any]:
"""Restart a service (down + up)."""
return await _run_service_action(name, "restart")
@router.post("/service/{name}/pull")
async def pull_service(name: str) -> dict[str, Any]:
"""Pull latest images for a service."""
return await _run_service_action(name, "pull")
@router.post("/service/{name}/update")
async def update_service(name: str) -> dict[str, Any]:
"""Update a service (pull + build + down + up)."""
return await _run_service_action(name, "update")
@router.post("/service/{name}/logs")
async def logs_service(name: str) -> dict[str, Any]:
"""Show logs for a service."""
return await _run_service_action(name, "logs")
@router.post("/apply")
async def apply_all() -> dict[str, Any]:
"""Run cf apply to reconcile all services."""

View File

@@ -302,9 +302,9 @@ async def _read_file_remote(host: Any, path: str) -> str:
async def _write_file_remote(host: Any, path: str, content: str) -> None:
"""Write content to a file on a remote host via SSH."""
-# Expand ~ on remote by using shell
-target_path = f"~/{path[2:]}" if path.startswith("~/") else path
-cmd = f"cat > {shlex.quote(target_path)}"
+# Expand ~ on remote: keep ~ unquoted for shell expansion, quote the rest
+target = f"~/{shlex.quote(path[2:])}" if path.startswith("~/") else shlex.quote(path)
+cmd = f"cat > {target}"
async with asyncssh.connect(**ssh_connect_kwargs(host)) as conn:
result = await conn.run(cmd, input=content, check=True)
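The fix above matters because `shlex.quote` on the whole path would also quote the `~`, and a quoted tilde is never expanded by the remote shell. A standalone sketch of the split-quoting rule (hypothetical `remote_target` name):

```python
import shlex

def remote_target(path: str) -> str:
    # Leave a leading "~/" outside the quotes so the remote shell expands it;
    # quote everything else so spaces and metacharacters survive intact.
    return f"~/{shlex.quote(path[2:])}" if path.startswith("~/") else shlex.quote(path)
```

Note `shlex.quote` only adds quotes when the string contains unsafe characters, so plain absolute paths pass through unchanged.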

View File

@@ -79,7 +79,7 @@ const TERMINAL_THEME = {
* @param {HTMLElement} container - Container element
* @param {object} extraOptions - Additional terminal options
* @param {function} onResize - Optional callback called with (cols, rows) after resize
-* @returns {{term: Terminal, fitAddon: FitAddon}}
+* @returns {{term: Terminal, fitAddon: FitAddon, dispose: function}}
*/
function createTerminal(container, extraOptions = {}, onResize = null) {
container.innerHTML = '';
@@ -96,17 +96,26 @@ function createTerminal(container, extraOptions = {}, onResize = null) {
const fitAddon = new FitAddon.FitAddon();
term.loadAddon(fitAddon);
term.open(container);
-fitAddon.fit();
const handleResize = () => {
fitAddon.fit();
onResize?.(term.cols, term.rows);
};
-window.addEventListener('resize', handleResize);
-new ResizeObserver(handleResize).observe(container);
+// Use ResizeObserver only (handles both container and window resize)
+const resizeObserver = new ResizeObserver(handleResize);
+resizeObserver.observe(container);
-return { term, fitAddon };
+handleResize(); // Initial fit
+return {
+term,
+fitAddon,
+dispose() {
+resizeObserver.disconnect();
+term.dispose();
+}
+};
}
/**
@@ -120,6 +129,25 @@ function createWebSocket(path) {
}
window.createWebSocket = createWebSocket;
/**
* Wait for xterm.js to load, then execute callback
* @param {function} callback - Function to call when xterm is ready
* @param {number} maxAttempts - Max attempts (default 20 = 2 seconds)
*/
function whenXtermReady(callback, maxAttempts = 20) {
const tryInit = (attempts) => {
if (typeof Terminal !== 'undefined' && typeof FitAddon !== 'undefined') {
callback();
} else if (attempts > 0) {
setTimeout(() => tryInit(attempts - 1), 100);
} else {
console.error('xterm.js failed to load');
}
};
tryInit(maxAttempts);
}
window.whenXtermReady = whenXtermReady;
/**
* Initialize a terminal and connect to WebSocket for streaming
*/
@@ -130,7 +158,8 @@ function initTerminal(elementId, taskId) {
return;
}
-const { term, fitAddon } = createTerminal(container);
+const wrapper = createTerminal(container);
+const { term } = wrapper;
const ws = createWebSocket(`/ws/terminal/${taskId}`);
const taskKey = getTaskKey();
@@ -152,7 +181,7 @@ function initTerminal(elementId, taskId) {
setTerminalLoading(false);
};
-terminals[taskId] = { term, ws, fitAddon };
+terminals[taskId] = { ...wrapper, ws };
return { term, ws };
}
@@ -161,7 +190,7 @@ window.initTerminal = initTerminal;
/**
* Initialize an interactive exec terminal
*/
-let execTerminal = null;
+let execTerminalWrapper = null; // {term, dispose}
let execWs = null;
function initExecTerminal(service, container, host) {
@@ -175,9 +204,9 @@ function initExecTerminal(service, container, host) {
containerEl.classList.remove('hidden');
-// Clean up existing
+// Clean up existing (use wrapper's dispose to clean up ResizeObserver)
if (execWs) { execWs.close(); execWs = null; }
-if (execTerminal) { execTerminal.dispose(); execTerminal = null; }
+if (execTerminalWrapper) { execTerminalWrapper.dispose(); execTerminalWrapper = null; }
// Create WebSocket first so resize callback can use it
execWs = createWebSocket(`/ws/exec/${service}/${container}/${host}`);
@@ -189,8 +218,8 @@ function initExecTerminal(service, container, host) {
}
};
-const { term } = createTerminal(terminalEl, { cursorBlink: true }, sendSize);
-execTerminal = term;
+execTerminalWrapper = createTerminal(terminalEl, { cursorBlink: true }, sendSize);
+const term = execTerminalWrapper.term;
execWs.onopen = () => { sendSize(term.cols, term.rows); term.focus(); };
execWs.onmessage = (event) => term.write(event.data);
@@ -217,6 +246,22 @@ function refreshDashboard() {
document.body.dispatchEvent(new CustomEvent('cf:refresh'));
}
/**
* Filter sidebar services by name and host
*/
function sidebarFilter() {
const q = (document.getElementById('sidebar-filter')?.value || '').toLowerCase();
const h = document.getElementById('sidebar-host-select')?.value || '';
let n = 0;
document.querySelectorAll('#sidebar-services li').forEach(li => {
const show = (!q || li.dataset.svc.includes(q)) && (!h || !li.dataset.h || li.dataset.h === h);
li.hidden = !show;
if (show) n++;
});
document.getElementById('sidebar-count').textContent = '(' + n + ')';
}
window.sidebarFilter = sidebarFilter;
/**
* Load Monaco editor dynamically (only once)
*/
@@ -413,16 +458,10 @@ function tryReconnectToTask() {
const taskId = localStorage.getItem(getTaskKey());
if (!taskId) return;
-// Wait for xterm to be loaded
-const tryInit = (attempts) => {
-if (typeof Terminal !== 'undefined' && typeof FitAddon !== 'undefined') {
-expandTerminal();
-initTerminal('terminal-output', taskId);
-} else if (attempts > 0) {
-setTimeout(() => tryInit(attempts - 1), 100);
-}
-};
-tryInit(20);
+whenXtermReady(() => {
+expandTerminal();
+initTerminal('terminal-output', taskId);
+});
}
// Play intro animation on command palette button
@@ -494,20 +533,8 @@ document.body.addEventListener('htmx:afterRequest', function(evt) {
try {
const response = JSON.parse(text);
if (response.task_id) {
-// Expand terminal and scroll to it
expandTerminal();
-// Wait for xterm to be loaded if needed
-const tryInit = (attempts) => {
-if (typeof Terminal !== 'undefined' && typeof FitAddon !== 'undefined') {
-initTerminal('terminal-output', response.task_id);
-} else if (attempts > 0) {
-setTimeout(() => tryInit(attempts - 1), 100);
-} else {
-console.error('xterm.js failed to load');
-}
-};
-tryInit(20); // Try for up to 2 seconds
+whenXtermReady(() => initTerminal('terminal-output', response.task_id));
}
} catch (e) {
// Not valid JSON, ignore
@@ -542,17 +569,13 @@ document.body.addEventListener('htmx:afterRequest', function(evt) {
history.pushState({}, '', url);
});
};
-// Navigate to dashboard and trigger action (or just POST if already on dashboard)
-const dashboardAction = (endpoint) => () => {
-if (window.location.pathname === '/') {
-htmx.ajax('POST', `/api/${endpoint}`, {swap: 'none'});
-} else {
-// Navigate via HTMX, then trigger action after swap
-htmx.ajax('GET', '/', {target: '#main-content', select: '#main-content', swap: 'outerHTML'}).then(() => {
-history.pushState({}, '', '/');
-htmx.ajax('POST', `/api/${endpoint}`, {swap: 'none'});
-});
+// Navigate to dashboard (if needed) and trigger action
+const dashboardAction = (endpoint) => async () => {
+if (window.location.pathname !== '/') {
+await htmx.ajax('GET', '/', {target: '#main-content', select: '#main-content', swap: 'outerHTML'});
+history.pushState({}, '', '/');
+}
+htmx.ajax('POST', `/api/${endpoint}`, {swap: 'none'});
};
const cmd = (type, name, desc, action, icon = null) => ({ type, name, desc, action, icon });

View File

@@ -52,50 +52,40 @@ async def stream_to_task(task_id: str, message: str) -> None:
tasks[task_id]["output"].append(message)
async def _stream_subprocess(task_id: str, args: list[str], env: dict[str, str]) -> int:
"""Run subprocess and stream output to task buffer. Returns exit code."""
process = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
env=env,
)
if process.stdout:
async for line in process.stdout:
text = line.decode("utf-8", errors="replace")
# Convert \n to \r\n for xterm.js
if text.endswith("\n") and not text.endswith("\r\n"):
text = text[:-1] + "\r\n"
await stream_to_task(task_id, text)
return await process.wait()
async def run_cli_streaming(
config: Config,
args: list[str],
task_id: str,
) -> None:
-"""Run a cf CLI command as subprocess and stream output to task buffer.
-This reuses all CLI logic including Rich formatting, progress bars, etc.
-The subprocess gets a pseudo-TTY via FORCE_COLOR so Rich outputs ANSI codes.
-"""
+"""Run a cf CLI command as subprocess and stream output to task buffer."""
try:
# Build command - config option goes after the subcommand
cmd = ["cf", *args, f"--config={config.config_path}"]
+await stream_to_task(task_id, f"{DIM}$ {' '.join(['cf', *args])}{RESET}{CRLF}")
-# Show command being executed
-cmd_display = " ".join(["cf", *args])
-await stream_to_task(task_id, f"{DIM}$ {cmd_display}{RESET}{CRLF}")
-# Force color output even though there's no real TTY
-# Set COLUMNS for Rich/Typer to format output correctly
-env = {"FORCE_COLOR": "1", "TERM": "xterm-256color", "COLUMNS": "120"}
-# Ensure SSH agent is available (auto-detect if needed)
-ssh_sock = get_ssh_auth_sock()
-if ssh_sock:
+# Build environment with color support and SSH agent
+env = {**os.environ, "FORCE_COLOR": "1", "TERM": "xterm-256color", "COLUMNS": "120"}
+if ssh_sock := get_ssh_auth_sock():
env["SSH_AUTH_SOCK"] = ssh_sock
-process = await asyncio.create_subprocess_exec(
-*cmd,
-stdout=asyncio.subprocess.PIPE,
-stderr=asyncio.subprocess.STDOUT,
-env={**os.environ, **env},
-)
-# Stream output line by line
-if process.stdout:
-async for line in process.stdout:
-text = line.decode("utf-8", errors="replace")
-# Convert \n to \r\n for xterm.js
-if text.endswith("\n") and not text.endswith("\r\n"):
-text = text[:-1] + "\r\n"
-await stream_to_task(task_id, text)
-exit_code = await process.wait()
+exit_code = await _stream_subprocess(task_id, cmd, env)
tasks[task_id]["status"] = "completed" if exit_code == 0 else "failed"
tasks[task_id]["completed_at"] = time.time()
@@ -122,71 +112,32 @@ async def _run_cli_via_ssh(
args: list[str],
task_id: str,
) -> None:
"""Run a cf CLI command via SSH to the host.
Used for self-updates to ensure the command survives container restart.
Uses setsid to run command in a new session (completely detached), with
output going to a log file. We tail the log to stream output. When SSH
dies (container killed), the tail dies but the setsid process continues.
"""
"""Run a cf CLI command via SSH for self-updates (survives container restart)."""
try:
# Get the host for the web service
host = config.get_host(CF_WEB_SERVICE)
cf_cmd = f"cf {' '.join(args)} --config={config.config_path}"
log_file = "/tmp/cf-self-update.log" # noqa: S108
# Include task_id to prevent collision with concurrent updates
log_file = f"/tmp/cf-self-update-{task_id}.log" # noqa: S108
# Build the remote command:
# 1. setsid runs command in new session (survives SSH disconnect)
# 2. Output goes to log file
# 3. tail -f streams the log (dies when SSH dies, but command continues)
# 4. wait for tail or timeout after command should be done
# setsid detaches command; tail streams output until SSH dies
remote_cmd = (
f"rm -f {log_file} && "
f"PATH=$HOME/.local/bin:/usr/local/bin:$PATH "
f"setsid sh -c '{cf_cmd} > {log_file} 2>&1' & "
f"sleep 0.3 && "
f"tail -f {log_file} 2>/dev/null"
f"sleep 0.3 && tail -f {log_file} 2>/dev/null"
)
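The detach-and-tail recipe above (new session via `setsid`, output to a log file, `tail -f` for streaming) can be sketched in isolation; the builder function and its arguments here are illustrative, not the project's actual values:

```python
import shlex


def build_detached_command(cmd: str, log_file: str) -> str:
    # setsid runs cmd in a new session so it survives SSH disconnect;
    # tail -f streams the log until the connection dies, while the
    # detached command keeps running to completion.
    return (
        f"rm -f {shlex.quote(log_file)} && "
        f"setsid sh -c {shlex.quote(f'{cmd} > {log_file} 2>&1')} & "
        f"sleep 0.3 && tail -f {shlex.quote(log_file)} 2>/dev/null"
    )
```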
# Show what we're doing
await stream_to_task(
task_id,
f"{DIM}$ {cf_cmd}{RESET}{CRLF}",
)
await stream_to_task(
task_id,
f"{GREEN}Running via SSH (detached with setsid){RESET}{CRLF}",
)
await stream_to_task(task_id, f"{DIM}$ {cf_cmd}{RESET}{CRLF}")
await stream_to_task(task_id, f"{GREEN}Running via SSH (detached with setsid){RESET}{CRLF}")
# Build SSH command (no TTY needed, output comes from tail)
ssh_args = build_ssh_command(host, remote_cmd, tty=False)
# Set up environment with SSH agent
env = {**os.environ}
ssh_sock = get_ssh_auth_sock()
if ssh_sock:
if ssh_sock := get_ssh_auth_sock():
env["SSH_AUTH_SOCK"] = ssh_sock
process = await asyncio.create_subprocess_exec(
*ssh_args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
env=env,
)
exit_code = await _stream_subprocess(task_id, ssh_args, env)
# Stream output until SSH dies (container killed) or command completes
if process.stdout:
async for line in process.stdout:
text = line.decode("utf-8", errors="replace")
if text.endswith("\n") and not text.endswith("\r\n"):
text = text[:-1] + "\r\n"
await stream_to_task(task_id, text)
exit_code = await process.wait()
# Exit code 255 means SSH connection closed (container died during down)
# This is expected for self-updates - setsid ensures command continues
# Exit code 255 = SSH closed (container died during down) - expected for self-updates
if exit_code == 255: # noqa: PLR2004
await stream_to_task(
task_id,


@@ -42,7 +42,7 @@
<script>
// Use var to allow re-declaration on HTMX navigation
var consoleTerminal = null;
var consoleTerminalWrapper = null; // {term, dispose}
var consoleWs = null;
var consoleEditor = null;
var currentFilePath = null;
@@ -68,14 +68,14 @@ function connectConsole() {
currentHost = host;
// Clean up existing connection
// Clean up existing connection (use wrapper's dispose to clean up ResizeObserver)
if (consoleWs) {
consoleWs.close();
consoleWs = null;
}
if (consoleTerminal) {
consoleTerminal.dispose();
consoleTerminal = null;
if (consoleTerminalWrapper) {
consoleTerminalWrapper.dispose();
consoleTerminalWrapper = null;
}
statusEl.textContent = 'Connecting...';
@@ -91,8 +91,8 @@ function connectConsole() {
};
// Create terminal with resize callback
const { term } = createTerminal(terminalEl, { cursorBlink: true }, sendSize);
consoleTerminal = term;
consoleTerminalWrapper = createTerminal(terminalEl, { cursorBlink: true }, sendSize);
const term = consoleTerminalWrapper.term;
consoleWs.onopen = () => {
statusEl.textContent = `Connected to ${host}`;
@@ -101,13 +101,7 @@ function connectConsole() {
// Auto-load the default file once editor is ready
const pathInput = document.getElementById('console-file-path');
if (pathInput && pathInput.value) {
const tryLoad = () => {
if (consoleEditor) {
loadFile();
} else {
setTimeout(tryLoad, 100);
}
};
const tryLoad = () => consoleEditor ? loadFile() : setTimeout(tryLoad, 100);
tryLoad();
}
};


@@ -1,4 +1,5 @@
{% from "partials/components.html" import collapse %}
{% from "partials/icons.html" import circle_check %}
{% if orphaned or migrations or not_started %}
{% call collapse("Pending Operations", id="pending-collapse", checked=expanded|default(true)) %}
{% if orphaned %}
@@ -30,7 +31,7 @@
{% endcall %}
{% else %}
<div role="alert" class="alert alert-success mb-4">
<svg xmlns="http://www.w3.org/2000/svg" class="stroke-current shrink-0 h-6 w-6" fill="none" viewBox="0 0 24 24"><path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M9 12l2 2 4-4m6 2a9 9 0 11-18 0 9 9 0 0118 0z" /></svg>
<span class="shrink-0">{{ circle_check(24) }}</span>
<span>All services are in sync with configuration.</span>
</div>
{% endif %}


@@ -31,16 +31,3 @@
{% endfor %}
</ul>
</div>
<script>
function sidebarFilter() {
const q = (document.getElementById('sidebar-filter')?.value || '').toLowerCase();
const h = document.getElementById('sidebar-host-select')?.value || '';
let n = 0;
document.querySelectorAll('#sidebar-services li').forEach(li => {
const show = (!q || li.dataset.svc.includes(q)) && (!h || !li.dataset.h || li.dataset.h === h);
li.hidden = !show;
if (show) n++;
});
document.getElementById('sidebar-count').textContent = '(' + n + ')';
}
</script>


@@ -8,6 +8,7 @@ import fcntl
import json
import os
import pty
import shlex
import signal
import struct
import termios
@@ -20,6 +21,9 @@ from compose_farm.executor import is_local, ssh_connect_kwargs
from compose_farm.web.deps import get_config
from compose_farm.web.streaming import CRLF, DIM, GREEN, RED, RESET, tasks
# Shell command to prefer bash over sh
SHELL_FALLBACK = "command -v bash >/dev/null && exec bash || exec sh"
if TYPE_CHECKING:
from compose_farm.config import Host
@@ -129,12 +133,12 @@ def _make_controlling_tty(slave_fd: int) -> None:
fcntl.ioctl(slave_fd, termios.TIOCSCTTY, 0)
async def _run_local_exec(websocket: WebSocket, exec_cmd: str) -> None:
"""Run docker exec locally with PTY."""
async def _run_local_exec(websocket: WebSocket, argv: list[str]) -> None:
"""Run command locally with PTY using argv list (no shell interpretation)."""
master_fd, slave_fd = pty.openpty()
proc = await asyncio.create_subprocess_shell(
exec_cmd,
proc = await asyncio.create_subprocess_exec(
*argv,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
@@ -181,11 +185,15 @@ async def _run_exec_session(
await websocket.send_text(f"{RED}Host '{host_name}' not found{RESET}{CRLF}")
return
exec_cmd = f"docker exec -it {container} /bin/sh -c 'command -v bash >/dev/null && exec bash || exec sh'"
if is_local(host):
await _run_local_exec(websocket, exec_cmd)
# Local: use argv list (no shell interpretation)
argv = ["docker", "exec", "-it", container, "/bin/sh", "-c", SHELL_FALLBACK]
await _run_local_exec(websocket, argv)
else:
# Remote: quote container name to prevent injection
exec_cmd = (
f"docker exec -it {shlex.quote(container)} /bin/sh -c {shlex.quote(SHELL_FALLBACK)}"
)
await _run_remote_exec(websocket, host, exec_cmd)
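The remote branch quotes untrusted values before interpolating them into a shell string; a minimal illustration of what `shlex.quote` buys (the container name here is a hypothetical hostile input):

```python
import shlex

container = "app; rm -rf /"  # hostile container name from an untrusted source
safe = shlex.quote(container)
cmd = f"docker exec -it {safe} /bin/sh"
# The whole name stays a single shell word instead of terminating the
# docker command and injecting a second one.
```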
@@ -228,7 +236,9 @@ async def _run_shell_session(
shell_cmd = "cd ~ && exec bash -i 2>/dev/null || exec sh -i"
if is_local(host):
await _run_local_exec(websocket, shell_cmd)
# Local: use argv list with shell -c to interpret the command
argv = ["/bin/sh", "-c", shell_cmd]
await _run_local_exec(websocket, argv)
else:
await _run_remote_exec(websocket, host, shell_cmd, agent_forwarding=True)


@@ -1,12 +1,15 @@
"""Browser tests for HTMX behavior using Playwright.
Run with: nix-shell --run "uv run pytest tests/web/test_htmx_browser.py -v --no-cov"
Or on CI: playwright install chromium --with-deps
Run with: uv run pytest tests/web/test_htmx_browser.py -v --no-cov
CDN assets are cached locally (in .pytest_cache/vendor/) to eliminate network
variability. If a test fails with "Uncached CDN request", add the URL to CDN_ASSETS.
"""
from __future__ import annotations
import os
import re
import shutil
import socket
import threading
@@ -26,7 +29,51 @@ from compose_farm.web.routes import api as web_api
from compose_farm.web.routes import pages as web_pages
if TYPE_CHECKING:
from playwright.sync_api import Page, Route
from playwright.sync_api import Page, Route, WebSocket
# CDN assets to vendor locally for faster/more reliable tests
CDN_ASSETS = {
"https://cdn.jsdelivr.net/npm/daisyui@5": ("daisyui.css", "text/css"),
"https://cdn.jsdelivr.net/npm/@tailwindcss/browser@4": (
"tailwind.js",
"application/javascript",
),
"https://cdn.jsdelivr.net/npm/@xterm/xterm@5.5.0/css/xterm.css": ("xterm.css", "text/css"),
"https://unpkg.com/htmx.org@2.0.4": ("htmx.js", "application/javascript"),
"https://cdn.jsdelivr.net/npm/@xterm/xterm@5.5.0/lib/xterm.js": (
"xterm.js",
"application/javascript",
),
"https://cdn.jsdelivr.net/npm/@xterm/addon-fit@0.10.0/lib/addon-fit.js": (
"xterm-fit.js",
"application/javascript",
),
# Monaco editor - loader.js plus dynamically loaded modules
"https://cdn.jsdelivr.net/npm/monaco-editor@0.52.2/min/vs/loader.js": (
"monaco-loader.js",
"application/javascript",
),
"https://cdn.jsdelivr.net/npm/monaco-editor@0.52.2/min/vs/editor/editor.main.js": (
"monaco-editor-main.js",
"application/javascript",
),
"https://cdn.jsdelivr.net/npm/monaco-editor@0.52.2/min/vs/editor/editor.main.css": (
"monaco-editor-main.css",
"text/css",
),
"https://cdn.jsdelivr.net/npm/monaco-editor@0.52.2/min/vs/base/worker/workerMain.js": (
"monaco-workerMain.js",
"application/javascript",
),
"https://cdn.jsdelivr.net/npm/monaco-editor@0.52.2/min/vs/basic-languages/yaml/yaml.js": (
"monaco-yaml.js",
"application/javascript",
),
"https://cdn.jsdelivr.net/npm/monaco-editor@0.52.2/min/vs/base/browser/ui/codicons/codicon/codicon.ttf": (
"monaco-codicon.ttf",
"font/ttf",
),
}
def _browser_available() -> bool:
@@ -45,11 +92,77 @@ def _browser_available() -> bool:
return False
# Skip all tests if no browser available
pytestmark = pytest.mark.skipif(
not _browser_available(),
reason="No browser available (install via: playwright install chromium --with-deps)",
)
# Mark all tests as browser tests and skip if no browser available
pytestmark = [
pytest.mark.browser,
pytest.mark.skipif(
not _browser_available(),
reason="No browser available (install via: playwright install chromium --with-deps)",
),
]
def _download_url(url: str) -> bytes | None:
"""Download URL content using curl."""
import subprocess
try:
result = subprocess.run(
["curl", "-fsSL", "--max-time", "30", url], # noqa: S607
capture_output=True,
check=True,
)
return bytes(result.stdout)
except Exception:
return None
@pytest.fixture(scope="session")
def vendor_cache(request: pytest.FixtureRequest) -> Path:
"""Download CDN assets once and cache to disk for faster tests.
Uses a persistent cache directory so assets are only downloaded once
across multiple test runs. Clear with: pytest --cache-clear
"""
# Use project-local cache directory
cache_dir = Path(request.config.rootdir) / ".pytest_cache" / "vendor"
cache_dir.mkdir(parents=True, exist_ok=True)
for url, (filename, _content_type) in CDN_ASSETS.items():
filepath = cache_dir / filename
if filepath.exists():
continue # Already cached
content = _download_url(url)
if not content:
msg = f"Failed to download {url} - check network/curl"
raise RuntimeError(msg)
filepath.write_bytes(content)
return cache_dir
@pytest.fixture
def page(page: Page, vendor_cache: Path) -> Page:
"""Override default page fixture to intercept CDN requests with local cache.
Any CDN request not in CDN_ASSETS will abort with an error, forcing developers
to add new CDN URLs to the cache. This catches both static and dynamic loads.
"""
cache = {url: (vendor_cache / f, ct) for url, (f, ct) in CDN_ASSETS.items()}
def handle_cdn(route: Route) -> None:
url = route.request.url
for url_prefix, (filepath, content_type) in cache.items():
if url.startswith(url_prefix):
route.fulfill(status=200, content_type=content_type, body=filepath.read_bytes())
return
# Uncached CDN request - abort with helpful error
route.abort("failed")
msg = f"Uncached CDN request: {url}\n\nAdd this URL to CDN_ASSETS in tests/web/test_htmx_browser.py"
raise RuntimeError(msg)
page.route(re.compile(r"https://(cdn\.jsdelivr\.net|unpkg\.com)/.*"), handle_cdn)
return page
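The route handler above is a first-prefix match against `CDN_ASSETS` that fails loudly on anything uncached; the core lookup can be sketched as a pure function (names hypothetical):

```python
def resolve_cdn(url: str, assets: dict[str, str]) -> str:
    # Map a requested CDN URL to a cached filename by prefix match.
    # Raising on a miss forces new CDN dependencies to be added to the
    # asset list instead of silently hitting the network during tests.
    for prefix, filename in assets.items():
        if url.startswith(prefix):
            return filename
    raise RuntimeError(f"Uncached CDN request: {url}")
```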
@pytest.fixture(scope="session")
@@ -137,14 +250,20 @@ def server_url(
thread = threading.Thread(target=server.run, daemon=True)
thread.start()
# Wait for startup
# Wait for startup with proper error handling
url = f"http://127.0.0.1:{port}"
for _ in range(50):
server_ready = False
for _ in range(100): # ~2s of sleep between checks, plus connect timeouts
try:
urllib.request.urlopen(url, timeout=0.5) # noqa: S310
urllib.request.urlopen(url, timeout=0.1) # noqa: S310
server_ready = True
break
except Exception:
time.sleep(0.1)
time.sleep(0.02) # 20ms between checks
if not server_ready:
msg = f"Test server failed to start on {url}"
raise RuntimeError(msg)
yield url
@@ -303,6 +422,21 @@ class TestDashboardContent:
assert "sonarr" in content or "not started" in content
assert "jellyfin" in content or "not started" in content
def test_dashboard_monaco_loads(self, page: Page, server_url: str) -> None:
"""Dashboard page loads Monaco editor for config editing."""
page.goto(server_url)
page.wait_for_selector("#sidebar-services", timeout=5000)
# Wait for Monaco to load
page.wait_for_function("typeof monaco !== 'undefined'", timeout=5000)
# Verify Monaco editor element exists (may be in collapsed section)
page.wait_for_function(
"document.querySelectorAll('.monaco-editor').length >= 1",
timeout=5000,
)
assert page.locator(".monaco-editor").count() >= 1
class TestSaveConfigButton:
"""Test save config button behavior."""
@@ -364,6 +498,25 @@ class TestServiceDetailPage:
# Should be back on dashboard
assert page.url.rstrip("/") == server_url.rstrip("/")
def test_service_page_monaco_loads(self, page: Page, server_url: str) -> None:
"""Service page loads Monaco editor for compose/env editing."""
page.goto(server_url)
page.wait_for_selector("#sidebar-services a", timeout=5000)
# Navigate to plex service
page.locator("#sidebar-services a", has_text="plex").click()
page.wait_for_url("**/service/plex", timeout=5000)
# Wait for Monaco to load
page.wait_for_function("typeof monaco !== 'undefined'", timeout=5000)
# Verify Monaco editor element exists (may be in collapsed section)
page.wait_for_function(
"document.querySelectorAll('.monaco-editor').length >= 1",
timeout=5000,
)
assert page.locator(".monaco-editor").count() >= 1
class TestSidebarFilter:
"""Test JavaScript sidebar filtering functionality."""
@@ -746,7 +899,7 @@ class TestKeyboardShortcuts:
# Wait for Monaco editor to load (it takes a moment)
page.wait_for_function(
"typeof monaco !== 'undefined'",
timeout=10000,
timeout=5000,
)
# Press Ctrl+S
@@ -900,3 +1053,640 @@ class TestContentStability:
# Counts should be same (no duplicates created)
assert page.locator("#stats-cards .card").count() == initial_stat_count
assert page.locator("#sidebar-services li").count() == initial_service_count
class TestConsolePage:
"""Test console page functionality."""
def test_console_page_renders(self, page: Page, server_url: str) -> None:
"""Console page renders with all required elements."""
page.goto(f"{server_url}/console")
# Wait for page to load
page.wait_for_selector("#console-host-select", timeout=5000)
# Verify host selector exists
host_select = page.locator("#console-host-select")
assert host_select.is_visible()
# Verify Connect button exists
connect_btn = page.locator("#console-connect-btn")
assert connect_btn.is_visible()
assert "Connect" in connect_btn.inner_text()
# Verify terminal container exists
terminal_container = page.locator("#console-terminal")
assert terminal_container.is_visible()
# Verify editor container exists
editor_container = page.locator("#console-editor")
assert editor_container.is_visible()
# Verify file path input exists
file_input = page.locator("#console-file-path")
assert file_input.is_visible()
# Verify save button exists
save_btn = page.locator("#console-save-btn")
assert save_btn.is_visible()
def test_console_host_selector_shows_all_hosts(self, page: Page, server_url: str) -> None:
"""Host selector dropdown contains all configured hosts."""
page.goto(f"{server_url}/console")
page.wait_for_selector("#console-host-select", timeout=5000)
# Get all options from the dropdown
options = page.locator("#console-host-select option")
assert options.count() == 2 # server-1 and server-2 from test config
# Verify both hosts are present
option_texts = [options.nth(i).inner_text() for i in range(options.count())]
assert any("server-1" in text for text in option_texts)
assert any("server-2" in text for text in option_texts)
def test_console_connect_shows_status(self, page: Page, server_url: str) -> None:
"""Console shows connection status when attempting to connect."""
page.goto(f"{server_url}/console")
page.wait_for_selector("#console-host-select", timeout=5000)
# Wait for terminal to initialize (triggers auto-connect)
page.wait_for_function("typeof Terminal !== 'undefined'", timeout=5000)
page.wait_for_selector("#console-terminal .xterm", timeout=5000)
# Status element should exist and have some text
# (may show "Connected", "Connecting...", or "Disconnected" depending on WebSocket)
status = page.locator("#console-status")
assert status.is_visible()
status_text = status.inner_text()
# Should show some connection-related status
assert any(s in status_text for s in ["Connect", "Disconnect", "server-"])
def test_console_connect_creates_terminal_element(self, page: Page, server_url: str) -> None:
"""Connecting to a host creates xterm terminal elements.
The console page auto-connects to the first host on load,
which creates the xterm.js terminal inside the container.
"""
page.goto(f"{server_url}/console")
page.wait_for_selector("#console-terminal", timeout=5000)
# Wait for xterm.js to load from CDN
page.wait_for_function("typeof Terminal !== 'undefined'", timeout=5000)
# The console page auto-connects, which creates the terminal.
# Wait for xterm to initialize (creates .xterm class)
page.wait_for_selector("#console-terminal .xterm", timeout=5000)
# Verify xterm elements are present
xterm_container = page.locator("#console-terminal .xterm")
assert xterm_container.is_visible()
# Verify xterm screen is created (the actual terminal display)
xterm_screen = page.locator("#console-terminal .xterm-screen")
assert xterm_screen.is_visible()
def test_console_editor_initializes(self, page: Page, server_url: str) -> None:
"""Monaco editor initializes on the console page."""
page.goto(f"{server_url}/console")
page.wait_for_selector("#console-editor", timeout=5000)
# Wait for Monaco to load from CDN
page.wait_for_function("typeof monaco !== 'undefined'", timeout=5000)
# Monaco creates elements inside the container
page.wait_for_selector("#console-editor .monaco-editor", timeout=5000)
# Verify Monaco editor is present
monaco_editor = page.locator("#console-editor .monaco-editor")
assert monaco_editor.is_visible()
def test_console_load_file_calls_api(self, page: Page, server_url: str) -> None:
"""Clicking Open button calls the file API with correct parameters."""
page.goto(f"{server_url}/console")
page.wait_for_selector("#console-file-path", timeout=5000)
# Wait for terminal to connect (sets currentHost)
page.wait_for_function("typeof Terminal !== 'undefined'", timeout=5000)
page.wait_for_selector("#console-terminal .xterm", timeout=5000)
# Track API calls
api_calls: list[str] = []
def handle_route(route: Route) -> None:
api_calls.append(route.request.url)
route.fulfill(
status=200,
content_type="application/json",
body='{"success": true, "content": "test file content"}',
)
page.route("**/api/console/file*", handle_route)
# Enter a file path and click Open
file_input = page.locator("#console-file-path")
file_input.fill("/tmp/test.yaml")
page.locator("button", has_text="Open").click()
# Wait for API call
page.wait_for_timeout(500)
# Verify API was called with correct parameters
assert len(api_calls) >= 1
assert "/api/console/file" in api_calls[0]
assert "path=" in api_calls[0]
assert "host=" in api_calls[0]
def test_console_load_file_shows_content(self, page: Page, server_url: str) -> None:
"""Loading a file displays its content in the Monaco editor."""
page.goto(f"{server_url}/console")
page.wait_for_selector("#console-file-path", timeout=5000)
# Wait for terminal to connect and Monaco to load
page.wait_for_function("typeof Terminal !== 'undefined'", timeout=5000)
page.wait_for_selector("#console-terminal .xterm", timeout=5000)
page.wait_for_function("typeof monaco !== 'undefined'", timeout=5000)
page.wait_for_selector("#console-editor .monaco-editor", timeout=5000)
# Mock file API to return specific content
test_content = "services:\\n nginx:\\n image: nginx:latest"
def handle_route(route: Route) -> None:
route.fulfill(
status=200,
content_type="application/json",
body=f'{{"success": true, "content": "{test_content}"}}',
)
page.route("**/api/console/file*", handle_route)
# Load file
file_input = page.locator("#console-file-path")
file_input.fill("/tmp/compose.yaml")
page.locator("button", has_text="Open").click()
# Wait for content to be loaded into editor
page.wait_for_function(
"window.consoleEditor && window.consoleEditor.getValue().includes('nginx')",
timeout=5000,
)
def test_console_load_file_updates_status(self, page: Page, server_url: str) -> None:
"""Loading a file updates the editor status to show the file path."""
page.goto(f"{server_url}/console")
page.wait_for_selector("#console-file-path", timeout=5000)
# Wait for terminal and Monaco
page.wait_for_function("typeof Terminal !== 'undefined'", timeout=5000)
page.wait_for_selector("#console-terminal .xterm", timeout=5000)
page.wait_for_function("typeof monaco !== 'undefined'", timeout=5000)
page.wait_for_selector("#console-editor .monaco-editor", timeout=5000)
# Mock file API
page.route(
"**/api/console/file*",
lambda route: route.fulfill(
status=200,
content_type="application/json",
body='{"success": true, "content": "test"}',
),
)
# Load file
file_input = page.locator("#console-file-path")
file_input.fill("/tmp/test.yaml")
page.locator("button", has_text="Open").click()
# Wait for status to show "Loaded:"
page.wait_for_function(
"document.getElementById('editor-status')?.textContent?.includes('Loaded')",
timeout=5000,
)
# Verify status shows the file path
status = page.locator("#editor-status").inner_text()
assert "Loaded" in status
assert "test.yaml" in status
def test_console_save_file_calls_api(self, page: Page, server_url: str) -> None:
"""Clicking Save button calls the file API with PUT method."""
page.goto(f"{server_url}/console")
page.wait_for_selector("#console-file-path", timeout=5000)
# Wait for terminal to connect and Monaco to load
page.wait_for_function("typeof Terminal !== 'undefined'", timeout=5000)
page.wait_for_selector("#console-terminal .xterm", timeout=5000)
page.wait_for_function("typeof monaco !== 'undefined'", timeout=5000)
page.wait_for_selector("#console-editor .monaco-editor", timeout=5000)
# Track API calls
api_calls: list[tuple[str, str]] = [] # (method, url)
def handle_load_route(route: Route) -> None:
api_calls.append((route.request.method, route.request.url))
route.fulfill(
status=200,
content_type="application/json",
body='{"success": true, "content": "original content"}',
)
def handle_save_route(route: Route) -> None:
api_calls.append((route.request.method, route.request.url))
route.fulfill(
status=200,
content_type="application/json",
body='{"success": true}',
)
page.route(
"**/api/console/file*",
lambda route: (
handle_save_route(route)
if route.request.method == "PUT"
else handle_load_route(route)
),
)
# Load a file first (required before save works)
file_input = page.locator("#console-file-path")
file_input.fill("/tmp/test.yaml")
page.locator("button", has_text="Open").click()
page.wait_for_timeout(500)
# Clear api_calls to track only the save
api_calls.clear()
# Click Save button
page.locator("#console-save-btn").click()
page.wait_for_timeout(500)
# Verify PUT request was made
assert len(api_calls) >= 1
method, url = api_calls[0]
assert method == "PUT"
assert "/api/console/file" in url
def test_console_save_file_updates_status(self, page: Page, server_url: str) -> None:
"""Saving a file updates the editor status to show 'Saved'."""
page.goto(f"{server_url}/console")
page.wait_for_selector("#console-file-path", timeout=5000)
# Wait for terminal and Monaco
page.wait_for_function("typeof Terminal !== 'undefined'", timeout=5000)
page.wait_for_selector("#console-terminal .xterm", timeout=5000)
page.wait_for_function("typeof monaco !== 'undefined'", timeout=5000)
page.wait_for_selector("#console-editor .monaco-editor", timeout=5000)
# Mock file API for both load and save
def handle_route(route: Route) -> None:
if route.request.method == "PUT":
route.fulfill(
status=200,
content_type="application/json",
body='{"success": true}',
)
else:
route.fulfill(
status=200,
content_type="application/json",
body='{"success": true, "content": "test"}',
)
page.route("**/api/console/file*", handle_route)
# Load file first
file_input = page.locator("#console-file-path")
file_input.fill("/tmp/test.yaml")
page.locator("button", has_text="Open").click()
page.wait_for_function(
"document.getElementById('editor-status')?.textContent?.includes('Loaded')",
timeout=5000,
)
# Save file
page.locator("#console-save-btn").click()
# Wait for status to show "Saved:"
page.wait_for_function(
"document.getElementById('editor-status')?.textContent?.includes('Saved')",
timeout=5000,
)
# Verify status shows saved
status = page.locator("#editor-status").inner_text()
assert "Saved" in status
class TestTerminalStreaming:
"""Test terminal streaming functionality for action commands."""
def test_terminal_stores_task_in_localstorage(self, page: Page, server_url: str) -> None:
"""Action response stores task ID in localStorage for reconnection."""
page.goto(server_url)
page.wait_for_selector("#sidebar-services", timeout=5000)
# Mock Apply API to return a task ID
page.route(
"**/api/apply",
lambda route: route.fulfill(
status=200,
content_type="application/json",
body='{"task_id": "test-task-123", "service": null, "command": "apply"}',
),
)
# Clear localStorage first
page.evaluate("localStorage.clear()")
# Click Apply
page.locator("button", has_text="Apply").click()
# Wait for response to be processed
page.wait_for_timeout(500)
# Verify task ID was stored in localStorage
stored_task = page.evaluate("localStorage.getItem('cf_task:/')")
assert stored_task == "test-task-123"
def test_terminal_reconnects_from_localstorage(self, page: Page, server_url: str) -> None:
"""Terminal attempts to reconnect to task stored in localStorage.
Tests that when a page loads with an active task in localStorage,
it expands the terminal and attempts to reconnect.
"""
# First, set up a task in localStorage before navigating
page.goto(server_url)
page.wait_for_selector("#sidebar-services", timeout=5000)
# Store a task ID in localStorage
page.evaluate("localStorage.setItem('cf_task:/', 'reconnect-test-123')")
# Navigate away and back (or reload) to trigger reconnect
page.goto(f"{server_url}/console")
page.wait_for_selector("#console-terminal", timeout=5000)
# Navigate back to dashboard
page.goto(server_url)
page.wait_for_selector("#sidebar-services", timeout=5000)
# Wait for xterm to load (reconnect uses whenXtermReady)
page.wait_for_function("typeof Terminal !== 'undefined'", timeout=5000)
# Terminal should be expanded because tryReconnectToTask runs
page.wait_for_function(
"document.getElementById('terminal-toggle')?.checked === true",
timeout=5000,
)
def test_action_triggers_terminal_websocket_connection(
self, page: Page, server_url: str
) -> None:
"""Action response with task_id triggers WebSocket connection to correct path."""
page.goto(server_url)
page.wait_for_selector("#sidebar-services", timeout=5000)
# Track WebSocket connections
ws_urls: list[str] = []
def handle_ws(ws: WebSocket) -> None:
ws_urls.append(ws.url)
page.on("websocket", handle_ws)
# Mock Apply API to return a task ID
page.route(
"**/api/apply",
lambda route: route.fulfill(
status=200,
content_type="application/json",
body='{"task_id": "ws-test-456", "service": null, "command": "apply"}',
),
)
# Wait for xterm to load
page.wait_for_function("typeof Terminal !== 'undefined'", timeout=5000)
# Click Apply
page.locator("button", has_text="Apply").click()
# Wait for WebSocket connection
page.wait_for_timeout(1000)
# Verify WebSocket connected to correct path
assert len(ws_urls) >= 1
assert any("/ws/terminal/ws-test-456" in url for url in ws_urls)
def test_terminal_displays_connected_message(self, page: Page, server_url: str) -> None:
"""Terminal shows [Connected] message after WebSocket opens."""
page.goto(server_url)
page.wait_for_selector("#sidebar-services", timeout=5000)
# Mock Apply API to return a task ID
page.route(
"**/api/apply",
lambda route: route.fulfill(
status=200,
content_type="application/json",
body='{"task_id": "connected-test", "service": null, "command": "apply"}',
),
)
# Wait for xterm to load
page.wait_for_function("typeof Terminal !== 'undefined'", timeout=5000)
# Click Apply to trigger terminal
page.locator("button", has_text="Apply").click()
# Wait for terminal to be created and WebSocket to connect
page.wait_for_selector("#terminal-output .xterm", timeout=5000)
# Check terminal buffer for [Connected] message
# xterm.js stores content in buffer, accessible via the terminal instance
has_connected = page.evaluate("""() => {
const container = document.getElementById('terminal-output');
if (!container) return false;
// Check if xterm viewport contains text (rendered content)
const viewport = container.querySelector('.xterm-rows');
if (!viewport) return false;
return viewport.textContent.includes('Connected');
}""")
assert has_connected, "Terminal should display [Connected] message"

class TestExecTerminal:
    """Test exec terminal functionality for container shells."""

    def test_service_page_has_exec_terminal_container(self, page: Page, server_url: str) -> None:
        """Service page has exec terminal container (initially hidden)."""
        page.goto(server_url)
        page.wait_for_selector("#sidebar-services a", timeout=5000)
        # Navigate to plex service
        page.locator("#sidebar-services a", has_text="plex").click()
        page.wait_for_url("**/service/plex", timeout=5000)
        # Exec terminal container should exist but be hidden
        exec_container = page.locator("#exec-terminal-container")
        assert exec_container.count() == 1
        assert "hidden" in (exec_container.get_attribute("class") or "")
        # The inner terminal div should also exist
        exec_terminal = page.locator("#exec-terminal")
        assert exec_terminal.count() == 1

    def test_exec_terminal_connects_websocket(self, page: Page, server_url: str) -> None:
        """Clicking Shell button triggers WebSocket to exec endpoint."""
        page.goto(server_url)
        page.wait_for_selector("#sidebar-services a", timeout=5000)
        # Navigate to plex service
        page.locator("#sidebar-services a", has_text="plex").click()
        page.wait_for_url("**/service/plex", timeout=5000)
        # Mock containers API to return a container
        page.route(
            "**/api/service/plex/containers*",
            lambda route: route.fulfill(
                status=200,
                content_type="text/html",
                body="""
                <div class="flex items-center gap-2 p-2 bg-base-200 rounded">
                    <span class="status status-success"></span>
                    <code class="text-sm flex-1">plex-container</code>
                    <button class="btn btn-sm btn-outline"
                            onclick="initExecTerminal('plex', 'plex-container', 'server-1')">
                        Shell
                    </button>
                </div>
                """,
            ),
        )
        # Reload to get mocked containers
        page.reload()
        page.wait_for_selector("#sidebar-services", timeout=5000)
        # Track WebSocket connections
        ws_urls: list[str] = []

        def handle_ws(ws: WebSocket) -> None:
            ws_urls.append(ws.url)

        page.on("websocket", handle_ws)
        # Wait for xterm to load
        page.wait_for_function("typeof Terminal !== 'undefined'", timeout=5000)
        # Click Shell button
        page.locator("button", has_text="Shell").click()
        # Wait for WebSocket connection
        page.wait_for_timeout(1000)
        # Verify WebSocket connected to exec endpoint
        assert len(ws_urls) >= 1
        assert any("/ws/exec/plex/plex-container/server-1" in url for url in ws_urls)
        # Exec terminal container should now be visible
        exec_container = page.locator("#exec-terminal-container")
        assert "hidden" not in (exec_container.get_attribute("class") or "")

class TestServicePagePalette:
    """Test command palette behavior on service pages."""

    def test_service_page_palette_has_action_commands(self, page: Page, server_url: str) -> None:
        """Command palette on service page shows service-specific actions."""
        page.goto(server_url)
        page.wait_for_selector("#sidebar-services a", timeout=5000)
        # Navigate to plex service
        page.locator("#sidebar-services a", has_text="plex").click()
        page.wait_for_url("**/service/plex", timeout=5000)
        # Open command palette
        page.keyboard.press("Control+k")
        page.wait_for_selector("#cmd-palette[open]", timeout=2000)
        # Verify service-specific action commands are visible
        cmd_list = page.locator("#cmd-list").inner_text()
        assert "Up" in cmd_list
        assert "Down" in cmd_list
        assert "Restart" in cmd_list
        assert "Pull" in cmd_list
        assert "Update" in cmd_list
        assert "Logs" in cmd_list

    def test_palette_action_triggers_service_api(self, page: Page, server_url: str) -> None:
        """Selecting action from palette triggers correct service API."""
        page.goto(server_url)
        page.wait_for_selector("#sidebar-services a", timeout=5000)
        # Navigate to plex service
        page.locator("#sidebar-services a", has_text="plex").click()
        page.wait_for_url("**/service/plex", timeout=5000)
        # Track API calls
        api_calls: list[str] = []

        def handle_route(route: Route) -> None:
            api_calls.append(route.request.url)
            route.fulfill(
                status=200,
                content_type="application/json",
                body='{"task_id": "palette-test", "service": "plex", "command": "up"}',
            )

        page.route("**/api/service/plex/up", handle_route)
        # Open command palette
        page.keyboard.press("Control+k")
        page.wait_for_selector("#cmd-palette[open]", timeout=2000)
        # Filter to "Up" and execute
        page.locator("#cmd-input").fill("Up")
        page.keyboard.press("Enter")
        # Wait for API call
        page.wait_for_timeout(500)
        # Verify correct API was called
        assert len(api_calls) >= 1
        assert "/api/service/plex/up" in api_calls[0]

    def test_palette_apply_from_service_page(self, page: Page, server_url: str) -> None:
        """Selecting Apply from service page palette navigates to dashboard and triggers API."""
        page.goto(server_url)
        page.wait_for_selector("#sidebar-services a", timeout=5000)
        # Navigate to plex service
        page.locator("#sidebar-services a", has_text="plex").click()
        page.wait_for_url("**/service/plex", timeout=5000)
        # Track API calls
        api_calls: list[str] = []

        def handle_route(route: Route) -> None:
            api_calls.append(route.request.url)
            route.fulfill(
                status=200,
                content_type="application/json",
                body='{"task_id": "apply-test", "service": null, "command": "apply"}',
            )

        page.route("**/api/apply", handle_route)
        # Open command palette
        page.keyboard.press("Control+k")
        page.wait_for_selector("#cmd-palette[open]", timeout=2000)
        # Filter to "Apply" and execute
        page.locator("#cmd-input").fill("Apply")
        page.keyboard.press("Enter")
        # Wait for navigation to dashboard and API call
        page.wait_for_url(server_url, timeout=5000)
        page.wait_for_timeout(500)
        # Verify Apply API was called
        assert len(api_calls) >= 1
        assert "/api/apply" in api_calls[0]

uv.lock generated

@@ -258,6 +258,7 @@ dev = [
     { name = "pytest-asyncio" },
     { name = "pytest-cov" },
     { name = "pytest-playwright" },
+    { name = "pytest-xdist" },
     { name = "ruff" },
     { name = "types-pyyaml" },
     { name = "uvicorn", extra = ["standard"] },
@@ -289,6 +290,7 @@ dev = [
     { name = "pytest-asyncio", specifier = ">=1.3.0" },
     { name = "pytest-cov", specifier = ">=6.0.0" },
     { name = "pytest-playwright", specifier = ">=0.7.0" },
+    { name = "pytest-xdist", specifier = ">=3.0.0" },
     { name = "ruff", specifier = ">=0.14.8" },
     { name = "types-pyyaml", specifier = ">=6.0.12.20250915" },
     { name = "uvicorn", extras = ["standard"], specifier = ">=0.27.0" },
@@ -480,6 +482,15 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/de/15/545e2b6cf2e3be84bc1ed85613edd75b8aea69807a71c26f4ca6a9258e82/email_validator-2.3.0-py3-none-any.whl", hash = "sha256:80f13f623413e6b197ae73bb10bf4eb0908faf509ad8362c5edeb0be7fd450b4", size = 35604, upload-time = "2025-08-26T13:09:05.858Z" },
 ]

+[[package]]
+name = "execnet"
+version = "2.1.2"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/bf/89/780e11f9588d9e7128a3f87788354c7946a9cbb1401ad38a48c4db9a4f07/execnet-2.1.2.tar.gz", hash = "sha256:63d83bfdd9a23e35b9c6a3261412324f964c2ec8dcd8d3c6916ee9373e0befcd", size = 166622, upload-time = "2025-11-12T09:56:37.75Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/ab/84/02fc1827e8cdded4aa65baef11296a9bbe595c474f0d6d758af082d849fd/execnet-2.1.2-py3-none-any.whl", hash = "sha256:67fba928dd5a544b783f6056f449e5e3931a5c378b128bc18501f7ea79e296ec", size = 40708, upload-time = "2025-11-12T09:56:36.333Z" },
+]
+
 [[package]]
 name = "fastapi"
 version = "0.125.0"
@@ -1320,6 +1331,19 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/76/61/4d333d8354ea2bea2c2f01bad0a4aa3c1262de20e1241f78e73360e9b620/pytest_playwright-0.7.2-py3-none-any.whl", hash = "sha256:8084e015b2b3ecff483c2160f1c8219b38b66c0d4578b23c0f700d1b0240ea38", size = 16881, upload-time = "2025-11-24T03:43:24.423Z" },
 ]

+[[package]]
+name = "pytest-xdist"
+version = "3.8.0"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+    { name = "execnet" },
+    { name = "pytest" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/78/b4/439b179d1ff526791eb921115fca8e44e596a13efeda518b9d845a619450/pytest_xdist-3.8.0.tar.gz", hash = "sha256:7e578125ec9bc6050861aa93f2d59f1d8d085595d6551c2c90b6f4fad8d3a9f1", size = 88069, upload-time = "2025-07-01T13:30:59.346Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/ca/31/d4e37e9e550c2b92a9cbc2e4d0b7420a27224968580b5a447f420847c975/pytest_xdist-3.8.0-py3-none-any.whl", hash = "sha256:202ca578cfeb7370784a8c33d6d05bc6e13b4f25b5053c30a152269fd10f0b88", size = 46396, upload-time = "2025-07-01T13:30:56.632Z" },
+]
+
 [[package]]
 name = "python-dotenv"
 version = "1.2.1"