diff --git a/server/python/cluster_setup.py b/server/python/cluster_setup.py index 66323e192..b516b295f 100644 --- a/server/python/cluster_setup.py +++ b/server/python/cluster_setup.py @@ -10,7 +10,7 @@ from typing import Any import rpc import rpc_reader -from typing import TypedDict +from typing import TypedDict, Callable class ClusterObject(TypedDict): @@ -21,6 +21,7 @@ class ClusterObject(TypedDict): sourceKey: str sha256: str + def isClusterAddress(address: str): return not address or address == os.environ.get("SCRYPTED_CLUSTER_ADDRESS", None) @@ -130,11 +131,8 @@ class ClusterSetup: peer.kill("cluster client killed") writer.close() - listenAddress = "0.0.0.0" if self.SCRYPTED_CLUSTER_ADDRESS else "127.0.0.1" - clusterRpcServer = await asyncio.start_server( - handleClusterClient, listenAddress, 0 - ) - self.clusterPort = clusterRpcServer.sockets[0].getsockname()[1] + clusterRpcServerInfo = await cluster_listen_zero(handleClusterClient) + self.clusterPort = clusterRpcServerInfo["port"] self.peer.onProxySerialization = lambda value: self.onProxySerialization( self.peer, value, None ) @@ -238,3 +236,49 @@ class ClusterSetup: return newValue except Exception as e: return value + + +class ClusterServerListener(TypedDict): + server: asyncio.Server + port: int + + +async def cluster_listen_zero( + callback: Callable[[asyncio.StreamReader, asyncio.StreamWriter]] +) -> ClusterServerListener: + SCRYPTED_CLUSTER_ADDRESS = os.getenv("SCRYPTED_CLUSTER_ADDRESS") + if not SCRYPTED_CLUSTER_ADDRESS: + server = await asyncio.start_server(callback, host=None, port=0) + port = server.sockets[0].getsockname()[1] + return { + "server": server, + "port": port, + } + + # need to listen on the cluster address and 127.0.0.1 on the same port. + retries = 5 + while retries > 0: + cluster_server = await asyncio.start_server( + callback, host=SCRYPTED_CLUSTER_ADDRESS, port=0 + ) + port = cluster_server.sockets[0].getsockname()[1] + + try: + print('trying to bind to port', port) + local_server = await asyncio.start_server( + callback, host="127.0.0.1", port=port + ) + + future = asyncio.ensure_future(local_server.wait_closed()) + future.add_done_callback(lambda: local_server.close()) + + return { + "server": cluster_server, + "port": port, + } + except: + # Port may be in use, keep trying. + cluster_server.close() + retries -= 1 + + raise Exception("failed to bind to cluster address.") diff --git a/server/python/plugin_repl.py b/server/python/plugin_repl.py index 365ae6759..492e05214 100644 --- a/server/python/plugin_repl.py +++ b/server/python/plugin_repl.py @@ -198,6 +198,7 @@ async def createREPLServer(sdk: ScryptedStatic, plugin: ScryptedDevice) -> int: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.settimeout(None) + # TODO: this should use an equivalent to cluster_listen_zero sock.bind(("0.0.0.0" if os.getenv("SCRYPTED_CLUSTER_ADDRESS") else "127.0.0.1", 0)) sock.listen() diff --git a/server/src/cluster/cluster-setup.ts b/server/src/cluster/cluster-setup.ts index 4a9517273..64d767635 100644 --- a/server/src/cluster/cluster-setup.ts +++ b/server/src/cluster/cluster-setup.ts @@ -344,7 +344,8 @@ export function setupCluster(peer: RpcPeer) { const clients = new Set(); - const clusterRpcServer = net.createServer(client => { + + const { server: clusterRpcServer, port } = await clusterListenZero(client => { const clusterPeerAddress = client.remoteAddress; const clusterPeerPort = client.remotePort; const clusterPeerKey = getClusterPeerKey(clusterPeerAddress, clusterPeerPort); @@ -364,11 +365,8 @@ export function setupCluster(peer: RpcPeer) { clients.add(client); }); - const listenAddress = SCRYPTED_CLUSTER_ADDRESS - ? '0.0.0.0' - : '127.0.0.1'; + clusterPort = port; - clusterPort = await listenZero(clusterRpcServer, listenAddress); peer.onProxySerialization = value => onProxySerialization(peer, value, undefined); delete peer.params.initializeCluster; @@ -453,3 +451,38 @@ export function getScryptedClusterMode(): ['server' | 'client', string, number] return [mode, server, port]; } + +export async function clusterListenZero(callback: (socket: net.Socket) => void) { + const SCRYPTED_CLUSTER_ADDRESS = process.env.SCRYPTED_CLUSTER_ADDRESS; + if (!SCRYPTED_CLUSTER_ADDRESS) { + const server = new net.Server(callback); + const port = await listenZero(server, '127.0.0.1'); + return { + server, + port, + } + } + + // need to listen on the cluster address and 127.0.0.1 on the same port. + let retries = 5; + while (retries--) { + const server = new net.Server(callback); + const port = await listenZero(server, SCRYPTED_CLUSTER_ADDRESS); + try { + const localServer = new net.Server(callback); + localServer.listen(port, '127.0.0.1'); + await once(localServer, 'listening'); + server.on('close', () => localServer.close()); + return { + server, + port, + } + } + catch (e) { + // port may be in use, keep trying. + server.close(); + } + } + + throw new Error('failed to bind to cluster address.'); +} diff --git a/server/src/plugin/plugin-console.ts b/server/src/plugin/plugin-console.ts index 2b4bc831c..3200695bd 100644 --- a/server/src/plugin/plugin-console.ts +++ b/server/src/plugin/plugin-console.ts @@ -1,10 +1,9 @@ import { DeviceManager, ScryptedNativeId, SystemManager } from '@scrypted/types'; import { Console } from 'console'; import { once } from 'events'; -import net, { Server } from 'net'; +import net from 'net'; import { PassThrough, Readable, Writable } from 'stream'; -import { listenZero } from '../listen-zero'; -import { isClusterAddress } from '../cluster/cluster-setup'; +import { clusterListenZero } from '../cluster/cluster-setup'; export interface ConsoleServer { pluginConsole: Console; @@ -77,7 +76,7 @@ export function prepareConsoles(getConsoleName: () => string, systemManager: () ret = getConsole(async (stdout, stderr) => { const connect = async () => { const plugins = await getPlugins(); - const [port,host] = await plugins.getRemoteServicePort(getConsoleName(), 'console-writer'); + const [port, host] = await plugins.getRemoteServicePort(getConsoleName(), 'console-writer'); const socket = net.connect({ port, host, @@ -243,7 +242,7 @@ export async function createConsoleServer(remoteStdout: Readable, remoteStderr: const sockets = new Set(); - const readServer = new Server(async (socket) => { + const { server: readServer, port: readPort } = await clusterListenZero(async (socket) => { sockets.add(socket); let [filter] = await once(socket, 'data'); @@ -277,7 +276,7 @@ export async function createConsoleServer(remoteStdout: Readable, remoteStderr: socket.on('end', cleanup); }); - const writeServer = new Server(async (socket) => { + const { server: writeServer, port: writePort } = await clusterListenZero(async (socket) => { sockets.add(socket); const [data] = await once(socket, 'data'); let filter: string = data.toString(); @@ -304,12 +303,6 @@ export async function createConsoleServer(remoteStdout: Readable, remoteStderr: socket.once('end', cleanup); }); - let address = '0.0.0.0'; - if (isClusterAddress(address)) - address = '127.0.0.1'; - const readPort = await listenZero(readServer, address); - const writePort = await listenZero(writeServer, address); - return { clear(nativeId: ScryptedNativeId) { const pt = outputs.get(nativeId); diff --git a/server/src/plugin/plugin-repl.ts b/server/src/plugin/plugin-repl.ts index a8335039a..f82197428 100644 --- a/server/src/plugin/plugin-repl.ts +++ b/server/src/plugin/plugin-repl.ts @@ -1,12 +1,11 @@ -import { listenZero } from '../listen-zero'; -import { Server } from 'net'; +import { ScryptedStatic } from '@scrypted/types'; import { once } from 'events'; import repl from 'repl'; -import { ScryptedStatic } from '@scrypted/types'; +import { clusterListenZero } from '../cluster/cluster-setup'; export async function createREPLServer(scrypted: ScryptedStatic, params: any, plugin: any): Promise { const { deviceManager, systemManager } = scrypted; - const server = new Server(async (socket) => { + const { server, port } = await clusterListenZero(async (socket) => { let [filter] = await once(socket, 'data'); filter = filter.toString().trim(); if (filter === 'undefined') @@ -75,6 +74,6 @@ export async function createREPLServer(scrypted: ScryptedStatic, params: any, pl socket.on('error', cleanup); socket.on('end', cleanup); }); - const address = process.env.SCRYPTED_CLUSTER_ADDRESS ? '0.0.0.0' : '127.0.0.1'; - return listenZero(server, address); + + return port; } diff --git a/server/src/plugin/runtime/child-process-worker.ts b/server/src/plugin/runtime/child-process-worker.ts index 90f3a95fe..1ddcac1c6 100644 --- a/server/src/plugin/runtime/child-process-worker.ts +++ b/server/src/plugin/runtime/child-process-worker.ts @@ -42,12 +42,12 @@ export abstract class ChildProcessWorker extends EventEmitter implements Runtime } kill(): void { - if (!this.worker) - return; const { worker } = this; + if (!worker) + return; + this.worker = undefined; worker.kill(); setTimeout(() => worker.kill('SIGKILL'), 1000); - this.worker = undefined; } abstract send(message: RpcMessage, reject?: (e: Error) => void): void;