diff --git a/sdk/package-lock.json b/sdk/package-lock.json index 7ccd4c3a5..c1d4a70bc 100644 --- a/sdk/package-lock.json +++ b/sdk/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/sdk", - "version": "0.3.71", + "version": "0.3.72", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/sdk", - "version": "0.3.71", + "version": "0.3.72", "license": "ISC", "dependencies": { "@babel/preset-typescript": "^7.26.0", diff --git a/sdk/package.json b/sdk/package.json index 08325aa71..d94a17a68 100644 --- a/sdk/package.json +++ b/sdk/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/sdk", - "version": "0.3.71", + "version": "0.3.72", "description": "", "main": "dist/src/index.js", "exports": { diff --git a/sdk/types/package-lock.json b/sdk/types/package-lock.json index 9bbd160af..70dd02dd2 100644 --- a/sdk/types/package-lock.json +++ b/sdk/types/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/types", - "version": "0.3.65", + "version": "0.3.66", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/types", - "version": "0.3.65", + "version": "0.3.66", "license": "ISC", "devDependencies": { "@types/node": "^22.1.0", diff --git a/sdk/types/package.json b/sdk/types/package.json index 2944eb11f..30b61cd2e 100644 --- a/sdk/types/package.json +++ b/sdk/types/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/types", - "version": "0.3.65", + "version": "0.3.66", "description": "", "main": "dist/index.js", "author": "", diff --git a/sdk/types/src/types.input.ts b/sdk/types/src/types.input.ts index 56420a5ab..6714e9f9e 100644 --- a/sdk/types/src/types.input.ts +++ b/sdk/types/src/types.input.ts @@ -2614,7 +2614,24 @@ export interface ForkOptions { runtime?: string; id?: string; nativeId?: ScryptedNativeId; - labels?: string[]; + /** + * The labels used to select the cluster worker that will execute this fork. + */ + labels?: { + /** + * The worker must have all these labels. + */ + require?: string[]; + /** + * The worker must have one of these labels. + */ + any?: string[]; + /** + * The worker is preferred to have one of these labels. + * The nearest match will be selected. + */ + prefer?: string[]; + } } export interface ScryptedStatic { diff --git a/server/package-lock.json b/server/package-lock.json index b3a557faa..09c14c840 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -12,7 +12,7 @@ "dependencies": { "@scrypted/ffmpeg-static": "^6.1.0-build3", "@scrypted/node-pty": "^1.0.18", - "@scrypted/types": "^0.3.65", + "@scrypted/types": "^0.3.66", "adm-zip": "^0.5.16", "body-parser": "^1.20.3", "cookie-parser": "^1.4.7", @@ -557,9 +557,9 @@ } }, "node_modules/@scrypted/types": { - "version": "0.3.65", - "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.3.65.tgz", - "integrity": "sha512-eYvf0vrcjSd7F07ZbVUpjhqHyBd4u7zIHz89ENqv46DpCiZMag7b+OwhwMrjAVG21ekhUP7gRdPnMi+gO91ojQ==" + "version": "0.3.66", + "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.3.66.tgz", + "integrity": "sha512-POHpVgW6Ce8mnJRaXZRm+2RtvFuPP+ZehsDrhUqkQdxmnV81m8K2+3M6Vhrt+07kNDXmrznAijoj/OzXkdZWgw==" }, "node_modules/@types/adm-zip": { "version": "0.5.6", diff --git a/server/package.json b/server/package.json index 37912e2de..a51611216 100644 --- a/server/package.json +++ b/server/package.json @@ -5,7 +5,7 @@ "dependencies": { "@scrypted/ffmpeg-static": "^6.1.0-build3", "@scrypted/node-pty": "^1.0.18", - "@scrypted/types": "^0.3.65", + "@scrypted/types": "^0.3.66", "adm-zip": "^0.5.16", "body-parser": "^1.20.3", "cookie-parser": "^1.4.7", diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index 25908ef72..3d21729fd 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -442,7 +442,8 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe let runtimeWorker: RuntimeWorker; let nativeWorker: child_process.ChildProcess | worker_threads.Worker; - if (options?.labels?.length && options.runtime && !matchesClusterLabels(options, getClusterLabels())) { + // if running in a cluster, fork to a matching cluster worker only if necessary. + if (process.env.SCRYPTED_CLUSTER_ADDRESS && options?.runtime && !matchesClusterLabels(options, getClusterLabels())) { const waitKilled = new Deferred(); waitKilled.promise.finally(() => events.emit('exit')); const events = new EventEmitter(); diff --git a/server/src/scrypted-cluster.ts b/server/src/scrypted-cluster.ts index e8982e73a..d58d2200d 100644 --- a/server/src/scrypted-cluster.ts +++ b/server/src/scrypted-cluster.ts @@ -116,11 +116,30 @@ function preparePeer(socket: tls.TLSSocket, type: 'server' | 'client') { } export function matchesClusterLabels(options: ForkOptions, labels: string[]) { - for (const label of options.labels) { + let matched = 0; + for (const label of options?.labels?.require || []) { if (!labels.includes(label)) - return false; + return 0; } - return true; + + // if there is nothing in the any list, consider it matched + let foundAny = !options?.labels?.any?.length; + for (const label of options.labels?.any || []) { + if (!labels.includes(label)) { + matched++; + foundAny = true; + } + } + if (!foundAny) + return 0; + + for (const label of options?.labels?.prefer || []) { + if (labels.includes(label)) + matched++; + } + // ensure non zero result. + matched++; + return matched; } export function getClusterLabels() { @@ -175,12 +194,6 @@ export function startClusterClient(mainFilename: string) { packageJson: any, zipAPI: PluginZipAPI, zipOptions: PluginRemoteLoadZipOptions) => { - if (!options.runtime || !options.labels?.length) { - console.warn('invalid cluster fork options'); - peer.kill('invalid cluster fork options'); - throw new Error('invalid cluster fork options'); - } - let runtimeWorker: RuntimeWorker; let nativeWorker: child_process.ChildProcess | worker_threads.Worker; diff --git a/server/src/services/cluster-fork.ts b/server/src/services/cluster-fork.ts index fd0fa49fd..fdd0e6e62 100644 --- a/server/src/services/cluster-fork.ts +++ b/server/src/services/cluster-fork.ts @@ -7,7 +7,14 @@ export class ClusterFork { constructor(public runtime: ScryptedRuntime) { } async fork(peerLiveness: PeerLiveness, options: ForkOptions, packageJson: any, zipAPI: PluginZipAPI, zipOptions: PluginRemoteLoadZipOptions) { - const worker = [...this.runtime.clusterWorkers].find(worker => matchesClusterLabels); + const matchingWorkers = [...this.runtime.clusterWorkers].map(worker => ({ + worker, + matches: matchesClusterLabels(options, worker.labels), + })) + .filter(({ matches }) => matches); + matchingWorkers.sort((a, b) => b.worker.labels.length - a.worker.labels.length); + const worker = matchingWorkers[0]?.worker; + if (!worker) throw new Error(`no worker found for cluster labels ${options.labels}`);