server: limit address binding in cluster mode

This commit is contained in:
Koushik Dutta
2025-03-02 14:57:56 -08:00
parent 16a9abeb9e
commit fe1b677381
6 changed files with 102 additions and 32 deletions

View File

@@ -10,7 +10,7 @@ from typing import Any
import rpc
import rpc_reader
from typing import TypedDict
from typing import TypedDict, Callable
class ClusterObject(TypedDict):
@@ -21,6 +21,7 @@ class ClusterObject(TypedDict):
sourceKey: str
sha256: str
def isClusterAddress(address: str):
return not address or address == os.environ.get("SCRYPTED_CLUSTER_ADDRESS", None)
@@ -130,11 +131,8 @@ class ClusterSetup:
peer.kill("cluster client killed")
writer.close()
listenAddress = "0.0.0.0" if self.SCRYPTED_CLUSTER_ADDRESS else "127.0.0.1"
clusterRpcServer = await asyncio.start_server(
handleClusterClient, listenAddress, 0
)
self.clusterPort = clusterRpcServer.sockets[0].getsockname()[1]
clusterRpcServerInfo = await cluster_listen_zero(handleClusterClient)
self.clusterPort = clusterRpcServerInfo["port"]
self.peer.onProxySerialization = lambda value: self.onProxySerialization(
self.peer, value, None
)
@@ -238,3 +236,49 @@ class ClusterSetup:
return newValue
except Exception as e:
return value
class ClusterServerListener(TypedDict):
server: asyncio.Server
port: int
async def cluster_listen_zero(
callback: Callable[[asyncio.StreamReader, asyncio.StreamWriter]]
) -> ClusterServerListener:
SCRYPTED_CLUSTER_ADDRESS = os.getenv("SCRYPTED_CLUSTER_ADDRESS")
if not SCRYPTED_CLUSTER_ADDRESS:
server = await asyncio.start_server(callback, host=None, port=0)
port = server.sockets[0].getsockname()[1]
return {
"server": server,
"port": port,
}
# need to listen on the cluster address and 127.0.0.1 on the same port.
retries = 5
while retries > 0:
cluster_server = await asyncio.start_server(
callback, host=SCRYPTED_CLUSTER_ADDRESS, port=0
)
port = cluster_server.sockets[0].getsockname()[1]
try:
print('trying to bind to port', port)
local_server = await asyncio.start_server(
callback, host="127.0.0.1", port=port
)
future = asyncio.ensure_future(local_server.wait_closed())
future.add_done_callback(lambda: local_server.close())
return {
"server": cluster_server,
"port": port,
}
except:
# Port may be in use, keep trying.
cluster_server.close()
retries -= 1
raise Exception("failed to bind to cluster address.")

View File

@@ -198,6 +198,7 @@ async def createREPLServer(sdk: ScryptedStatic, plugin: ScryptedDevice) -> int:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(None)
# TODO: this should use an equivalent to cluster_listen_zero
sock.bind(("0.0.0.0" if os.getenv("SCRYPTED_CLUSTER_ADDRESS") else "127.0.0.1", 0))
sock.listen()

View File

@@ -344,7 +344,8 @@ export function setupCluster(peer: RpcPeer) {
const clients = new Set<net.Socket>();
const clusterRpcServer = net.createServer(client => {
const { server: clusterRpcServer, port } = await clusterListenZero(client => {
const clusterPeerAddress = client.remoteAddress;
const clusterPeerPort = client.remotePort;
const clusterPeerKey = getClusterPeerKey(clusterPeerAddress, clusterPeerPort);
@@ -364,11 +365,8 @@ export function setupCluster(peer: RpcPeer) {
clients.add(client);
});
const listenAddress = SCRYPTED_CLUSTER_ADDRESS
? '0.0.0.0'
: '127.0.0.1';
clusterPort = port;
clusterPort = await listenZero(clusterRpcServer, listenAddress);
peer.onProxySerialization = value => onProxySerialization(peer, value, undefined);
delete peer.params.initializeCluster;
@@ -453,3 +451,38 @@ export function getScryptedClusterMode(): ['server' | 'client', string, number]
return [mode, server, port];
}
export async function clusterListenZero(callback: (socket: net.Socket) => void) {
const SCRYPTED_CLUSTER_ADDRESS = process.env.SCRYPTED_CLUSTER_ADDRESS;
if (!SCRYPTED_CLUSTER_ADDRESS) {
const server = new net.Server(callback);
const port = await listenZero(server, '127.0.0.1');
return {
server,
port,
}
}
// need to listen on the cluster address and 127.0.0.1 on the same port.
let retries = 5;
while (retries--) {
const server = new net.Server(callback);
const port = await listenZero(server, SCRYPTED_CLUSTER_ADDRESS);
try {
const localServer = new net.Server(callback);
localServer.listen(port, '127.0.0.1');
await once(localServer, 'listening');
server.on('close', () => localServer.close());
return {
server,
port,
}
}
catch (e) {
// port may be in use, keep trying.
server.close();
}
}
throw new Error('failed to bind to cluster address.');
}

View File

@@ -1,10 +1,9 @@
import { DeviceManager, ScryptedNativeId, SystemManager } from '@scrypted/types';
import { Console } from 'console';
import { once } from 'events';
import net, { Server } from 'net';
import net from 'net';
import { PassThrough, Readable, Writable } from 'stream';
import { listenZero } from '../listen-zero';
import { isClusterAddress } from '../cluster/cluster-setup';
import { clusterListenZero } from '../cluster/cluster-setup';
export interface ConsoleServer {
pluginConsole: Console;
@@ -77,7 +76,7 @@ export function prepareConsoles(getConsoleName: () => string, systemManager: ()
ret = getConsole(async (stdout, stderr) => {
const connect = async () => {
const plugins = await getPlugins();
const [port,host] = await plugins.getRemoteServicePort(getConsoleName(), 'console-writer');
const [port, host] = await plugins.getRemoteServicePort(getConsoleName(), 'console-writer');
const socket = net.connect({
port,
host,
@@ -243,7 +242,7 @@ export async function createConsoleServer(remoteStdout: Readable, remoteStderr:
const sockets = new Set<net.Socket>();
const readServer = new Server(async (socket) => {
const { server: readServer, port: readPort } = await clusterListenZero(async (socket) => {
sockets.add(socket);
let [filter] = await once(socket, 'data');
@@ -277,7 +276,7 @@ export async function createConsoleServer(remoteStdout: Readable, remoteStderr:
socket.on('end', cleanup);
});
const writeServer = new Server(async (socket) => {
const { server: writeServer, port: writePort } = await clusterListenZero(async (socket) => {
sockets.add(socket);
const [data] = await once(socket, 'data');
let filter: string = data.toString();
@@ -304,12 +303,6 @@ export async function createConsoleServer(remoteStdout: Readable, remoteStderr:
socket.once('end', cleanup);
});
let address = '0.0.0.0';
if (isClusterAddress(address))
address = '127.0.0.1';
const readPort = await listenZero(readServer, address);
const writePort = await listenZero(writeServer, address);
return {
clear(nativeId: ScryptedNativeId) {
const pt = outputs.get(nativeId);

View File

@@ -1,12 +1,11 @@
import { listenZero } from '../listen-zero';
import { Server } from 'net';
import { ScryptedStatic } from '@scrypted/types';
import { once } from 'events';
import repl from 'repl';
import { ScryptedStatic } from '@scrypted/types';
import { clusterListenZero } from '../cluster/cluster-setup';
export async function createREPLServer(scrypted: ScryptedStatic, params: any, plugin: any): Promise<number> {
const { deviceManager, systemManager } = scrypted;
const server = new Server(async (socket) => {
const { server, port } = await clusterListenZero(async (socket) => {
let [filter] = await once(socket, 'data');
filter = filter.toString().trim();
if (filter === 'undefined')
@@ -75,6 +74,6 @@ export async function createREPLServer(scrypted: ScryptedStatic, params: any, pl
socket.on('error', cleanup);
socket.on('end', cleanup);
});
const address = process.env.SCRYPTED_CLUSTER_ADDRESS ? '0.0.0.0' : '127.0.0.1';
return listenZero(server, address);
return port;
}

View File

@@ -42,12 +42,12 @@ export abstract class ChildProcessWorker extends EventEmitter implements Runtime
}
kill(): void {
if (!this.worker)
return;
const { worker } = this;
if (!worker)
return;
this.worker = undefined;
worker.kill();
setTimeout(() => worker.kill('SIGKILL'), 1000);
this.worker = undefined;
}
abstract send(message: RpcMessage, reject?: (e: Error) => void): void;