server/sdk: new cluster label format

This commit is contained in:
Koushik Dutta
2024-11-13 09:23:39 -08:00
parent 5ff8a65c4a
commit 5a62fdc06b
10 changed files with 61 additions and 23 deletions

View File

@@ -12,7 +12,7 @@
"dependencies": {
"@scrypted/ffmpeg-static": "^6.1.0-build3",
"@scrypted/node-pty": "^1.0.18",
"@scrypted/types": "^0.3.65",
"@scrypted/types": "^0.3.66",
"adm-zip": "^0.5.16",
"body-parser": "^1.20.3",
"cookie-parser": "^1.4.7",
@@ -557,9 +557,9 @@
}
},
"node_modules/@scrypted/types": {
"version": "0.3.65",
"resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.3.65.tgz",
"integrity": "sha512-eYvf0vrcjSd7F07ZbVUpjhqHyBd4u7zIHz89ENqv46DpCiZMag7b+OwhwMrjAVG21ekhUP7gRdPnMi+gO91ojQ=="
"version": "0.3.66",
"resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.3.66.tgz",
"integrity": "sha512-POHpVgW6Ce8mnJRaXZRm+2RtvFuPP+ZehsDrhUqkQdxmnV81m8K2+3M6Vhrt+07kNDXmrznAijoj/OzXkdZWgw=="
},
"node_modules/@types/adm-zip": {
"version": "0.5.6",

View File

@@ -5,7 +5,7 @@
"dependencies": {
"@scrypted/ffmpeg-static": "^6.1.0-build3",
"@scrypted/node-pty": "^1.0.18",
"@scrypted/types": "^0.3.65",
"@scrypted/types": "^0.3.66",
"adm-zip": "^0.5.16",
"body-parser": "^1.20.3",
"cookie-parser": "^1.4.7",

View File

@@ -442,7 +442,8 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
let runtimeWorker: RuntimeWorker;
let nativeWorker: child_process.ChildProcess | worker_threads.Worker;
if (options?.labels?.length && options.runtime && !matchesClusterLabels(options, getClusterLabels())) {
// if running in a cluster, fork to a matching cluster worker only if necessary.
if (process.env.SCRYPTED_CLUSTER_ADDRESS && options?.runtime && !matchesClusterLabels(options, getClusterLabels())) {
const waitKilled = new Deferred<void>();
waitKilled.promise.finally(() => events.emit('exit'));
const events = new EventEmitter();

View File

@@ -116,11 +116,30 @@ function preparePeer(socket: tls.TLSSocket, type: 'server' | 'client') {
}
export function matchesClusterLabels(options: ForkOptions, labels: string[]) {
for (const label of options.labels) {
let matched = 0;
for (const label of options?.labels?.require || []) {
if (!labels.includes(label))
return false;
return 0;
}
return true;
// if there is nothing in the any list, consider it matched
let foundAny = !options?.labels?.any?.length;
for (const label of options.labels?.any || []) {
if (!labels.includes(label)) {
matched++;
foundAny = true;
}
}
if (!foundAny)
return 0;
for (const label of options?.labels?.prefer || []) {
if (labels.includes(label))
matched++;
}
// ensure non zero result.
matched++;
return matched;
}
export function getClusterLabels() {
@@ -175,12 +194,6 @@ export function startClusterClient(mainFilename: string) {
packageJson: any,
zipAPI: PluginZipAPI,
zipOptions: PluginRemoteLoadZipOptions) => {
if (!options.runtime || !options.labels?.length) {
console.warn('invalid cluster fork options');
peer.kill('invalid cluster fork options');
throw new Error('invalid cluster fork options');
}
let runtimeWorker: RuntimeWorker;
let nativeWorker: child_process.ChildProcess | worker_threads.Worker;

View File

@@ -7,7 +7,14 @@ export class ClusterFork {
constructor(public runtime: ScryptedRuntime) { }
async fork(peerLiveness: PeerLiveness, options: ForkOptions, packageJson: any, zipAPI: PluginZipAPI, zipOptions: PluginRemoteLoadZipOptions) {
const worker = [...this.runtime.clusterWorkers].find(worker => matchesClusterLabels);
const matchingWorkers = [...this.runtime.clusterWorkers].map(worker => ({
worker,
matches: matchesClusterLabels(options, worker.labels),
}))
.filter(({ matches }) => matches);
matchingWorkers.sort((a, b) => b.worker.labels.length - a.worker.labels.length);
const worker = matchingWorkers[0]?.worker;
if (!worker)
throw new Error(`no worker found for cluster labels ${options.labels}`);