server: use node worker message channel so fork can create forks.

This commit is contained in:
Koushik Dutta
2024-07-30 08:40:46 -07:00
parent 8a6eaa5389
commit 05745bf3c5
3 changed files with 32 additions and 16 deletions

View File

@@ -5,8 +5,8 @@ import { RpcMessage, RpcPeer } from "../../rpc";
import { RuntimeWorker, RuntimeWorkerOptions } from "./runtime-worker";
export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
terminated: boolean;
worker: worker_threads.Worker;
message = new worker_threads.MessageChannel();
constructor(mainFilename: string, public pluginId: string, options: RuntimeWorkerOptions) {
super();
@@ -18,7 +18,6 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
});
this.worker.on('exit', () => {
this.terminated = true;
this.emit('exit');
});
this.worker.on('error', e => {
@@ -27,6 +26,17 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
this.worker.on('messageerror', e => {
this.emit('error', e);
});
this.message.port2.on('messageerror', e => {
this.emit('error', e);
});
this.message.port2.on('close', () => {
this.emit('error', new Error('port closed'));
});
this.worker.postMessage({
port: this.message.port1,
}, [this.message.port1]);
}
get pid() {
@@ -48,6 +58,9 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
this.worker.removeAllListeners();
this.worker.stdout.removeAllListeners();
this.worker.stderr.removeAllListeners();
this.message.port1.close();
this.message.port2.close();
this.message = undefined;
this.worker = undefined;
}
@@ -55,7 +68,7 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
try {
if (!this.worker)
throw new Error('thread worker has been killed');
this.worker.postMessage(v8.serialize(message));
this.message.port2.postMessage(v8.serialize(message));
}
catch (e) {
reject?.(e);
@@ -63,6 +76,6 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
}
setupRpcPeer(peer: RpcPeer): void {
this.worker.on('message', message => peer.handleMessage(v8.deserialize(message)));
this.message.port2.on('message', message => peer.handleMessage(v8.deserialize(message)));
}
}