From f69b93c9facca71cf2790dc716df59f30d4e0dd8 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Tue, 12 Nov 2024 20:35:51 -0800 Subject: [PATCH] server: fix consoles in clustered environment --- plugins/core/.vscode/settings.json | 2 +- plugins/core/src/plugin-socket-service.ts | 8 ++++++-- server/python/plugin_remote.py | 6 +++--- server/src/plugin/plugin-api.ts | 2 +- server/src/plugin/plugin-console.ts | 25 ++++++++++++++++------- server/src/plugin/plugin-lazy-remote.ts | 2 +- server/src/plugin/plugin-remote-worker.ts | 6 +++--- server/src/plugin/plugin-remote.ts | 2 +- server/src/plugin/plugin-repl.ts | 3 ++- server/src/runtime.ts | 9 +++++++- server/src/scrypted-cluster-common.ts | 9 ++++---- server/src/services/plugin.ts | 6 +++--- 12 files changed, 51 insertions(+), 29 deletions(-) diff --git a/plugins/core/.vscode/settings.json b/plugins/core/.vscode/settings.json index 7cad3e305..79c896063 100644 --- a/plugins/core/.vscode/settings.json +++ b/plugins/core/.vscode/settings.json @@ -1,3 +1,3 @@ { - "scrypted.debugHost": "scrypted-nvr", + "scrypted.debugHost": "127.0.0.1", } \ No newline at end of file diff --git a/plugins/core/src/plugin-socket-service.ts b/plugins/core/src/plugin-socket-service.ts index e9a6517f4..fe3072e28 100644 --- a/plugins/core/src/plugin-socket-service.ts +++ b/plugins/core/src/plugin-socket-service.ts @@ -17,9 +17,13 @@ export class PluginSocketService extends ScryptedDeviceBase implements StreamSer throw new Error('must provide pluginId'); const plugins = await sdk.systemManager.getComponent('plugins'); - const replPort: number = await plugins.getRemoteServicePort(pluginId, this.serviceName); + const servicePort = await plugins.getRemoteServicePort(pluginId, this.serviceName) as number | [number, string]; + const [port, host] = Array.isArray(servicePort) ? servicePort : [servicePort, undefined]; - const socket = net.connect(replPort); + const socket = net.connect({ + port, + host, + }); await once(socket, 'connect'); const queue = createAsyncQueue(); diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py index fc52ff2dc..7354edb06 100644 --- a/server/python/plugin_remote.py +++ b/server/python/plugin_remote.py @@ -593,8 +593,8 @@ class PluginRemote: consoleFuture = Future() self.consoles[nativeId] = consoleFuture plugins = await self.api.getComponent("plugins") - port = await plugins.getRemoteServicePort(self.pluginId, "console-writer") - connection = await asyncio.open_connection(port=port) + port, hostname = await plugins.getRemoteServicePort(self.pluginId, "console-writer") + connection = await asyncio.open_connection(host=hostname, port=port) _, writer = connection if not nativeId: nid = "undefined" @@ -1119,7 +1119,7 @@ class PluginRemote: raise Exception("REPL unavailable: Plugin not loaded.") if self.replPort == 0: raise Exception("REPL unavailable: Python REPL not available.") - return self.replPort + return [self.replPort, os.getenv("SCRYPTED_CLUSTER_ADDRESS", None)] raise Exception(f"unknown service {name}") async def start_stats_runner(self): diff --git a/server/src/plugin/plugin-api.ts b/server/src/plugin/plugin-api.ts index 8db4e1907..998eb2fd1 100644 --- a/server/src/plugin/plugin-api.ts +++ b/server/src/plugin/plugin-api.ts @@ -190,7 +190,7 @@ export interface PluginRemote { createDeviceState(id: string, setState: (property: string, value: any) => Promise): Promise; - getServicePort(name: string, ...args: any[]): Promise; + getServicePort(name: string, ...args: any[]): Promise<[number, string]>; } export interface MediaObjectRemote extends MediaObject { diff --git a/server/src/plugin/plugin-console.ts b/server/src/plugin/plugin-console.ts index 36f17d73e..0adfa96a1 100644 --- a/server/src/plugin/plugin-console.ts +++ b/server/src/plugin/plugin-console.ts @@ -4,6 +4,7 @@ import { once } from 'events'; import net, { Server } from 'net'; import { PassThrough, Readable } from 'stream'; import { listenZero } from '../listen-zero'; +import { isClusterAddress } from '../scrypted-cluster-common'; export interface ConsoleServer { pluginConsole: Console; @@ -76,8 +77,11 @@ export function prepareConsoles(getConsoleName: () => string, systemManager: () ret = getConsole(async (stdout, stderr) => { const connect = async () => { const plugins = await getPlugins(); - const port = await plugins.getRemoteServicePort(getConsoleName(), 'console-writer'); - const socket = net.connect(port); + const [port,host] = await plugins.getRemoteServicePort(getConsoleName(), 'console-writer'); + const socket = net.connect({ + port, + host, + }); socket.write(nativeId + '\n'); const writer = (data: Buffer) => { socket.write(data); @@ -136,9 +140,12 @@ export function prepareConsoles(getConsoleName: () => string, systemManager: () if (!mixin) return; const { pluginId, nativeId: mixinNativeId } = mixin; - const port = await plugins.getRemoteServicePort(pluginId, 'console-writer'); - const socket = net.connect(port, process.env.SCRYPTED_CLUSTER_SERVER); - socket.on('error', ()=>{} ); + const [port, host] = await plugins.getRemoteServicePort(pluginId, 'console-writer'); + const socket = net.connect({ + port, + host, + }); + socket.on('error', () => { }); socket.write(mixinNativeId + '\n'); const writer = (data: Buffer) => { let str = data.toString().trim(); @@ -296,8 +303,12 @@ export async function createConsoleServer(remoteStdout: Readable, remoteStderr: socket.once('error', cleanup); socket.once('end', cleanup); }); - const readPort = await listenZero(readServer, '127.0.0.1'); - const writePort = await listenZero(writeServer, '127.0.0.1'); + + let address = '0.0.0.0'; + if (isClusterAddress(address)) + address = '127.0.0.1'; + const readPort = await listenZero(readServer, address); + const writePort = await listenZero(writeServer, address); return { clear(nativeId: ScryptedNativeId) { diff --git a/server/src/plugin/plugin-lazy-remote.ts b/server/src/plugin/plugin-lazy-remote.ts index 47869d679..473e23587 100644 --- a/server/src/plugin/plugin-lazy-remote.ts +++ b/server/src/plugin/plugin-lazy-remote.ts @@ -65,7 +65,7 @@ import { PluginRemote, PluginRemoteLoadZipOptions, PluginZipAPI } from './plugin return this.remote.createDeviceState(id, setState); } - async getServicePort(name: string, ...args: any[]): Promise { + async getServicePort(name: string, ...args: any[]): Promise<[number, string]> { const remote = await this.remotePromise; return remote.getServicePort(name, ...args); } diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index fa4cd0dbe..25908ef72 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -13,7 +13,7 @@ import { RpcMessage, RpcPeer } from '../rpc'; import { evalLocal } from '../rpc-peer-eval'; import { createDuplexRpcPeer } from '../rpc-serializer'; import { getClusterLabels, matchesClusterLabels, PeerLiveness } from '../scrypted-cluster'; -import { getClusterPeerKey, prepareClusterPeer } from '../scrypted-cluster-common'; +import { getClusterPeerKey, isClusterAddress, prepareClusterPeer } from '../scrypted-cluster-common'; import type { ClusterFork } from '../services/cluster-fork'; import { MediaManagerImpl } from './media'; import { PluginAPI, PluginAPIProxy, PluginRemote, PluginRemoteLoadZipOptions, PluginZipAPI } from './plugin-api'; @@ -41,7 +41,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe const peer = new RpcPeer('unknown', 'host', peerSend); const clusterPeerSetup = prepareClusterPeer(peer, startPluginRemoteOptions?.onClusterPeer); - const { initializeCluster, isClusterAddress, clusterPeers, onProxySerialization, SCRYPTED_CLUSTER_ADDRESS, connectRPCObject } = clusterPeerSetup; + const { initializeCluster, clusterPeers, onProxySerialization, SCRYPTED_CLUSTER_ADDRESS, connectRPCObject } = clusterPeerSetup; peer.params.initializeCluster = initializeCluster; @@ -103,7 +103,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe if (name === 'repl') { if (!replPort) throw new Error('REPL unavailable: Plugin not loaded.') - return replPort; + return [await replPort, process.env.SCRYPTED_CLUSTER_ADDRESS]; } throw new Error(`unknown service ${name}`); }, diff --git a/server/src/plugin/plugin-remote.ts b/server/src/plugin/plugin-remote.ts index 188abffcf..bd81f1f32 100644 --- a/server/src/plugin/plugin-remote.ts +++ b/server/src/plugin/plugin-remote.ts @@ -456,7 +456,7 @@ export interface WebSocketCustomHandler { export interface PluginRemoteAttachOptions { createMediaManager?: (systemManager: SystemManager, deviceManager: DeviceManagerImpl) => Promise; - getServicePort?: (name: string, ...args: any[]) => Promise; + getServicePort?: (name: string, ...args: any[]) => Promise<[number, string]>; getDeviceConsole?: (nativeId?: ScryptedNativeId) => Console; getPluginConsole?: () => Console; getMixinConsole?: (id: string, nativeId?: ScryptedNativeId) => Console; diff --git a/server/src/plugin/plugin-repl.ts b/server/src/plugin/plugin-repl.ts index e960b0307..a8335039a 100644 --- a/server/src/plugin/plugin-repl.ts +++ b/server/src/plugin/plugin-repl.ts @@ -75,5 +75,6 @@ export async function createREPLServer(scrypted: ScryptedStatic, params: any, pl socket.on('error', cleanup); socket.on('end', cleanup); }); - return listenZero(server, '127.0.0.1'); + const address = process.env.SCRYPTED_CLUSTER_ADDRESS ? '0.0.0.0' : '127.0.0.1'; + return listenZero(server, address); } diff --git a/server/src/runtime.ts b/server/src/runtime.ts index c2e4dec74..35af2a897 100644 --- a/server/src/runtime.ts +++ b/server/src/runtime.ts @@ -45,6 +45,7 @@ import { getNpmPackageInfo, PluginComponent } from './services/plugin'; import { ServiceControl } from './services/service-control'; import { UsersService } from './services/users'; import { getState, ScryptedStateManager, setState } from './state'; +import { isClusterAddress } from './scrypted-cluster-common'; interface DeviceProxyPair { handler: PluginDeviceProxyHandler; @@ -123,7 +124,13 @@ export class ScryptedRuntime extends PluginHttp { return; } - const socket = net.connect(clusterObject.port, '127.0.0.1'); + let address = clusterObject.address; + if (isClusterAddress(address)) + address = '127.0.0.1'; + const socket = net.connect({ + port: clusterObject.port, + host: address, + }); socket.on('error', () => connection.close()); socket.on('close', () => connection.close()); socket.on('data', data => connection.send(data)); diff --git a/server/src/scrypted-cluster-common.ts b/server/src/scrypted-cluster-common.ts index 524467a52..df2bb3474 100644 --- a/server/src/scrypted-cluster-common.ts +++ b/server/src/scrypted-cluster-common.ts @@ -11,6 +11,10 @@ export function getClusterPeerKey(address: string, port: number) { return `${address}:${port}`; } +export function isClusterAddress(address: string) { + return !address || address === process.env.SCRYPTED_CLUSTER_ADDRESS; +} + export function prepareClusterPeer(peer: RpcPeer, onClusterPeer?: (clusterPeer: RpcPeer) => void) { const SCRYPTED_CLUSTER_ADDRESS = process.env.SCRYPTED_CLUSTER_ADDRESS; let clusterId: string; @@ -44,10 +48,6 @@ export function prepareClusterPeer(peer: RpcPeer, onClusterPeer?: (clusterPeer: return resolveObject(o.proxyId, o.sourceKey); } - function isClusterAddress(address: string) { - return !address || address === SCRYPTED_CLUSTER_ADDRESS; - } - const onProxySerialization = (peer: RpcPeer, value: any, sourceKey: string) => { const properties = RpcPeer.prepareProxyProperties(value) || {}; let clusterEntry: ClusterObject = properties.__cluster; @@ -124,7 +124,6 @@ export function prepareClusterPeer(peer: RpcPeer, onClusterPeer?: (clusterPeer: return clusterPort; }, SCRYPTED_CLUSTER_ADDRESS, - isClusterAddress, clusterPeers, onProxySerialization, connectRPCObject, diff --git a/server/src/services/plugin.ts b/server/src/services/plugin.ts index 2940252c5..6e1671157 100644 --- a/server/src/services/plugin.ts +++ b/server/src/services/plugin.ts @@ -174,14 +174,14 @@ export class PluginComponent { consoleServer.clear(pluginDevice.nativeId); } - async getRemoteServicePort(pluginId: string, name: string, ...args: any[]): Promise { + async getRemoteServicePort(pluginId: string, name: string, ...args: any[]): Promise<[number, string]> { if (name === 'console') { const consoleServer = await this.scrypted.plugins[pluginId].consoleServer; - return consoleServer.readPort; + return [consoleServer.readPort, process.env.SCRYPTED_CLUSTER_ADDRESS]; } if (name === 'console-writer') { const consoleServer = await this.scrypted.plugins[pluginId].consoleServer; - return consoleServer.writePort; + return [consoleServer.writePort, process.env.SCRYPTED_CLUSTER_ADDRESS]; } return this.scrypted.plugins[pluginId].remote.getServicePort(name, ...args);