diff --git a/server/package-lock.json b/server/package-lock.json index 662f30625..a0433eb61 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/server", - "version": "0.115.1", + "version": "0.115.2", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@scrypted/server", - "version": "0.115.1", + "version": "0.115.2", "hasInstallScript": true, "license": "ISC", "dependencies": { diff --git a/server/src/plugin/runtime/node-thread-worker.ts b/server/src/plugin/runtime/node-thread-worker.ts index c6453b109..df3281b49 100644 --- a/server/src/plugin/runtime/node-thread-worker.ts +++ b/server/src/plugin/runtime/node-thread-worker.ts @@ -5,8 +5,8 @@ import { RpcMessage, RpcPeer } from "../../rpc"; import { RuntimeWorker, RuntimeWorkerOptions } from "./runtime-worker"; export class NodeThreadWorker extends EventEmitter implements RuntimeWorker { - terminated: boolean; worker: worker_threads.Worker; + message = new worker_threads.MessageChannel(); constructor(mainFilename: string, public pluginId: string, options: RuntimeWorkerOptions) { super(); @@ -18,7 +18,6 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker { }); this.worker.on('exit', () => { - this.terminated = true; this.emit('exit'); }); this.worker.on('error', e => { @@ -27,6 +26,17 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker { this.worker.on('messageerror', e => { this.emit('error', e); }); + + this.message.port2.on('messageerror', e => { + this.emit('error', e); + }); + this.message.port2.on('close', () => { + this.emit('error', new Error('port closed')); + }); + + this.worker.postMessage({ + port: this.message.port1, + }, [this.message.port1]); } get pid() { @@ -48,6 +58,9 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker { this.worker.removeAllListeners(); this.worker.stdout.removeAllListeners(); this.worker.stderr.removeAllListeners(); + this.message.port1.close(); + this.message.port2.close(); + this.message = undefined; this.worker = undefined; } @@ -55,7 +68,7 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker { try { if (!this.worker) throw new Error('thread worker has been killed'); - this.worker.postMessage(v8.serialize(message)); + this.message.port2.postMessage(v8.serialize(message)); } catch (e) { reject?.(e); @@ -63,6 +76,6 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker { } setupRpcPeer(peer: RpcPeer): void { - this.worker.on('message', message => peer.handleMessage(v8.deserialize(message))); + this.message.port2.on('message', message => peer.handleMessage(v8.deserialize(message))); } } diff --git a/server/src/scrypted-plugin-main.ts b/server/src/scrypted-plugin-main.ts index 0b6697426..7d948cbcb 100644 --- a/server/src/scrypted-plugin-main.ts +++ b/server/src/scrypted-plugin-main.ts @@ -12,17 +12,20 @@ function start(mainFilename: string) { module.paths.push(getPluginNodePath(pluginId)); if (process.argv[2] === 'child-thread') { - const peer = startPluginRemote(mainFilename, process.argv[3], (message, reject) => { - try { - worker_threads.parentPort.postMessage(v8.serialize(message)); - } - catch (e) { - reject?.(e); - } + worker_threads.parentPort.once('message', message => { + const { port } = message as { port: worker_threads.MessagePort }; + const peer = startPluginRemote(mainFilename, pluginId, (message, reject) => { + try { + port.postMessage(v8.serialize(message)); + } + catch (e) { + reject?.(e); + } + }); + peer.transportSafeArgumentTypes.add(Buffer.name); + peer.transportSafeArgumentTypes.add(Uint8Array.name); + port.on('message', message => peer.handleMessage(v8.deserialize(message))); }); - peer.transportSafeArgumentTypes.add(Buffer.name); - peer.transportSafeArgumentTypes.add(Uint8Array.name); - worker_threads.parentPort.on('message', message => peer.handleMessage(v8.deserialize(message))); } else { const peer = startPluginRemote(mainFilename, process.argv[3], (message, reject, serializationContext) => process.send(message, serializationContext?.sendHandle, {