server: cleanup message port references and errors

This commit is contained in:
Koushik Dutta
2024-07-30 08:49:21 -07:00
parent 05745bf3c5
commit 237b3ce0d9
2 changed files with 20 additions and 10 deletions

View File

@@ -6,7 +6,7 @@ import { RuntimeWorker, RuntimeWorkerOptions } from "./runtime-worker";
export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
worker: worker_threads.Worker;
message = new worker_threads.MessageChannel();
port: worker_threads.MessagePort;
constructor(mainFilename: string, public pluginId: string, options: RuntimeWorkerOptions) {
super();
@@ -27,16 +27,19 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
this.emit('error', e);
});
this.message.port2.on('messageerror', e => {
const message = new worker_threads.MessageChannel();
const { port1, port2 } = message;
this.port = port2;
this.port.on('messageerror', e => {
this.emit('error', e);
});
this.message.port2.on('close', () => {
this.port.on('close', () => {
this.emit('error', new Error('port closed'));
});
this.worker.postMessage({
port: this.message.port1,
}, [this.message.port1]);
port: port1,
}, [port1]);
}
get pid() {
@@ -58,9 +61,8 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
this.worker.removeAllListeners();
this.worker.stdout.removeAllListeners();
this.worker.stderr.removeAllListeners();
this.message.port1.close();
this.message.port2.close();
this.message = undefined;
this.port.close();
this.port = undefined;
this.worker = undefined;
}
@@ -68,7 +70,7 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
try {
if (!this.worker)
throw new Error('thread worker has been killed');
this.message.port2.postMessage(v8.serialize(message));
this.port.postMessage(v8.serialize(message));
}
catch (e) {
reject?.(e);
@@ -76,6 +78,6 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
}
setupRpcPeer(peer: RpcPeer): void {
this.message.port2.on('message', message => peer.handleMessage(v8.deserialize(message)));
this.port.on('message', message => peer.handleMessage(v8.deserialize(message)));
}
}