diff --git a/common/src/zygote.ts b/common/src/zygote.ts index b8fb0f3b0..3f702917a 100644 --- a/common/src/zygote.ts +++ b/common/src/zygote.ts @@ -1,6 +1,7 @@ import sdk, { PluginFork } from '@scrypted/sdk'; import worker_threads from 'worker_threads'; import { createAsyncQueue } from './async-queue'; +import os from 'os'; export type Zygote = () => PluginFork; @@ -22,7 +23,7 @@ export function createZygote(): Zygote { } -export function createZygoteWorkQueue() { +export function createZygoteWorkQueue(maxWorkers: number = os.cpus().length >> 1) { const queue = createAsyncQueue<(doWork: (fork: PluginFork) => Promise) => Promise>(); let forks = 0; @@ -31,42 +32,44 @@ export function createZygoteWorkQueue() { if (check) return check(doWork); - let exited = false; - const controller = new AbortController(); - // necessary to prevent unhandledrejection errors - controller.signal.addEventListener('abort', () => {}); - const fork = sdk.fork(); - forks++; - fork.worker.on('exit', () => { - forks--; - exited = true; - controller.abort(); - }); + if (maxWorkers && forks < maxWorkers) { + let exited = false; + const controller = new AbortController(); + // necessary to prevent unhandledrejection errors + controller.signal.addEventListener('abort', () => { }); + const fork = sdk.fork(); + forks++; + fork.worker.on('exit', () => { + forks--; + exited = true; + controller.abort(); + }); - let timeout: NodeJS.Timeout; - const queueFork = () => { - clearTimeout(timeout); - timeout = setTimeout(() => { - // keep one alive. - if (forks === 1) - return; - fork.worker.terminate(); - }, 30000); - - queue.submit(async v2 => { + let timeout: NodeJS.Timeout; + const queueFork = () => { clearTimeout(timeout); - try { - return await v2(fork); - } - finally { - if (!exited) { - queueFork(); - } - } - }, controller.signal); - } + timeout = setTimeout(() => { + // keep one alive. + if (forks === 1) + return; + fork.worker.terminate(); + }, 30000); - queueFork(); + queue.submit(async v2 => { + clearTimeout(timeout); + try { + return await v2(fork); + } + finally { + if (!exited) { + queueFork(); + } + } + }, controller.signal); + } + + queueFork(); + } const d = await queue.dequeue(); return d(doWork);