common: prevent runaway zygote

This commit is contained in:
Koushik Dutta
2023-11-19 19:03:43 -08:00
parent 9b168bb012
commit 167360a218

View File

@@ -1,6 +1,7 @@
import sdk, { PluginFork } from '@scrypted/sdk';
import worker_threads from 'worker_threads';
import { createAsyncQueue } from './async-queue';
import os from 'os';
export type Zygote<T> = () => PluginFork<T>;
@@ -22,7 +23,7 @@ export function createZygote<T>(): Zygote<T> {
}
export function createZygoteWorkQueue<T>() {
export function createZygoteWorkQueue<T>(maxWorkers: number = os.cpus().length >> 1) {
const queue = createAsyncQueue<(doWork: (fork: PluginFork<T>) => Promise<any>) => Promise<any>>();
let forks = 0;
@@ -31,42 +32,44 @@ export function createZygoteWorkQueue<T>() {
if (check)
return check(doWork);
let exited = false;
const controller = new AbortController();
// necessary to prevent unhandledrejection errors
controller.signal.addEventListener('abort', () => {});
const fork = sdk.fork<T>();
forks++;
fork.worker.on('exit', () => {
forks--;
exited = true;
controller.abort();
});
if (maxWorkers && forks < maxWorkers) {
let exited = false;
const controller = new AbortController();
// necessary to prevent unhandledrejection errors
controller.signal.addEventListener('abort', () => { });
const fork = sdk.fork<T>();
forks++;
fork.worker.on('exit', () => {
forks--;
exited = true;
controller.abort();
});
let timeout: NodeJS.Timeout;
const queueFork = () => {
clearTimeout(timeout);
timeout = setTimeout(() => {
// keep one alive.
if (forks === 1)
return;
fork.worker.terminate();
}, 30000);
queue.submit(async v2 => {
let timeout: NodeJS.Timeout;
const queueFork = () => {
clearTimeout(timeout);
try {
return await v2(fork);
}
finally {
if (!exited) {
queueFork();
}
}
}, controller.signal);
}
timeout = setTimeout(() => {
// keep one alive.
if (forks === 1)
return;
fork.worker.terminate();
}, 30000);
queueFork();
queue.submit(async v2 => {
clearTimeout(timeout);
try {
return await v2(fork);
}
finally {
if (!exited) {
queueFork();
}
}
}, controller.signal);
}
queueFork();
}
const d = await queue.dequeue();
return d(doWork);