From 575e544c40c3249d1993ce6fe210fd9b1ca3397d Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Mon, 9 Dec 2024 21:45:24 -0800 Subject: [PATCH] core: Fix nre if clusterManager does not exist --- plugins/core/package-lock.json | 4 +- plugins/core/package.json | 2 +- plugins/core/src/cluster.ts | 160 +++++++++++++++++++++++++++++++++ plugins/core/src/main.ts | 2 +- 4 files changed, 164 insertions(+), 4 deletions(-) create mode 100644 plugins/core/src/cluster.ts diff --git a/plugins/core/package-lock.json b/plugins/core/package-lock.json index fb960bcbe..7660c8613 100644 --- a/plugins/core/package-lock.json +++ b/plugins/core/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/core", - "version": "0.3.87", + "version": "0.3.89", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/core", - "version": "0.3.87", + "version": "0.3.89", "license": "Apache-2.0", "dependencies": { "@scrypted/common": "file:../../common", diff --git a/plugins/core/package.json b/plugins/core/package.json index dc1c98ea9..5a555ba62 100644 --- a/plugins/core/package.json +++ b/plugins/core/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/core", - "version": "0.3.87", + "version": "0.3.89", "description": "Scrypted Core plugin. Provides the UI, websocket, and engine.io APIs.", "author": "Scrypted", "license": "Apache-2.0", diff --git a/plugins/core/src/cluster.ts b/plugins/core/src/cluster.ts new file mode 100644 index 000000000..b3fed56ba --- /dev/null +++ b/plugins/core/src/cluster.ts @@ -0,0 +1,160 @@ +import { createAsyncQueue } from "@scrypted/common/src/async-queue"; +import sdk, { Readme, ScryptedDeviceBase, ScryptedInterface, ScryptedSettings, Setting, Settings } from "@scrypted/sdk"; + +export const ClusterCoreNativeId = 'clustercore'; + +export class ClusterCore extends ScryptedDeviceBase implements Settings, Readme, ScryptedSettings { + writeQueue = createAsyncQueue<() => Promise>(); + + constructor(nativeId: string) { + super(nativeId); + + (async () => { + for await (const write of this.writeQueue.queue) { + try { + await write(); + } + catch (e) { + this.console.error('error writing settings', e); + } + finally { + this.onDeviceEvent(ScryptedInterface.Settings, undefined); + } + } + })(); + } + + async getSettings(): Promise { + const mode = sdk.clusterManager?.getClusterMode?.(); + if (!mode) + return []; + + const workers = await sdk.clusterManager.getClusterWorkers(); + + const ret: Setting[] = []; + + const clientWorkers = Object.values(workers); + + const clusterFork = await sdk.systemManager.getComponent('cluster-fork'); + + for (const worker of clientWorkers) { + const group = `Worker: ${worker.name}`; + const name: Setting = { + key: `${worker.id}:name`, + group, + title: 'Name', + description: 'The friendly name of the worker.', + value: worker.name, + }; + ret.push(name); + + const mode: Setting = { + key: `${worker.id}:mode`, + group, + title: 'Mode', + description: 'The mode of the worker.', + value: worker.mode, + readonly: true, + }; + ret.push(mode); + + + const envControl = await clusterFork.getEnvControl(worker.id); + // catch in case env is coming from vscode launch.json and no .env actually exists. + const dotEnv: string = await envControl.getDotEnv().catch(() => {}); + const dotEnvLines = dotEnv?.split('\n') || worker.labels; + const dotEnvParsed = dotEnvLines.map(line => { + const trimmed = line.trim(); + if (trimmed.startsWith('#')) { + return { line }; + } + const [key, ...value] = trimmed.split('='); + return { key, value: value.join('='), line }; + }); + + const workerLabels = dotEnvParsed.find(line => line.key === 'SCRYPTED_CLUSTER_LABELS')?.value?.split(',') || []; + + const labelChoices = new Set([ + ...workerLabels, + 'storage', + 'compute', + 'compute.preferred', + '@scrypted/coreml', + '@scrypted/openvino', + '@scrypted/onnx', + '@scrypted/tensorflow-lite', + ]); + const labels: Setting = { + key: `${worker.id}:labels`, + group, + title: 'Labels', + description: 'The labels to apply to this worker. Modifying the labels will restart the worker. Some labels, such as the host OS and architecture, cannot be changed.', + multiple: true, + combobox: true, + choices: [...labelChoices], + value: workerLabels, + }; + ret.push(labels); + } + + return ret; + } + + async putSetting(key: string, value: any) { + await this.writeQueue.enqueue(async () => { + const split = key.split(':'); + const [workerId, setting] = split; + const workers = await sdk.clusterManager.getClusterWorkers(); + const worker = workers[workerId]; + if (!worker) + return; + + + switch (setting) { + case 'name': + case 'labels': + break; + default: + return; + } + + const clusterFork = await sdk.systemManager.getComponent('cluster-fork'); + const envControl = await clusterFork.getEnvControl(worker.id); + const dotEnv: string = await envControl.getDotEnv() || ''; + + const dotEnvLines = dotEnv.split('\n'); + const dotEnvParsed = dotEnvLines.map(line => { + const trimmed = line.trim(); + if (trimmed.startsWith('#')) { + return { line }; + } + const [key, ...value] = trimmed.split('='); + return { key, value: value.join('='), line }; + }); + + const updateDotEnv = async (key: string, newValue: string) => { + let entry = dotEnvParsed.find(line => line.key === key); + if (!entry) { + entry = { key, value: '', line: '' }; + dotEnvParsed.push(entry); + } + entry.line = `${key}=${newValue}`; + await envControl.setDotEnv(dotEnvParsed.filter(line => line).map(line => line.line).join('\n')); + }; + + if (setting === 'labels') { + await updateDotEnv('SCRYPTED_CLUSTER_LABELS', value.join(',')); + } else if (setting === 'name') { + await updateDotEnv('SCRYPTED_CLUSTER_WORKER_NAME', value); + } + setTimeout(async () => { + const serviceControl = await clusterFork.getServiceControl(worker.id); + await serviceControl.restart().catch(() => { }); + }, 10000); + }); + } + + async getReadmeMarkdown(): Promise { + return `Manage Scrypted's cluster mode. Run storage devices and compute services on separate servers.`; + } +} diff --git a/plugins/core/src/main.ts b/plugins/core/src/main.ts index dfb55238a..9bd64d3e2 100644 --- a/plugins/core/src/main.ts +++ b/plugins/core/src/main.ts @@ -109,7 +109,7 @@ class ScryptedCore extends ScryptedDeviceBase implements HttpRequestHandler, Dev { name: 'Cluster', nativeId: ClusterCoreNativeId, - interfaces: [ScryptedInterface.Settings, ScryptedInterface.Readme], + interfaces: [ScryptedInterface.Settings, ScryptedInterface.Readme, ScryptedInterface.ScryptedSettings], type: ScryptedDeviceType.Builtin, }, );