From 347a957cd3abccdff3e5466762a840eceedcf866 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Wed, 20 Nov 2024 10:10:47 -0800 Subject: [PATCH] sdk: add cluster manager --- sdk/package-lock.json | 4 +- sdk/package.json | 2 +- sdk/types/package-lock.json | 4 +- sdk/types/package.json | 2 +- .../scrypted_python/scrypted_sdk/__init__.py | 4 +- .../scrypted_python/scrypted_sdk/types.py | 22 +++++ sdk/types/src/types.input.ts | 39 ++++++++- server/package-lock.json | 8 +- server/package.json | 2 +- server/python/cluster_setup.py | 2 +- server/python/plugin_console.py | 8 ++ server/python/plugin_remote.py | 81 ++++++++++++------- server/python/rpc.py | 9 ++- server/src/scrypted-cluster-main.ts | 2 + server/src/services/cluster-fork.ts | 3 +- 15 files changed, 144 insertions(+), 48 deletions(-) create mode 100644 server/python/plugin_console.py diff --git a/sdk/package-lock.json b/sdk/package-lock.json index f59e94bde..5ca2fd58c 100644 --- a/sdk/package-lock.json +++ b/sdk/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/sdk", - "version": "0.3.74", + "version": "0.3.77", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/sdk", - "version": "0.3.74", + "version": "0.3.77", "license": "ISC", "dependencies": { "@babel/preset-typescript": "^7.26.0", diff --git a/sdk/package.json b/sdk/package.json index f9cc13e5d..bbb239dbf 100644 --- a/sdk/package.json +++ b/sdk/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/sdk", - "version": "0.3.74", + "version": "0.3.77", "description": "", "main": "dist/src/index.js", "exports": { diff --git a/sdk/types/package-lock.json b/sdk/types/package-lock.json index 62e4b245b..ab507e260 100644 --- a/sdk/types/package-lock.json +++ b/sdk/types/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/types", - "version": "0.3.68", + "version": "0.3.71", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/types", - "version": "0.3.68", + "version": "0.3.71", "license": "ISC", 
"devDependencies": { "@types/node": "^22.1.0", diff --git a/sdk/types/package.json b/sdk/types/package.json index 5079bcf8e..b86654601 100644 --- a/sdk/types/package.json +++ b/sdk/types/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/types", - "version": "0.3.68", + "version": "0.3.71", "description": "", "main": "dist/index.js", "author": "", diff --git a/sdk/types/scrypted_python/scrypted_sdk/__init__.py b/sdk/types/scrypted_python/scrypted_sdk/__init__.py index a49bc556b..c9d639d41 100644 --- a/sdk/types/scrypted_python/scrypted_sdk/__init__.py +++ b/sdk/types/scrypted_python/scrypted_sdk/__init__.py @@ -9,6 +9,8 @@ import asyncio class PluginFork: result: asyncio.Task worker: Process + def terminate(self): + pass deviceManager: DeviceManager = None systemManager: SystemManager = None @@ -18,7 +20,7 @@ remote: Any = None api: Any sdk: ScryptedStatic -def fork() -> PluginFork: +def fork(options: Any) -> PluginFork: pass class ScryptedStatic: diff --git a/sdk/types/scrypted_python/scrypted_sdk/types.py b/sdk/types/scrypted_python/scrypted_sdk/types.py index d32423871..ac660c052 100644 --- a/sdk/types/scrypted_python/scrypted_sdk/types.py +++ b/sdk/types/scrypted_python/scrypted_sdk/types.py @@ -120,6 +120,7 @@ class ScryptedInterface(str, Enum): CO2Sensor = "CO2Sensor" Camera = "Camera" Charger = "Charger" + ClusterForkInterface = "ClusterForkInterface" ColorSettingHsv = "ColorSettingHsv" ColorSettingRgb = "ColorSettingRgb" ColorSettingTemperature = "ColorSettingTemperature" @@ -452,6 +453,12 @@ class AudioVolumes(TypedDict): pass +class ClusterForkInterfaceOptions(TypedDict): + + clusterWorkerId: str # The id of the cluster worker id that will execute this fork. + id: str # The id of the device that is associated with this fork. + nativeId: str # The native id of the mixin that is associated with this fork. 
+ class ColorHsv(TypedDict): """Represents an HSV color value component.""" @@ -997,6 +1004,13 @@ class Charger: chargeState: ChargeState +class ClusterForkInterface: + """Requests that the ScryptedDevice create a fork to""" + + async def forkInterface(self, forkInterface: ObjectDetection, options: ClusterForkInterfaceOptions = None) -> ObjectDetection: + pass + + class ColorSettingHsv: """ColorSettingHsv sets the color of a colored light using the HSV representation.""" @@ -1957,6 +1971,7 @@ class ScryptedInterfaceMethods(str, Enum): eval = "eval" loadScripts = "loadScripts" saveScript = "saveScript" + forkInterface = "forkInterface" trackObjects = "trackObjects" getDetectionInput = "getDetectionInput" getObjectTypes = "getObjectTypes" @@ -3086,6 +3101,13 @@ ScryptedInterfaceDescriptors = { ], "properties": [] }, + "ClusterForkInterface": { + "name": "ClusterForkInterface", + "methods": [ + "forkInterface" + ], + "properties": [] + }, "ObjectTracker": { "name": "ObjectTracker", "methods": [ diff --git a/sdk/types/src/types.input.ts b/sdk/types/src/types.input.ts index 218da649e..cc00bc5ab 100644 --- a/sdk/types/src/types.input.ts +++ b/sdk/types/src/types.input.ts @@ -1690,7 +1690,7 @@ export interface VideoFrameGenerator { /** * Generic bidirectional stream connection. 
*/ -export interface StreamService { +export interface StreamService { connectStream(input?: AsyncGenerator, options?: any): Promise>; } /** @@ -2320,6 +2320,7 @@ export enum ScryptedInterface { PushHandler = "PushHandler", Program = "Program", Scriptable = "Scriptable", + ClusterForkInterface = "ClusterForkInterface", ObjectTracker = "ObjectTracker", ObjectDetector = "ObjectDetector", ObjectDetection = "ObjectDetection", @@ -2534,6 +2535,17 @@ export interface FFmpegTranscode { } export type FFmpegTranscodeStream = (options: FFmpegTranscode) => Promise; +export interface ClusterForkInterfaceOptions extends Required>, Pick { +} + +/** + * Requests that the ScryptedDevice create a fork to + */ +export interface ClusterForkInterface { + forkInterface(forkInterface: ScryptedInterface.ObjectDetection, options?: ClusterForkInterfaceOptions): Promise; + forkInterface(forkInterface: ScryptedInterface, options?: ClusterForkInterfaceOptions): Promise; +} + export interface ForkWorker { terminate(): void; on(event: 'exit', listener: () => void): void; @@ -2658,6 +2670,30 @@ export interface ForkOptions { }; } +export interface ClusterFork extends ForkOptions { + runtime?: ForkOptions['runtime']; + labels?: ForkOptions['labels']; + id?: ForkOptions['id']; + clusterWorkerId: ForkOptions['clusterWorkerId']; +} + +export interface ClusterWorker { + name: string; + id: string; + labels: string[]; + forks: ClusterFork[]; +} + +export interface ClusterManager { + /** + * Returns the id of this cluster worker. + * Returns undefined if this is not a cluster worker. 
+ */ + getClusterWorkerId(): string; + getClusterMode(): 'server' | 'client' | undefined; + getClusterWorkers(): Promise>; +} + export interface ScryptedStatic { /** * @deprecated @@ -2668,6 +2704,7 @@ export interface ScryptedStatic { endpointManager: EndpointManager, mediaManager: MediaManager, systemManager: SystemManager, + clusterManager: ClusterManager; serverVersion?: string; diff --git a/server/package-lock.json b/server/package-lock.json index da67bf2a5..08dbb8c82 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -12,7 +12,7 @@ "dependencies": { "@scrypted/ffmpeg-static": "^6.1.0-build3", "@scrypted/node-pty": "^1.0.22", - "@scrypted/types": "^0.3.68", + "@scrypted/types": "^0.3.69", "adm-zip": "^0.5.16", "body-parser": "^1.20.3", "cookie-parser": "^1.4.7", @@ -557,9 +557,9 @@ } }, "node_modules/@scrypted/types": { - "version": "0.3.68", - "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.3.68.tgz", - "integrity": "sha512-4kxJZXCLTRGgJG8l+7G8R+lENEhfad0rEouX6zcTcA6JtKRhr6OK4lmLOa7h+yY5tbivHizoAvvOSPQWOKxOww==" + "version": "0.3.69", + "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.3.69.tgz", + "integrity": "sha512-KFBRM6gZOKOb1AK/bIyLJLvSaLKPX8cfcpay1dgxROdlBKm46nAEiN/9lTiemBpMKyvBOb/o8wxrlCUxNok48g==" }, "node_modules/@types/adm-zip": { "version": "0.5.6", diff --git a/server/package.json b/server/package.json index c05c6858f..9393e5abd 100644 --- a/server/package.json +++ b/server/package.json @@ -5,7 +5,7 @@ "dependencies": { "@scrypted/ffmpeg-static": "^6.1.0-build3", "@scrypted/node-pty": "^1.0.22", - "@scrypted/types": "^0.3.68", + "@scrypted/types": "^0.3.69", "adm-zip": "^0.5.16", "body-parser": "^1.20.3", "cookie-parser": "^1.4.7", diff --git a/server/python/cluster_setup.py b/server/python/cluster_setup.py index 7deed9db7..7433795fc 100644 --- a/server/python/cluster_setup.py +++ b/server/python/cluster_setup.py @@ -164,7 +164,7 @@ class ClusterSetup(): for key, value in 
self.peer.params.items(): clusterPeer.params[key] = value clusterPeer.onProxySerialization = ( - lambda value: self.clusterSetup.onProxySerialization( + lambda value: self.onProxySerialization( clusterPeer, value, clusterPeerKey ) ) diff --git a/server/python/plugin_console.py b/server/python/plugin_console.py new file mode 100644 index 000000000..ae2d4d68e --- /dev/null +++ b/server/python/plugin_console.py @@ -0,0 +1,8 @@ +import typing + +async def writeWorkerGenerator(gen, out: typing.TextIO): + try: + async for item in gen: + out.buffer.write(item) + except Exception as e: + pass diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py index f1d481923..1c9002179 100644 --- a/server/python/plugin_remote.py +++ b/server/python/plugin_remote.py @@ -18,7 +18,7 @@ from asyncio.streams import StreamReader, StreamWriter from collections.abc import Mapping from io import StringIO from typing import Any, Callable, Coroutine, Optional, Set, Tuple, TypedDict - +import plugin_console import plugin_volume as pv import rpc import rpc_reader @@ -622,17 +622,17 @@ class PluginRemote: traceback.print_exc() raise - async def loadZipWrapped(self, packageJson, zipAPI: Any, options: dict): - await self.clusterSetup.initializeCluster(options) + async def loadZipWrapped(self, packageJson, zipAPI: Any, zipOptions: dict): + await self.clusterSetup.initializeCluster(zipOptions) sdk = ScryptedStatic() sdk.connectRPCObject = lambda v: self.clusterSetup.connectRPCObject(v) - forkMain = options and options.get("fork") - debug = options.get("debug", None) + forkMain = zipOptions and zipOptions.get("fork") + debug = zipOptions.get("debug", None) plugin_volume = pv.ensure_plugin_volume(self.pluginId) - zipHash = options.get("zipHash") + zipHash = zipOptions.get("zipHash") plugin_zip_paths = pv.prep(plugin_volume, zipHash) if debug: @@ -660,6 +660,13 @@ class PluginRemote: if not forkMain: multiprocessing.set_start_method("spawn") + + # forkMain may be set to true, but the 
environment may not be initialized + # if the plugin is loaded in another cluster worker. + # instead rely on an environment variable that will be passed to + # child processes. + if not os.environ.get("SCRYPTED_PYTHON_INITIALIZED", None): + os.environ["SCRYPTED_PYTHON_INITIALIZED"] = "1" # it's possible to run 32bit docker on aarch64, which cause pip requirements # to fail because pip only allows filtering on machine, even if running a different architeture. @@ -763,8 +770,6 @@ class PluginRemote: self.mediaManager = MediaManager(await self.api.getMediaManager()) try: - from scrypted_sdk import sdk_init2 # type: ignore - sdk.systemManager = self.systemManager sdk.deviceManager = self.deviceManager sdk.mediaManager = self.mediaManager @@ -773,12 +778,32 @@ class PluginRemote: sdk.zip = zip def host_fork(options: dict = None) -> PluginFork: + async def finishFork(forkPeer: rpc.RpcPeer): + getRemote = await forkPeer.getParam("getRemote") + remote: PluginRemote = await getRemote( + self.api, self.pluginId, self.hostInfo + ) + await remote.setSystemState(self.systemManager.getSystemState()) + for nativeId, ds in self.nativeIds.items(): + await remote.setNativeId(nativeId, ds.id, ds.storage) + forkOptions = zipOptions.copy() + forkOptions["fork"] = True + forkOptions["debug"] = debug + + class PluginZipAPI: + + async def getZip(self): + return await zipAPI.getZip() + + return await remote.loadZip(packageJson, PluginZipAPI(), forkOptions) + if cluster_labels.needs_cluster_fork_worker(options): peerLiveness = PeerLiveness(self.loop) - async def startClusterFork(): + async def getClusterFork(): forkComponent = await self.api.getComponent("cluster-fork") sanitizedOptions = options.copy() sanitizedOptions["runtime"] = sanitizedOptions.get("runtime", "python") + sanitizedOptions["zipHash"] = zipHash clusterForkResult = await forkComponent.fork(peerLiveness, sanitizedOptions, packageJson, zipHash, lambda: zipAPI.getZip()) async def waitPeerLiveness(): @@ -799,9 +824,22 @@ class
PluginRemote: peerLiveness.killed.set_result(None) asyncio.ensure_future(waitClusterForkResult(), loop=self.loop) - result = asyncio.ensure_future(startClusterFork(), loop=self.loop) + clusterGetRemote = await self.clusterSetup.connectRPCObject(await clusterForkResult.getResult()) + remoteDict = await clusterGetRemote() + asyncio.ensure_future(plugin_console.writeWorkerGenerator(remoteDict["stdout"], sys.stdout)) + asyncio.ensure_future(plugin_console.writeWorkerGenerator(remoteDict["stderr"], sys.stderr)) + + getRemote = remoteDict["getRemote"] + directGetRemote = await self.clusterSetup.connectRPCObject(getRemote) + if directGetRemote is getRemote: + raise Exception("cluster fork peer not direct connected") + + forkPeer = getattr(directGetRemote, rpc.RpcPeer.PROPERTY_PROXY_PEER) + return await finishFork(forkPeer) + + pluginFork = PluginFork() - pluginFork.result = result + pluginFork.result = asyncio.create_task(getClusterFork()) pluginFork.terminate = lambda: peerLiveness.killed.set_result(None) return pluginFork @@ -819,6 +857,7 @@ class PluginRemote: target=plugin_fork, args=(child_conn,), daemon=True ) pluginFork.worker.start() + pluginFork.terminate = lambda: pluginFork.worker.kill() def schedule_exit_check(): def exit_check(): @@ -847,26 +886,11 @@ class PluginRemote: finally: parent_conn.close() rpcTransport.executor.shutdown() - pluginFork.worker.kill() + pluginFork.terminate() asyncio.run_coroutine_threadsafe(forkReadLoop(), loop=self.loop) - getRemote = await forkPeer.getParam("getRemote") - remote: PluginRemote = await getRemote( - self.api, self.pluginId, self.hostInfo - ) - await remote.setSystemState(self.systemManager.getSystemState()) - for nativeId, ds in self.nativeIds.items(): - await remote.setNativeId(nativeId, ds.id, ds.storage) - forkOptions = options.copy() - forkOptions["fork"] = True - forkOptions["debug"] = debug - class PluginZipAPI: - - async def getZip(self): - return await zipAPI.getZip() - - return await 
remote.loadZip(packageJson, PluginZipAPI(), forkOptions) + return await finishFork(forkPeer) pluginFork.result = asyncio.create_task(getFork()) return pluginFork @@ -874,6 +898,7 @@ class PluginRemote: sdk.fork = host_fork # sdk. + from scrypted_sdk import sdk_init2 # type: ignore sdk_init2(sdk) except: from scrypted_sdk import sdk_init # type: ignore diff --git a/server/python/rpc.py b/server/python/rpc.py index 0274847c3..e9f6603e2 100644 --- a/server/python/rpc.py +++ b/server/python/rpc.py @@ -62,7 +62,7 @@ class RpcProxy(object): self.__dict__['__proxy_id'] = entry['id'] self.__dict__['__proxy_entry'] = entry self.__dict__['__proxy_constructor'] = proxyConstructorName - self.__dict__['__proxy_peer'] = peer + self.__dict__[RpcPeer.PROPERTY_PROXY_PEER] = peer self.__dict__[RpcPeer.PROPERTY_PROXY_PROPERTIES] = proxyProps self.__dict__['__proxy_oneway_methods'] = proxyOneWayMethods @@ -105,17 +105,18 @@ class RpcProxy(object): return super().__setattr__(name, value) def __call__(self, *args, **kwargs): - return self.__dict__['__proxy_peer'].__apply__(self.__dict__['__proxy_id'], self.__dict__['__proxy_oneway_methods'], None, args) + return self.__dict__[RpcPeer.PROPERTY_PROXY_PEER].__apply__(self.__dict__['__proxy_id'], self.__dict__['__proxy_oneway_methods'], None, args) def __apply__(self, method: str, args: list): - return self.__dict__['__proxy_peer'].__apply__(self.__dict__['__proxy_id'], self.__dict__['__proxy_oneway_methods'], method, args) + return self.__dict__[RpcPeer.PROPERTY_PROXY_PEER].__apply__(self.__dict__['__proxy_id'], self.__dict__['__proxy_oneway_methods'], method, args) class RpcPeer: RPC_RESULT_ERROR_NAME = 'RPCResultError' PROPERTY_PROXY_PROPERTIES = '__proxy_props' PROPERTY_JSON_COPY_SERIALIZE_CHILDREN = '__json_copy_serialize_children' + PROPERTY_PROXY_PEER = '__proxy_peer' def __init__(self, send: Callable[[object, Callable[[Exception], None], Dict], None]) -> None: self.send = send @@ -288,7 +289,7 @@ class RpcPeer: return ret __proxy_id 
= getattr(value, '__proxy_id', None) - __proxy_peer = getattr(value, '__proxy_peer', None) + __proxy_peer = getattr(value, RpcPeer.PROPERTY_PROXY_PEER, None) if __proxy_id and __proxy_peer == self: ret = { '__local_proxy_id': __proxy_id, diff --git a/server/src/scrypted-cluster-main.ts b/server/src/scrypted-cluster-main.ts index b2bb0f2ee..a056dae99 100644 --- a/server/src/scrypted-cluster-main.ts +++ b/server/src/scrypted-cluster-main.ts @@ -207,6 +207,8 @@ export function startClusterClient(mainFilename: string) { unzippedPath, zipHash, }, undefined); + runtimeWorker.stdout.on('data', data => console.log(data.toString())); + runtimeWorker.stderr.on('data', data => console.error(data.toString())); const threadPeer = new RpcPeer('main', 'thread', (message, reject, serializationContext) => runtimeWorker.send(message, reject, serializationContext)); runtimeWorker.setupRpcPeer(threadPeer); diff --git a/server/src/services/cluster-fork.ts b/server/src/services/cluster-fork.ts index 0f5339813..ae39dc3f0 100644 --- a/server/src/services/cluster-fork.ts +++ b/server/src/services/cluster-fork.ts @@ -1,7 +1,6 @@ -import type { ScryptedRuntime } from "../runtime"; import { matchesClusterLabels } from "../cluster/cluster-labels"; +import type { ScryptedRuntime } from "../runtime"; import { ClusterForkOptions, ClusterForkParam, ClusterWorker, PeerLiveness } from "../scrypted-cluster-main"; -import { RpcPeer } from "../rpc"; export class ClusterFork { constructor(public runtime: ScryptedRuntime) { }