From 53cab91b02e944bd7c28cb8b918e792dca02ae9b Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Wed, 20 Nov 2024 14:53:58 -0800 Subject: [PATCH] server: refactor runtime worker creation --- sdk/package-lock.json | 4 +- sdk/package.json | 2 +- sdk/types/package-lock.json | 4 +- sdk/types/package.json | 2 +- .../scrypted_python/scrypted_sdk/types.py | 13 ++---- sdk/types/src/types.input.ts | 2 +- server/package-lock.json | 12 ++--- server/package.json | 2 +- server/python/plugin_remote.py | 17 ++++++- server/src/plugin/plugin-host.ts | 46 ++++++++++--------- server/src/plugin/plugin-remote-worker.ts | 26 +++++++---- .../plugin/runtime/child-process-worker.ts | 4 +- .../src/plugin/runtime/cluster-fork-worker.ts | 26 +++++++---- server/src/plugin/runtime/custom-worker.ts | 6 +-- server/src/plugin/runtime/node-fork-worker.ts | 4 +- server/src/plugin/runtime/python-worker.ts | 6 +-- server/src/plugin/runtime/runtime-host.ts | 8 ++-- server/src/scrypted-cluster-main.ts | 36 ++++++--------- server/src/services/cluster-fork.ts | 10 ++-- 19 files changed, 127 insertions(+), 103 deletions(-) diff --git a/sdk/package-lock.json b/sdk/package-lock.json index bad8d2083..ac70ab78b 100644 --- a/sdk/package-lock.json +++ b/sdk/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/sdk", - "version": "0.3.86", + "version": "0.3.87", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/sdk", - "version": "0.3.86", + "version": "0.3.87", "license": "ISC", "dependencies": { "@babel/preset-typescript": "^7.26.0", diff --git a/sdk/package.json b/sdk/package.json index 794ae5a3e..48114234f 100644 --- a/sdk/package.json +++ b/sdk/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/sdk", - "version": "0.3.86", + "version": "0.3.87", "description": "", "main": "dist/src/index.js", "exports": { diff --git a/sdk/types/package-lock.json b/sdk/types/package-lock.json index 8018a78f3..191c4daf4 100644 --- a/sdk/types/package-lock.json +++ b/sdk/types/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/types", - "version": "0.3.79", + "version": "0.3.80", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/types", - "version": "0.3.79", + "version": "0.3.80", "license": "ISC", "devDependencies": { "@types/node": "^22.1.0", diff --git a/sdk/types/package.json b/sdk/types/package.json index ea4fdfe42..4280600c9 100644 --- a/sdk/types/package.json +++ b/sdk/types/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/types", - "version": "0.3.79", + "version": "0.3.80", "description": "", "main": "dist/index.js", "author": "", diff --git a/sdk/types/scrypted_python/scrypted_sdk/types.py b/sdk/types/scrypted_python/scrypted_sdk/types.py index 047673e08..50ffe8e6a 100644 --- a/sdk/types/scrypted_python/scrypted_sdk/types.py +++ b/sdk/types/scrypted_python/scrypted_sdk/types.py @@ -301,13 +301,10 @@ class AudioStreamOptions(TypedDict): class ClusterFork(TypedDict): - clusterWorkerId: str # The id of the cluster worker id that will execute this fork. - filename: str # The filename to execute in the fork. Not supported in all runtimes. - id: str # The id of the device that is associated with this fork. - labels: Any # The labels used to select the cluster worker that will execute this fork. - name: str # The name of this fork. This will be used to set the thread name - nativeId: str # The native id of the mixin that is associated with this fork. - runtime: str # The runtime to use for this fork. If not specified, the current runtime will be used. + clusterWorkerId: str + id: str + labels: Any + runtime: str class HttpResponseOptions(TypedDict): @@ -950,7 +947,7 @@ class TamperState(TypedDict): pass -TYPES_VERSION = "0.3.79" +TYPES_VERSION = "0.3.80" class AirPurifier: diff --git a/sdk/types/src/types.input.ts b/sdk/types/src/types.input.ts index cc00bc5ab..10af2f201 100644 --- a/sdk/types/src/types.input.ts +++ b/sdk/types/src/types.input.ts @@ -2670,7 +2670,7 @@ export interface ForkOptions { }; } -export interface ClusterFork extends ForkOptions { +export interface ClusterFork { runtime?: ForkOptions['runtime']; labels?: ForkOptions['labels']; id?: ForkOptions['id']; diff --git a/server/package-lock.json b/server/package-lock.json index 2f9559e29..4e69118df 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -1,18 +1,18 @@ { "name": "@scrypted/server", - "version": "0.123.32", + "version": "0.123.33", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@scrypted/server", - "version": "0.123.32", + "version": "0.123.33", "hasInstallScript": true, "license": "ISC", "dependencies": { "@scrypted/ffmpeg-static": "^6.1.0-build3", "@scrypted/node-pty": "^1.0.22", - "@scrypted/types": "^0.3.79", + "@scrypted/types": "^0.3.80", "adm-zip": "^0.5.16", "body-parser": "^1.20.3", "cookie-parser": "^1.4.7", @@ -557,9 +557,9 @@ } }, "node_modules/@scrypted/types": { - "version": "0.3.79", - "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.3.79.tgz", - "integrity": "sha512-o/Rlgd+F+f9Bmlb9oSl6/qsvNwEIbNQyBGdw/O3M2BmlzUZsSjoJv5gV23SyceEHRV6NLS3anSBKu1CPjKbTRw==", + "version": "0.3.80", + "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.3.80.tgz", + "integrity": "sha512-CH98GHd5U55NRHgrYWOETGwD+BhvicBXUvQ5ENqyaedbXUZMPx3YLrHlTGiqBEGKszh5XdV7dEiL+h+bsbGDCQ==", "license": "ISC" }, "node_modules/@types/adm-zip": { diff --git a/server/package.json b/server/package.json index 6ba9568cc..181780d28 100644 --- a/server/package.json +++ b/server/package.json @@ -5,7 +5,7 @@ "dependencies": { "@scrypted/ffmpeg-static": "^6.1.0-build3", "@scrypted/node-pty": "^1.0.22", - "@scrypted/types": "^0.3.79", + "@scrypted/types": "^0.3.80", "adm-zip": "^0.5.16", "body-parser": "^1.20.3", "cookie-parser": "^1.4.7", diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py index 27f082984..40c733070 100644 --- a/server/python/plugin_remote.py +++ b/server/python/plugin_remote.py @@ -654,7 +654,7 @@ class PluginRemote: forkMain = zipOptions and zipOptions.get("fork") debug = zipOptions.get("debug", None) plugin_volume = pv.ensure_plugin_volume(self.pluginId) - zipHash = zipOptions.get("zipHash") + zipHash: str = zipOptions.get("zipHash") plugin_zip_paths = pv.prep(plugin_volume, zipHash) if debug: @@ -824,11 +824,24 @@ class PluginRemote: if cluster_labels.needs_cluster_fork_worker(options): peerLiveness = PeerLiveness(self.loop) async def getClusterFork(): + runtimeWorkerOptions = { + "packageJson": packageJson, + "env": None, + "pluginDebug": None, + "zipFile": None, + "unzippedPath": None, + "zipHash": zipHash, + } + forkComponent = await self.api.getComponent("cluster-fork") sanitizedOptions = options.copy() sanitizedOptions["runtime"] = sanitizedOptions.get("runtime", "python") sanitizedOptions["zipHash"] = zipHash - clusterForkResult = await forkComponent.fork(peerLiveness, sanitizedOptions, packageJson, zipHash, lambda: zipAPI.getZip()) + clusterForkResult = await forkComponent.fork( + runtimeWorkerOptions, + sanitizedOptions, + peerLiveness, lambda: zipAPI.getZip() + ) async def waitPeerLiveness(): try: diff --git a/server/src/plugin/plugin-host.ts b/server/src/plugin/plugin-host.ts index ce7556c14..97398c546 100644 --- a/server/src/plugin/plugin-host.ts +++ b/server/src/plugin/plugin-host.ts @@ -25,7 +25,7 @@ import { WebSocketConnection } from './plugin-remote-websocket'; import { ensurePluginVolume, getScryptedVolume } from './plugin-volume'; import { createClusterForkWorker } from './runtime/cluster-fork-worker'; import { prepareZipSync } from './runtime/node-worker-common'; -import { RuntimeWorker } from './runtime/runtime-worker'; +import type { RuntimeWorker, RuntimeWorkerOptions } from './runtime/runtime-worker'; const serverVersion = require('../../package.json').version; @@ -341,7 +341,15 @@ export class PluginHost { if (!workerHost) throw new UnsupportedRuntimeError(this.packageJson.scrypted.runtime); - let peer: Promise + let peer: Promise; + const runtimeWorkerOptions: RuntimeWorkerOptions = { + packageJson: this.packageJson, + env, + pluginDebug, + unzippedPath: this.unzippedPath, + zipFile: this.zipFile, + zipHash: this.zipHash, + }; if (!needsClusterForkWorker(this.packageJson.scrypted)) { this.peer = new RpcPeer('host', this.pluginId, (message, reject, serializationContext) => { if (connected) { @@ -354,14 +362,7 @@ export class PluginHost { peer = Promise.resolve(this.peer); - this.worker = workerHost(this.scrypted.mainFilename, this.pluginId, { - packageJson: this.packageJson, - env, - pluginDebug, - unzippedPath: this.unzippedPath, - zipFile: this.zipFile, - zipHash: this.zipHash, - }, this.scrypted); + this.worker = workerHost(this.scrypted.mainFilename, runtimeWorkerOptions, this.scrypted); this.worker.setupRpcPeer(this.peer); @@ -379,25 +380,28 @@ export class PluginHost { }); const clusterSetup = setupCluster(this.peer); - const { runtimeWorker, forkPeer, clusterWorkerId } = createClusterForkWorker((async () => { - await clusterSetup.initializeCluster({ - clusterId: this.scrypted.clusterId, - clusterSecret: this.scrypted.clusterSecret, - }); - return this.scrypted.clusterFork; - })(), - this.zipHash, async () => fs.promises.readFile(this.zipFile), - this.packageJson.scrypted, this.packageJson, clusterSetup.connectRPCObject); + const { runtimeWorker, forkPeer, clusterWorkerId } = createClusterForkWorker( + runtimeWorkerOptions, + this.packageJson.scrypted, + (async () => { + await clusterSetup.initializeCluster({ + clusterId: this.scrypted.clusterId, + clusterSecret: this.scrypted.clusterSecret, + }); + return this.scrypted.clusterFork; + })(), + async () => fs.promises.readFile(this.zipFile), + clusterSetup.connectRPCObject); forkPeer.then(peer => { const originalPeer = this.peer; originalPeer.killedSafe.finally(() => peer.kill()); this.peer = peer; peer.killedSafe.finally(() => originalPeer.kill()); - }).catch(() => {}); + }).catch(() => { }); clusterWorkerId.then(clusterWorkerId => { console.log('cluster worker id', clusterWorkerId); - }).catch(() => {}); + }).catch(() => { }); this.worker = runtimeWorker; peer = forkPeer; diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index 93afced73..00bc10308 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -21,7 +21,7 @@ import { createClusterForkWorker } from './runtime/cluster-fork-worker'; import { NodeThreadWorker } from './runtime/node-thread-worker'; import { prepareZip } from './runtime/node-worker-common'; import { getBuiltinRuntimeHosts } from './runtime/runtime-host'; -import { RuntimeWorker } from './runtime/runtime-worker'; +import { RuntimeWorker, RuntimeWorkerOptions } from './runtime/runtime-worker'; import type { ClusterForkService } from '../services/cluster-fork'; import type { PluginComponent } from '../services/plugin'; import { ClusterManagerImpl } from '../scrypted-cluster-main'; @@ -216,10 +216,23 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe let nativeWorker: child_process.ChildProcess | worker_threads.Worker; let clusterWorkerId: Promise; + const runtimeWorkerOptions: RuntimeWorkerOptions = { + packageJson, + env: undefined, + pluginDebug: undefined, + zipFile, + unzippedPath, + zipHash, + }; + // if running in a cluster, fork to a matching cluster worker only if necessary. if (needsClusterForkWorker(options)) { ({ runtimeWorker, forkPeer, clusterWorkerId } = createClusterForkWorker( - api.getComponent('cluster-fork'), zipHash, () => zipAPI.getZip(), options, packageJson, scrypted.connectRPCObject) + runtimeWorkerOptions, + options, + api.getComponent('cluster-fork'), + () => zipAPI.getZip(), + scrypted.connectRPCObject) ); } else { @@ -228,14 +241,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe const runtime = builtins.get(options.runtime); if (!runtime) throw new Error('unknown runtime ' + options.runtime); - runtimeWorker = runtime(mainFilename, pluginId, { - packageJson, - env: undefined, - pluginDebug: undefined, - zipFile, - unzippedPath, - zipHash, - }, undefined); + runtimeWorker = runtime(mainFilename, runtimeWorkerOptions, undefined); if (runtimeWorker instanceof ChildProcessWorker) { nativeWorker = runtimeWorker.childProcess; diff --git a/server/src/plugin/runtime/child-process-worker.ts b/server/src/plugin/runtime/child-process-worker.ts index 457ff9de9..f7cf131da 100644 --- a/server/src/plugin/runtime/child-process-worker.ts +++ b/server/src/plugin/runtime/child-process-worker.ts @@ -4,14 +4,16 @@ import { RpcMessage, RpcPeer } from "../../rpc"; import { RuntimeWorker, RuntimeWorkerOptions } from "./runtime-worker"; export abstract class ChildProcessWorker extends EventEmitter implements RuntimeWorker { + public pluginId: string; protected worker: child_process.ChildProcess; get childProcess() { return this.worker; } - constructor(public pluginId: string, options: RuntimeWorkerOptions) { + constructor(options: RuntimeWorkerOptions) { super(); + this.pluginId = options.packageJson.name; } setupWorker() { diff --git a/server/src/plugin/runtime/cluster-fork-worker.ts b/server/src/plugin/runtime/cluster-fork-worker.ts index ac39ee852..32ebed2c6 100644 --- a/server/src/plugin/runtime/cluster-fork-worker.ts +++ b/server/src/plugin/runtime/cluster-fork-worker.ts @@ -5,15 +5,20 @@ import { RpcPeer } from "../../rpc"; import { PeerLiveness } from "../../scrypted-cluster-main"; import type { ClusterForkService } from "../../services/cluster-fork"; import { writeWorkerGenerator } from "../plugin-console"; -import type { RuntimeWorker } from "./runtime-worker"; +import type { RuntimeWorker, RuntimeWorkerOptions } from "./runtime-worker"; export function createClusterForkWorker( - forkComponentPromise: Promise, - zipHash: string, - getZip: () => Promise, + runtimeWorkerOptions: RuntimeWorkerOptions, options: Partial, - packageJson: any, + forkComponentPromise: Promise, + getZip: () => Promise, connectRPCObject: (o: any) => Promise) { + + // these are specific to the cluster worker host + // and will be set there. + delete runtimeWorkerOptions.zipFile; + delete runtimeWorkerOptions.unzippedPath; + const waitKilled = new Deferred(); waitKilled.promise.finally(() => events.emit('exit')); const events = new EventEmitter(); @@ -38,21 +43,22 @@ export function createClusterForkWorker( }); const peerLiveness = new PeerLiveness(waitKilled.promise); - const clusterForkResultPromise = forkComponentPromise.then(forkComponent => forkComponent.fork(peerLiveness, { + const clusterForkResultPromise = forkComponentPromise.then(forkComponent => forkComponent.fork(runtimeWorkerOptions, { runtime: options.runtime || 'node', id: options.id, ...options, - }, packageJson, zipHash, getZip)); - clusterForkResultPromise.catch(() => {}); + }, peerLiveness, + getZip)); + clusterForkResultPromise.catch(() => { }); const clusterWorkerId = clusterForkResultPromise.then(clusterForkResult => clusterForkResult.clusterWorkerId); - clusterWorkerId.catch(() => {}); + clusterWorkerId.catch(() => { }); const forkPeer = (async () => { const clusterForkResult = await clusterForkResultPromise; waitKilled.promise.finally(() => { runtimeWorker.pid = undefined; - clusterForkResult.kill().catch(() => {}); + clusterForkResult.kill().catch(() => { }); }); clusterForkResult.waitKilled().catch(() => { }) .finally(() => { diff --git a/server/src/plugin/runtime/custom-worker.ts b/server/src/plugin/runtime/custom-worker.ts index b49484e65..501357021 100644 --- a/server/src/plugin/runtime/custom-worker.ts +++ b/server/src/plugin/runtime/custom-worker.ts @@ -11,8 +11,8 @@ export class CustomRuntimeWorker extends ChildProcessWorker { serializer: ReturnType; fork: boolean; - constructor(pluginId: string, options: RuntimeWorkerOptions, runtime: ScryptedRuntime) { - super(pluginId, options); + constructor(options: RuntimeWorkerOptions, runtime: ScryptedRuntime) { + super(options); const pluginDevice = runtime.findPluginDevice(this.pluginId); const scryptedRuntimeArguments: ScryptedRuntimeArguments = pluginDevice.state.scryptedRuntimeArguments?.value; @@ -27,7 +27,7 @@ export class CustomRuntimeWorker extends ChildProcessWorker { // stdin, stdout, stderr, peer in, peer out stdio: ['pipe', 'pipe', 'pipe', 'pipe', 'pipe', 'pipe'], env: Object.assign({}, process.env, env, { - SCRYYPTED_PLUGIN_ID: pluginId, + SCRYYPTED_PLUGIN_ID: this.pluginId, SCRYPTED_DEBUG_PORT: pluginDebug?.inspectPort?.toString(), SCRYPTED_UNZIPPED_PATH: options.unzippedPath, SCRYPTED_ZIP_FILE: options.zipFile, diff --git a/server/src/plugin/runtime/node-fork-worker.ts b/server/src/plugin/runtime/node-fork-worker.ts index 2dbcff6ca..9c5b1c7e4 100644 --- a/server/src/plugin/runtime/node-fork-worker.ts +++ b/server/src/plugin/runtime/node-fork-worker.ts @@ -28,8 +28,8 @@ export function isNodePluginChildProcess() { export class NodeForkWorker extends ChildProcessWorker { - constructor(mainFilename: string, pluginId: string, options: RuntimeWorkerOptions) { - super(pluginId, options); + constructor(mainFilename: string, options: RuntimeWorkerOptions) { + super(options); const { env, pluginDebug } = options; diff --git a/server/src/plugin/runtime/python-worker.ts b/server/src/plugin/runtime/python-worker.ts index e3d3a123b..5af12a18b 100644 --- a/server/src/plugin/runtime/python-worker.ts +++ b/server/src/plugin/runtime/python-worker.ts @@ -41,8 +41,8 @@ export class PythonRuntimeWorker extends ChildProcessWorker { return this._stderr; } - constructor(pluginId: string, options: RuntimeWorkerOptions) { - super(pluginId, options); + constructor(options: RuntimeWorkerOptions) { + super(options); const { env, pluginDebug } = options; const args: string[] = [ @@ -148,7 +148,7 @@ export class PythonRuntimeWorker extends ChildProcessWorker { }; const pyVersion = require('py/package.json').version; - const pyPath = path.join(getPluginVolume(pluginId), 'py'); + const pyPath = path.join(getPluginVolume(this.pluginId), 'py'); const portableInstallPath = path.join(pyPath, pyVersion); const py = new PortablePython(pluginPythonVersion, portableInstallPath, portablePythonOptions); diff --git a/server/src/plugin/runtime/runtime-host.ts b/server/src/plugin/runtime/runtime-host.ts index e19e544f5..5acf0eaff 100644 --- a/server/src/plugin/runtime/runtime-host.ts +++ b/server/src/plugin/runtime/runtime-host.ts @@ -4,14 +4,14 @@ import { NodeForkWorker } from "./node-fork-worker"; import { PythonRuntimeWorker } from "./python-worker"; import type { RuntimeWorker, RuntimeWorkerOptions } from "./runtime-worker"; -export type RuntimeHost = (mainFilename: string, pluginId: string, options: RuntimeWorkerOptions, runtime: ScryptedRuntime) => RuntimeWorker; +export type RuntimeHost = (mainFilename: string, options: RuntimeWorkerOptions, runtime: ScryptedRuntime) => RuntimeWorker; export function getBuiltinRuntimeHosts() { const pluginHosts = new Map(); - pluginHosts.set('custom', (_, pluginId, options, runtime) => new CustomRuntimeWorker(pluginId, options, runtime)); - pluginHosts.set('python', (_, pluginId, options) => new PythonRuntimeWorker(pluginId, options)); - pluginHosts.set('node', (mainFilename, pluginId, options) => new NodeForkWorker(mainFilename, pluginId, options)); + pluginHosts.set('custom', (_, options, runtime) => new CustomRuntimeWorker(options, runtime)); + pluginHosts.set('python', (_, options) => new PythonRuntimeWorker(options)); + pluginHosts.set('node', (mainFilename, options) => new NodeForkWorker(mainFilename, options)); return pluginHosts; } diff --git a/server/src/scrypted-cluster-main.ts b/server/src/scrypted-cluster-main.ts index 6671ebb41..92c5a67fe 100644 --- a/server/src/scrypted-cluster-main.ts +++ b/server/src/scrypted-cluster-main.ts @@ -15,7 +15,7 @@ import type { PluginAPI } from './plugin/plugin-api'; import { getPluginVolume, getScryptedVolume } from './plugin/plugin-volume'; import { prepareZip } from './plugin/runtime/node-worker-common'; import { getBuiltinRuntimeHosts } from './plugin/runtime/runtime-host'; -import type { RuntimeWorker } from './plugin/runtime/runtime-worker'; +import type { RuntimeWorker, RuntimeWorkerOptions } from './plugin/runtime/runtime-worker'; import { RpcPeer } from './rpc'; import { createRpcDuplexSerializer } from './rpc-serializer'; import type { ScryptedRuntime } from './runtime'; @@ -103,7 +103,7 @@ export class ClusterForkResult extends PeerLiveness { } } -export type ClusterForkParam = (peerLiveness: PeerLiveness, runtime: string, packageJson: any, zipHash: string, getZip: () => Promise) => Promise; +export type ClusterForkParam = (runtime: string, options: RuntimeWorkerOptions, peerLiveness: PeerLiveness, getZip: () => Promise) => Promise; export function startClusterClient(mainFilename: string) { const originalClusterAddress = process.env.SCRYPTED_CLUSTER_ADDRESS; @@ -179,12 +179,7 @@ export function startClusterClient(mainFilename: string) { const clusterPeerSetup = setupCluster(peer); await clusterPeerSetup.initializeCluster({ clusterId, clusterSecret }); - const clusterForkParam: ClusterForkParam = async ( - peerLiveness: PeerLiveness, - runtime: string, - packageJson: any, - zipHash: string, - getZip: () => Promise) => { + const clusterForkParam: ClusterForkParam = async (runtime, runtimeWorkerOptions, peerLiveness, getZip) => { let runtimeWorker: RuntimeWorker; const builtins = getBuiltinRuntimeHosts(); @@ -192,23 +187,22 @@ export function startClusterClient(mainFilename: string) { if (!rt) throw new Error('unknown runtime ' + runtime); - const pluginId: string = packageJson.name; - const { zipFile, unzippedPath } = await prepareZip(getPluginVolume(pluginId), zipHash, getZip); + const pluginId: string = runtimeWorkerOptions.packageJson.name; + const { zipFile, unzippedPath } = await prepareZip(getPluginVolume(pluginId), runtimeWorkerOptions.zipHash, getZip); const volume = getScryptedVolume(); const pluginVolume = getPluginVolume(pluginId); - runtimeWorker = rt(mainFilename, pluginId, { - packageJson, - env: { - SCRYPTED_VOLUME: volume, - SCRYPTED_PLUGIN_VOLUME: pluginVolume, - }, - pluginDebug: undefined, - zipFile, - unzippedPath, - zipHash, - }, undefined); + runtimeWorkerOptions.zipFile = zipFile; + runtimeWorkerOptions.unzippedPath = unzippedPath; + + runtimeWorkerOptions.env = { + ...runtimeWorkerOptions.env, + SCRYPTED_VOLUME: volume, + SCRYPTED_PLUGIN_VOLUME: pluginVolume, + }; + + runtimeWorker = rt(mainFilename, runtimeWorkerOptions, undefined); runtimeWorker.stdout.on('data', data => console.log(data.toString())); runtimeWorker.stderr.on('data', data => console.error(data.toString())); diff --git a/server/src/services/cluster-fork.ts b/server/src/services/cluster-fork.ts index 0bb4c3911..8cd2c9c57 100644 --- a/server/src/services/cluster-fork.ts +++ b/server/src/services/cluster-fork.ts @@ -1,11 +1,13 @@ +import { ClusterWorker } from "@scrypted/types"; import { matchesClusterLabels } from "../cluster/cluster-labels"; import type { ScryptedRuntime } from "../runtime"; import type { ClusterForkOptions, ClusterForkParam, PeerLiveness, RunningClusterWorker } from "../scrypted-cluster-main"; +import type { RuntimeWorkerOptions } from "../plugin/runtime/runtime-worker"; export class ClusterForkService { constructor(public runtime: ScryptedRuntime) { } - async fork(peerLiveness: PeerLiveness, options: ClusterForkOptions, packageJson: any, zipHash: string, getZip: () => Promise) { + async fork(runtimeWorkerOptions: RuntimeWorkerOptions, options: ClusterForkOptions, peerLiveness: PeerLiveness, getZip: () => Promise) { const matchingWorkers = [...this.runtime.clusterWorkers.entries()].map(([id, worker]) => ({ worker, matches: matchesClusterLabels(options, worker.labels), @@ -34,8 +36,8 @@ export class ClusterForkService { } const fork: ClusterForkParam = await worker.peer.getParam('fork'); - const forkResult = await fork(peerLiveness, options.runtime, packageJson, zipHash, getZip); - options.id ||= this.runtime.findPluginDevice(packageJson.name)?._id; + const forkResult = await fork(options.runtime, runtimeWorkerOptions, peerLiveness, getZip); + options.id ||= this.runtime.findPluginDevice(runtimeWorkerOptions.packageJson.name)?._id; worker.forks.add(options); forkResult.waitKilled().catch(() => { }).finally(() => { worker.forks.delete(options); @@ -43,7 +45,7 @@ export class ClusterForkService { forkResult.clusterWorkerId = worker.id; return forkResult; - } + }; async getClusterWorkers() { const ret: any = {};