From 0bf0ec08ab7846b7ed37abe5f5e89ab5080111f8 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Fri, 15 Nov 2024 23:40:38 -0800 Subject: [PATCH] server: plugin init cleanups --- server/.vscode/launch.json | 2 +- server/package-lock.json | 4 +- server/python/plugin_remote.py | 4 +- server/src/cluster/cluster-setup.ts | 2 +- server/src/plugin/plugin-device.ts | 2 +- server/src/plugin/plugin-host.ts | 222 +++++++++++----------- server/src/plugin/plugin-lazy-remote.ts | 1 + server/src/plugin/plugin-remote-worker.ts | 6 +- server/src/rpc.ts | 2 + server/src/scrypted-cluster-main.ts | 16 +- 10 files changed, 135 insertions(+), 126 deletions(-) diff --git a/server/.vscode/launch.json b/server/.vscode/launch.json index fbf7c5f97..8ff32a128 100644 --- a/server/.vscode/launch.json +++ b/server/.vscode/launch.json @@ -96,7 +96,7 @@ "env": { "SCRYPTED_CLUSTER_LABELS": "compute", "SCRYPTED_CLUSTER_MODE": "client", - "SCRYPTED_CLUSTER_SERVER": "scrypted-nvr", + "SCRYPTED_CLUSTER_SERVER": "192.168.2.124", "SCRYPTED_CLUSTER_SECRET": "swordfish", "SCRYPTED_CAN_RESTART": "true", "SCRYPTED_VOLUME": "/Users/koush/.scrypted-cluster/volume-client", diff --git a/server/package-lock.json b/server/package-lock.json index 241591bc3..698128ef8 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/server", - "version": "0.123.23", + "version": "0.123.24", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@scrypted/server", - "version": "0.123.23", + "version": "0.123.24", "hasInstallScript": true, "license": "ISC", "dependencies": { diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py index fe8e9e812..6e70316d1 100644 --- a/server/python/plugin_remote.py +++ b/server/python/plugin_remote.py @@ -920,8 +920,8 @@ async def plugin_async_main( clusterSetup = ClusterSetup(loop, peer) peer.params["initializeCluster"] = lambda options: clusterSetup.initializeCluster(options) - async def ping(time: int, pong: Any): - await pong(time) + async def ping(time: int): + return time peer.params["ping"] = ping diff --git a/server/src/cluster/cluster-setup.ts b/server/src/cluster/cluster-setup.ts index 3f5739e29..df1aead0e 100644 --- a/server/src/cluster/cluster-setup.ts +++ b/server/src/cluster/cluster-setup.ts @@ -345,7 +345,7 @@ export function setupCluster(peer: RpcPeer) { peer.onProxySerialization = value => onProxySerialization(peer, value, undefined); delete peer.params.initializeCluster; - peer.killed.catch(() => { }).finally(() => clusterRpcServer.close()); + peer.killedSafe.finally(() => clusterRpcServer.close()); clusterRpcServer.on('close', () => { peer.kill('cluster server closed'); // close all clusterRpcServer clients diff --git a/server/src/plugin/plugin-device.ts b/server/src/plugin/plugin-device.ts index 4ecb7a830..f528ef79d 100644 --- a/server/src/plugin/plugin-device.ts +++ b/server/src/plugin/plugin-device.ts @@ -161,7 +161,7 @@ export class PluginDeviceProxyHandler implements PrimitiveProxyHandler { console.warn('no device was returned by the plugin', this.id); } catch (e) { - console.error('error occured retrieving device from plugin', e); + console.error('error occurred retrieving device from plugin', e); } const interfaces: ScryptedInterface[] = getState(pluginDevice, ScryptedInterfaceProperty.providedInterfaces) || []; diff --git a/server/src/plugin/plugin-host.ts b/server/src/plugin/plugin-host.ts index 5d843d0cb..515194a35 100644 --- a/server/src/plugin/plugin-host.ts +++ b/server/src/plugin/plugin-host.ts @@ -170,7 +170,6 @@ export class PluginHost { return; } - const handler = this.scrypted.getDevice(pluginDevice._id); // @ts-expect-error @@ -199,8 +198,6 @@ export class PluginHost { } }) - const self = this; - const { runtime } = this.packageJson.scrypted; const mediaManager = runtime && runtime !== 'node' ? new MediaManagerHostImpl(pluginDeviceId, () => scrypted.stateManager.getSystemState(), console, id => scrypted.getDevice(id)) @@ -211,74 +208,120 @@ export class PluginHost { logger.log('i', `loading ${this.pluginName}`); logger.log('i', 'pid ' + this.worker?.pid); - const remotePromise = (async () => { - let peer: RpcPeer; - try { - peer = await peerPromise; - } - catch (e) { - logger.log('e', 'plugin failed to start ' + e); - throw new RPCResultError(this.peer, 'cluster plugin start failed', e); - } - return setupPluginRemote(peer, this.api, self.pluginId, { serverVersion }, () => this.scrypted.stateManager.getSystemState()); - })(); - - const init = (async () => { - const remote = await remotePromise; - - await Promise.all( - scrypted.findPluginDevices(self.pluginId) - .map(pluginDevice => remote.setNativeId(pluginDevice.nativeId, pluginDevice._id, pluginDevice.storage || {})) - ); - - const waitDebug = pluginDebug?.waitDebug; - if (waitDebug) { - console.info('waiting for debugger...'); - try { - await waitDebug; - console.info('debugger attached.'); - await sleep(1000); - } - catch (e) { - console.error('debugger failed', e); - } - } - - const fail = 'Plugin failed to load. View Console for more information.'; - try { - const loadZipOptions: PluginRemoteLoadZipOptions = { - clusterId: scrypted.clusterId, - clusterSecret: scrypted.clusterSecret, - // debug flag can be used to affect path resolution for sourcemaps etc. - debug: !!pluginDebug, - zipHash: this.zipHash, - }; - // original implementation sent the zipBuffer, sending the zipFile name now. - // can switch back for non-local plugins. - const modulePromise = remote.loadZip(this.packageJson, - new PluginZipAPI(async () => fs.promises.readFile(this.zipFile)), - loadZipOptions); - // allow garbage collection of the zip buffer - const module = await modulePromise; - logger.log('i', `loaded ${this.pluginName}`); - logger.clearAlert(fail) - return { module, remote }; - } - catch (e) { - logger.log('a', fail); - logger.log('e', `plugin load error ${e}`); - console.error('plugin load error', e); - throw e; - } - })(); - - this.module = init.then(({ module }) => module); - this.remote = new LazyRemote(remotePromise, init.then(({ remote }) => remote)); + const remotePromise = this.prepareRemote(peerPromise, logger, pluginDebug); + const init = this.initializeRemote(remotePromise, logger, pluginDebug); init.catch(e => { console.error('plugin failed to load', e); this.api.removeListeners(); }); + + this.module = init.then(({ module }) => module); + const remote = init.then(({ remote }) => remote); + this.remote = new LazyRemote(remotePromise, remote); + } + + private async initializeRemote(remotePromise: Promise, logger: Logger, pluginDebug: PluginDebug) { + const remote = await remotePromise; + + await Promise.all( + this.scrypted.findPluginDevices(this.pluginId) + .map(pluginDevice => remote.setNativeId(pluginDevice.nativeId, pluginDevice._id, pluginDevice.storage || {})) + ); + + const waitDebug = pluginDebug?.waitDebug; + if (waitDebug) { + console.info('waiting for debugger...'); + try { + await waitDebug; + console.info('debugger attached.'); + await sleep(1000); + } + catch (e) { + console.error('debugger failed', e); + } + } + + const fail = 'Plugin failed to load. View Console for more information.'; + try { + const loadZipOptions: PluginRemoteLoadZipOptions = { + clusterId: this.scrypted.clusterId, + clusterSecret: this.scrypted.clusterSecret, + // debug flag can be used to affect path resolution for sourcemaps etc. + debug: !!pluginDebug, + zipHash: this.zipHash, + }; + // original implementation sent the zipBuffer, sending the zipFile name now. + // can switch back for non-local plugins. + const modulePromise = remote.loadZip(this.packageJson, + new PluginZipAPI(async () => fs.promises.readFile(this.zipFile)), + loadZipOptions); + // allow garbage collection of the zip buffer + const module = await modulePromise; + logger.log('i', `loaded ${this.pluginName}`); + logger.clearAlert(fail) + return { module, remote }; + } + catch (e) { + logger.log('a', fail); + logger.log('e', `plugin load error ${e}`); + console.error('plugin load error', e); + throw e; + } + } + + private async prepareRemote(peerPromise: Promise, logger: Logger, pluginDebug: PluginDebug) { + let peer: RpcPeer; + try { + peer = await peerPromise; + } + catch (e) { + logger.log('e', 'plugin failed to start ' + e); + throw new RPCResultError(this.peer, 'cluster plugin start failed', e); + } + + const startupTime = Date.now(); + let lastPong: number; + + (async () => { + try { + let pingPromise: Promise<(time: number) => Promise> + while (!this.killed) { + await sleep(30000); + if (this.killed) + return; + pingPromise ||= peer.getParam('ping'); + const ping = await pingPromise; + lastPong = await ping(Date.now()); + } + } + catch (e) { + logger.log('e', 'plugin ping failed. restarting.'); + this.api.requestRestart(); + } + })(); + + const healthInterval = setInterval(async () => { + const now = Date.now(); + // plugin may take a while to install, so wait 10 minutes. + // after that, require 1 minute checkins. + if (!lastPong) { + if (now - startupTime > 10 * 60 * 1000) { + const logger = await this.api.getLogger(undefined); + logger.log('e', 'plugin failed to start in a timely manner. restarting.'); + this.api.requestRestart(); + } + return; + } + if (!pluginDebug && (lastPong + 60000 < now)) { + const logger = await this.api.getLogger(undefined); + logger.log('e', 'plugin is not responding to ping. restarting.'); + this.api.requestRestart(); + } + }, 60000); + peer.killedSafe.finally(() => clearInterval(healthInterval)); + + return setupPluginRemote(peer, this.api, this.pluginId, { serverVersion }, () => this.scrypted.stateManager.getSystemState()); } startPluginHost(logger: Logger, env: any, pluginDebug: PluginDebug) { @@ -347,9 +390,9 @@ export class PluginHost { forkPeer.then(peer => { const originalPeer = this.peer; - originalPeer.killed.then(s => peer.kill(s)).catch(e => peer.kill(e)); + originalPeer.killedSafe.finally(() => peer.kill()); this.peer = peer; - peer.killed.catch(() =>{} ).finally(() => originalPeer.kill()); + peer.killedSafe.finally(() => originalPeer.kill()); }).catch(() => {}); this.worker = runtimeWorker; @@ -379,49 +422,6 @@ export class PluginHost { disconnect(); }); - const startupTime = Date.now(); - let lastPong: number; - const pong = (time: number) => { - lastPong = time; - }; - (async () => { - try { - let pingPromise: Promise<(time: number, p: typeof pong) => Promise> - while (!this.killed) { - await sleep(30000); - if (this.killed) - return; - pingPromise ||= this.peer.getParam('ping'); - const ping = await pingPromise; - await ping(Date.now(), pong); - } - } - catch (e) { - logger.log('e', 'plugin ping failed. restarting.'); - this.api.requestRestart(); - } - })(); - - const healthInterval = setInterval(async () => { - const now = Date.now(); - // plugin may take a while to install, so wait 10 minutes. - // after that, require 1 minute checkins. - if (!lastPong) { - if (now - startupTime > 10 * 60 * 1000) { - const logger = await this.api.getLogger(undefined); - logger.log('e', 'plugin failed to start in a timely manner. restarting.'); - this.api.requestRestart(); - } - return; - } - if (!pluginDebug && (lastPong + 60000 < now)) { - const logger = await this.api.getLogger(undefined); - logger.log('e', 'plugin is not responding to ping. restarting.'); - this.api.requestRestart(); - } - }, 60000); - this.peer.killed.finally(() => clearInterval(healthInterval)); - return peer; } diff --git a/server/src/plugin/plugin-lazy-remote.ts b/server/src/plugin/plugin-lazy-remote.ts index 473e23587..ca890d6b1 100644 --- a/server/src/plugin/plugin-lazy-remote.ts +++ b/server/src/plugin/plugin-lazy-remote.ts @@ -15,6 +15,7 @@ import { PluginRemote, PluginRemoteLoadZipOptions, PluginZipAPI } from './plugin this.remote = await remoteReadyPromise; return this.remote; })(); + this.remoteReadyPromise.catch(() => {}); } async loadZip(packageJson: any, zipAPI: PluginZipAPI, options?: PluginRemoteLoadZipOptions): Promise { diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index 1b78297d5..9b0af7209 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -32,7 +32,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe const peer = new RpcPeer('unknown', 'host', peerSend); const clusterPeerSetup = setupCluster(peer); - const { initializeCluster, connectRPCObject, mainThreadBrokerRegister , mainThreadPort } = clusterPeerSetup; + const { initializeCluster, connectRPCObject, mainThreadBrokerRegister, mainThreadPort } = clusterPeerSetup; peer.params.initializeCluster = initializeCluster; @@ -195,8 +195,8 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe await installOptionalDependencies(getPluginConsole(), packageJson); - peer.params.ping = async (time: number, pong: (time: number) => Promise) => { - await pong(time); + peer.params.ping = async (time: number) => { + return time; }; const main = pluginReader(mainNodejs); diff --git a/server/src/rpc.ts b/server/src/rpc.ts index b7913f628..690d024dc 100644 --- a/server/src/rpc.ts +++ b/server/src/rpc.ts @@ -300,6 +300,7 @@ export class RpcPeer { constructorSerializerMap = new Map(); transportSafeArgumentTypes = RpcPeer.getDefaultTransportSafeArgumentTypes(); killed: Promise; + killedSafe: Promise; killedDeferred: Deferred; tags: any = {}; yieldedAsyncIterators = new Set(); @@ -395,6 +396,7 @@ export class RpcPeer { this.killed = new Promise((resolve, reject) => { this.killedDeferred = { resolve, reject, method: undefined }; }).catch(e => e.message || 'Unknown Error'); + this.killedSafe = this.killed.then(() => {}).catch(() => { }); } static isTransportSafe(value: any) { diff --git a/server/src/scrypted-cluster-main.ts b/server/src/scrypted-cluster-main.ts index ff1992104..4d33781f0 100644 --- a/server/src/scrypted-cluster-main.ts +++ b/server/src/scrypted-cluster-main.ts @@ -39,7 +39,7 @@ function peerLifecycle(serializer: ReturnType, socket.on('close', () => { peer.kill(`cluster ${type} closed`); }); - peer.killed.then(() => { + peer.killedSafe.finally(() => { socket.destroy(); }); } @@ -121,7 +121,7 @@ export function startClusterClient(mainFilename: string) { try { await once(rawSocket, 'connect'); } - catch( e) { + catch (e) { continue; } @@ -209,17 +209,19 @@ export function startClusterClient(mainFilename: string) { runtimeWorker.on('error', e => { threadPeer.kill('worker error ' + e); }); - threadPeer.killed.catch(() => { }).finally(() => { + threadPeer.killedSafe.finally(() => { runtimeWorker.kill(); }); peerLiveness.waitKilled().catch(() => { }).finally(() => { threadPeer.kill('peer killed'); }); let getRemote: any; + let ping: any; try { const initializeCluster: InitializeCluster = await threadPeer.getParam('initializeCluster'); await initializeCluster({ clusterId, clusterSecret }); getRemote = await threadPeer.getParam('getRemote'); + ping = await threadPeer.getParam('ping'); } catch (e) { threadPeer.kill('cluster fork failed'); @@ -242,6 +244,7 @@ export function startClusterClient(mainFilename: string) { stdout: readStream(runtimeWorker.stdout), stderr: readStream(runtimeWorker.stderr), getRemote, + ping, }; }; @@ -255,9 +258,12 @@ export function startClusterClient(mainFilename: string) { } catch (e) { peer.kill(e.message); - socket.destroy(); console.warn('Cluster client error:', localAddress, localPort, e); } + finally { + peer.kill(); + socket.destroy(); + } } })(); } @@ -291,7 +297,7 @@ export function createClusterServer(runtime: ScryptedRuntime, certificate: Retur forks: new Set(), }; runtime.clusterWorkers.add(worker); - peer.killed.then(() => { + peer.killedSafe.finally(() => { runtime.clusterWorkers.delete(worker); }); socket.on('close', () => {