diff --git a/sdk/package-lock.json b/sdk/package-lock.json
index f59e94bde..5ca2fd58c 100644
--- a/sdk/package-lock.json
+++ b/sdk/package-lock.json
@@ -1,12 +1,12 @@
{
"name": "@scrypted/sdk",
- "version": "0.3.74",
+ "version": "0.3.77",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/sdk",
- "version": "0.3.74",
+ "version": "0.3.77",
"license": "ISC",
"dependencies": {
"@babel/preset-typescript": "^7.26.0",
diff --git a/sdk/package.json b/sdk/package.json
index f9cc13e5d..bbb239dbf 100644
--- a/sdk/package.json
+++ b/sdk/package.json
@@ -1,6 +1,6 @@
{
"name": "@scrypted/sdk",
- "version": "0.3.74",
+ "version": "0.3.77",
"description": "",
"main": "dist/src/index.js",
"exports": {
diff --git a/sdk/types/package-lock.json b/sdk/types/package-lock.json
index 62e4b245b..ab507e260 100644
--- a/sdk/types/package-lock.json
+++ b/sdk/types/package-lock.json
@@ -1,12 +1,12 @@
{
"name": "@scrypted/types",
- "version": "0.3.68",
+ "version": "0.3.71",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/types",
- "version": "0.3.68",
+ "version": "0.3.71",
"license": "ISC",
"devDependencies": {
"@types/node": "^22.1.0",
diff --git a/sdk/types/package.json b/sdk/types/package.json
index 5079bcf8e..b86654601 100644
--- a/sdk/types/package.json
+++ b/sdk/types/package.json
@@ -1,6 +1,6 @@
{
"name": "@scrypted/types",
- "version": "0.3.68",
+ "version": "0.3.71",
"description": "",
"main": "dist/index.js",
"author": "",
diff --git a/sdk/types/scrypted_python/scrypted_sdk/__init__.py b/sdk/types/scrypted_python/scrypted_sdk/__init__.py
index a49bc556b..c9d639d41 100644
--- a/sdk/types/scrypted_python/scrypted_sdk/__init__.py
+++ b/sdk/types/scrypted_python/scrypted_sdk/__init__.py
@@ -9,6 +9,8 @@ import asyncio
class PluginFork:
result: asyncio.Task
worker: Process
+ def terminate(self):
+ pass
deviceManager: DeviceManager = None
systemManager: SystemManager = None
@@ -18,7 +20,7 @@ remote: Any = None
api: Any
sdk: ScryptedStatic
-def fork() -> PluginFork:
+def fork(options: Any) -> PluginFork:
pass
class ScryptedStatic:
diff --git a/sdk/types/scrypted_python/scrypted_sdk/types.py b/sdk/types/scrypted_python/scrypted_sdk/types.py
index d32423871..ac660c052 100644
--- a/sdk/types/scrypted_python/scrypted_sdk/types.py
+++ b/sdk/types/scrypted_python/scrypted_sdk/types.py
@@ -120,6 +120,7 @@ class ScryptedInterface(str, Enum):
CO2Sensor = "CO2Sensor"
Camera = "Camera"
Charger = "Charger"
+ ClusterForkInterface = "ClusterForkInterface"
ColorSettingHsv = "ColorSettingHsv"
ColorSettingRgb = "ColorSettingRgb"
ColorSettingTemperature = "ColorSettingTemperature"
@@ -452,6 +453,12 @@ class AudioVolumes(TypedDict):
pass
+class ClusterForkInterfaceOptions(TypedDict):
+
+ clusterWorkerId: str # The id of the cluster worker id that will execute this fork.
+ id: str # The id of the device that is associated with this fork.
+ nativeId: str # The native id of the mixin that is associated with this fork.
+
class ColorHsv(TypedDict):
"""Represents an HSV color value component."""
@@ -997,6 +1004,13 @@ class Charger:
chargeState: ChargeState
+class ClusterForkInterface:
+ """Requests that the ScryptedDevice create a fork to"""
+
+ async def forkInterface(self, forkInterface: ObjectDetection, options: ClusterForkInterfaceOptions = None) -> ObjectDetection:
+ pass
+
+
class ColorSettingHsv:
"""ColorSettingHsv sets the color of a colored light using the HSV representation."""
@@ -1957,6 +1971,7 @@ class ScryptedInterfaceMethods(str, Enum):
eval = "eval"
loadScripts = "loadScripts"
saveScript = "saveScript"
+ forkInterface = "forkInterface"
trackObjects = "trackObjects"
getDetectionInput = "getDetectionInput"
getObjectTypes = "getObjectTypes"
@@ -3086,6 +3101,13 @@ ScryptedInterfaceDescriptors = {
],
"properties": []
},
+ "ClusterForkInterface": {
+ "name": "ClusterForkInterface",
+ "methods": [
+ "forkInterface"
+ ],
+ "properties": []
+ },
"ObjectTracker": {
"name": "ObjectTracker",
"methods": [
diff --git a/sdk/types/src/types.input.ts b/sdk/types/src/types.input.ts
index 218da649e..cc00bc5ab 100644
--- a/sdk/types/src/types.input.ts
+++ b/sdk/types/src/types.input.ts
@@ -1690,7 +1690,7 @@ export interface VideoFrameGenerator {
/**
* Generic bidirectional stream connection.
*/
-export interface StreamService {
+export interface StreamService {
connectStream(input?: AsyncGenerator, options?: any): Promise>;
}
/**
@@ -2320,6 +2320,7 @@ export enum ScryptedInterface {
PushHandler = "PushHandler",
Program = "Program",
Scriptable = "Scriptable",
+ ClusterForkInterface = "ClusterForkInterface",
ObjectTracker = "ObjectTracker",
ObjectDetector = "ObjectDetector",
ObjectDetection = "ObjectDetection",
@@ -2534,6 +2535,17 @@ export interface FFmpegTranscode {
}
export type FFmpegTranscodeStream = (options: FFmpegTranscode) => Promise;
+export interface ClusterForkInterfaceOptions extends Required>, Pick {
+}
+
+/**
+ * Requests that the ScryptedDevice create a fork to
+ */
+export interface ClusterForkInterface {
+ forkInterface(forkInterface: ScryptedInterface.ObjectDetection, options?: ClusterForkInterfaceOptions): Promise;
+ forkInterface(forkInterface: ScryptedInterface, options?: ClusterForkInterfaceOptions): Promise;
+}
+
export interface ForkWorker {
terminate(): void;
on(event: 'exit', listener: () => void): void;
@@ -2658,6 +2670,30 @@ export interface ForkOptions {
};
}
+export interface ClusterFork extends ForkOptions {
+ runtime?: ForkOptions['runtime'];
+ labels?: ForkOptions['labels'];
+ id?: ForkOptions['id'];
+ clusterWorkerId: ForkOptions['clusterWorkerId'];
+}
+
+export interface ClusterWorker {
+ name: string;
+ id: string;
+ labels: string[];
+ forks: ClusterFork[];
+}
+
+export interface ClusterManager {
+ /**
+ * Returns the id of this cluster worker.
+ * Returns undefined if this is not a cluster worker.
+ */
+ getClusterWorkerId(): string;
+ getClusterMode(): 'server' | 'client' | undefined;
+ getClusterWorkers(): Promise>;
+}
+
export interface ScryptedStatic {
/**
* @deprecated
@@ -2668,6 +2704,7 @@ export interface ScryptedStatic {
endpointManager: EndpointManager,
mediaManager: MediaManager,
systemManager: SystemManager,
+ clusterManager: ClusterManager;
serverVersion?: string;
diff --git a/server/package-lock.json b/server/package-lock.json
index da67bf2a5..08dbb8c82 100644
--- a/server/package-lock.json
+++ b/server/package-lock.json
@@ -12,7 +12,7 @@
"dependencies": {
"@scrypted/ffmpeg-static": "^6.1.0-build3",
"@scrypted/node-pty": "^1.0.22",
- "@scrypted/types": "^0.3.68",
+ "@scrypted/types": "^0.3.69",
"adm-zip": "^0.5.16",
"body-parser": "^1.20.3",
"cookie-parser": "^1.4.7",
@@ -557,9 +557,9 @@
}
},
"node_modules/@scrypted/types": {
- "version": "0.3.68",
- "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.3.68.tgz",
- "integrity": "sha512-4kxJZXCLTRGgJG8l+7G8R+lENEhfad0rEouX6zcTcA6JtKRhr6OK4lmLOa7h+yY5tbivHizoAvvOSPQWOKxOww=="
+ "version": "0.3.69",
+ "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.3.69.tgz",
+ "integrity": "sha512-KFBRM6gZOKOb1AK/bIyLJLvSaLKPX8cfcpay1dgxROdlBKm46nAEiN/9lTiemBpMKyvBOb/o8wxrlCUxNok48g=="
},
"node_modules/@types/adm-zip": {
"version": "0.5.6",
diff --git a/server/package.json b/server/package.json
index c05c6858f..9393e5abd 100644
--- a/server/package.json
+++ b/server/package.json
@@ -5,7 +5,7 @@
"dependencies": {
"@scrypted/ffmpeg-static": "^6.1.0-build3",
"@scrypted/node-pty": "^1.0.22",
- "@scrypted/types": "^0.3.68",
+ "@scrypted/types": "^0.3.69",
"adm-zip": "^0.5.16",
"body-parser": "^1.20.3",
"cookie-parser": "^1.4.7",
diff --git a/server/python/cluster_setup.py b/server/python/cluster_setup.py
index 7deed9db7..7433795fc 100644
--- a/server/python/cluster_setup.py
+++ b/server/python/cluster_setup.py
@@ -164,7 +164,7 @@ class ClusterSetup():
for key, value in self.peer.params.items():
clusterPeer.params[key] = value
clusterPeer.onProxySerialization = (
- lambda value: self.clusterSetup.onProxySerialization(
+ lambda value: self.onProxySerialization(
clusterPeer, value, clusterPeerKey
)
)
diff --git a/server/python/plugin_console.py b/server/python/plugin_console.py
new file mode 100644
index 000000000..ae2d4d68e
--- /dev/null
+++ b/server/python/plugin_console.py
@@ -0,0 +1,8 @@
+import typing
+
+async def writeWorkerGenerator(gen, out: typing.TextIO):
+ try:
+ async for item in gen:
+ out.buffer.write(item)
+ except Exception as e:
+ pass
diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py
index f1d481923..1c9002179 100644
--- a/server/python/plugin_remote.py
+++ b/server/python/plugin_remote.py
@@ -18,7 +18,7 @@ from asyncio.streams import StreamReader, StreamWriter
from collections.abc import Mapping
from io import StringIO
from typing import Any, Callable, Coroutine, Optional, Set, Tuple, TypedDict
-
+import plugin_console
import plugin_volume as pv
import rpc
import rpc_reader
@@ -622,17 +622,17 @@ class PluginRemote:
traceback.print_exc()
raise
- async def loadZipWrapped(self, packageJson, zipAPI: Any, options: dict):
- await self.clusterSetup.initializeCluster(options)
+ async def loadZipWrapped(self, packageJson, zipAPI: Any, zipOptions: dict):
+ await self.clusterSetup.initializeCluster(zipOptions)
sdk = ScryptedStatic()
sdk.connectRPCObject = lambda v: self.clusterSetup.connectRPCObject(v)
- forkMain = options and options.get("fork")
- debug = options.get("debug", None)
+ forkMain = zipOptions and zipOptions.get("fork")
+ debug = zipOptions.get("debug", None)
plugin_volume = pv.ensure_plugin_volume(self.pluginId)
- zipHash = options.get("zipHash")
+ zipHash = zipOptions.get("zipHash")
plugin_zip_paths = pv.prep(plugin_volume, zipHash)
if debug:
@@ -660,6 +660,13 @@ class PluginRemote:
if not forkMain:
multiprocessing.set_start_method("spawn")
+
+ # forkMain may be set to true, but the environment may not be initialized
+ # if the plugin is loaded in another cluster worker.
+ # instead rely on a environemnt variable that will be passed to
+ # child processes.
+ if not os.environ.get("SCRYPTED_PYTHON_INITIALIZED", None):
+ os.environ["SCRYPTED_PYTHON_INITIALIZED"] = "1"
# it's possible to run 32bit docker on aarch64, which cause pip requirements
# to fail because pip only allows filtering on machine, even if running a different architeture.
@@ -763,8 +770,6 @@ class PluginRemote:
self.mediaManager = MediaManager(await self.api.getMediaManager())
try:
- from scrypted_sdk import sdk_init2 # type: ignore
-
sdk.systemManager = self.systemManager
sdk.deviceManager = self.deviceManager
sdk.mediaManager = self.mediaManager
@@ -773,12 +778,32 @@ class PluginRemote:
sdk.zip = zip
def host_fork(options: dict = None) -> PluginFork:
+ async def finishFork(forkPeer: rpc.RpcPeer):
+ getRemote = await forkPeer.getParam("getRemote")
+ remote: PluginRemote = await getRemote(
+ self.api, self.pluginId, self.hostInfo
+ )
+ await remote.setSystemState(self.systemManager.getSystemState())
+ for nativeId, ds in self.nativeIds.items():
+ await remote.setNativeId(nativeId, ds.id, ds.storage)
+ forkOptions = zipOptions.copy()
+ forkOptions["fork"] = True
+ forkOptions["debug"] = debug
+
+ class PluginZipAPI:
+
+ async def getZip(self):
+ return await zipAPI.getZip()
+
+ return await remote.loadZip(packageJson, PluginZipAPI(), forkOptions)
+
if cluster_labels.needs_cluster_fork_worker(options):
peerLiveness = PeerLiveness(self.loop)
- async def startClusterFork():
+ async def getClusterFork():
forkComponent = await self.api.getComponent("cluster-fork")
sanitizedOptions = options.copy()
sanitizedOptions["runtime"] = sanitizedOptions.get("runtime", "python")
+ sanitizedOptions["zipHash"] = zipHash
clusterForkResult = await forkComponent.fork(peerLiveness, sanitizedOptions, packageJson, zipHash, lambda: zipAPI.getZip())
async def waitPeerLiveness():
@@ -799,9 +824,22 @@ class PluginRemote:
peerLiveness.killed.set_result(None)
asyncio.ensure_future(waitClusterForkResult(), loop=self.loop)
- result = asyncio.ensure_future(startClusterFork(), loop=self.loop)
+ clusterGetRemote = await self.clusterSetup.connectRPCObject(await clusterForkResult.getResult())
+ remoteDict = await clusterGetRemote()
+ asyncio.ensure_future(plugin_console.writeWorkerGenerator(remoteDict["stdout"], sys.stdout))
+ asyncio.ensure_future(plugin_console.writeWorkerGenerator(remoteDict["stderr"], sys.stderr))
+
+ getRemote = remoteDict["getRemote"]
+ directGetRemote = await self.clusterSetup.connectRPCObject(getRemote)
+ if directGetRemote is getRemote:
+ raise Exception("cluster fork peer not direct connected")
+
+ forkPeer = getattr(directGetRemote, rpc.RpcPeer.PROPERTY_PROXY_PEER)
+ return await finishFork(forkPeer)
+
+
pluginFork = PluginFork()
- pluginFork.result = result
+ pluginFork.result = asyncio.create_task(getClusterFork())
pluginFork.terminate = lambda: peerLiveness.killed.set_result(None)
return pluginFork
@@ -819,6 +857,7 @@ class PluginRemote:
target=plugin_fork, args=(child_conn,), daemon=True
)
pluginFork.worker.start()
+ pluginFork.terminate = lambda: pluginFork.worker.kill()
def schedule_exit_check():
def exit_check():
@@ -847,26 +886,11 @@ class PluginRemote:
finally:
parent_conn.close()
rpcTransport.executor.shutdown()
- pluginFork.worker.kill()
+ pluginFork.terminate()
asyncio.run_coroutine_threadsafe(forkReadLoop(), loop=self.loop)
- getRemote = await forkPeer.getParam("getRemote")
- remote: PluginRemote = await getRemote(
- self.api, self.pluginId, self.hostInfo
- )
- await remote.setSystemState(self.systemManager.getSystemState())
- for nativeId, ds in self.nativeIds.items():
- await remote.setNativeId(nativeId, ds.id, ds.storage)
- forkOptions = options.copy()
- forkOptions["fork"] = True
- forkOptions["debug"] = debug
- class PluginZipAPI:
-
- async def getZip(self):
- return await zipAPI.getZip()
-
- return await remote.loadZip(packageJson, PluginZipAPI(), forkOptions)
+ return await finishFork(forkPeer)
pluginFork.result = asyncio.create_task(getFork())
return pluginFork
@@ -874,6 +898,7 @@ class PluginRemote:
sdk.fork = host_fork
# sdk.
+ from scrypted_sdk import sdk_init2 # type: ignore
sdk_init2(sdk)
except:
from scrypted_sdk import sdk_init # type: ignore
diff --git a/server/python/rpc.py b/server/python/rpc.py
index 0274847c3..e9f6603e2 100644
--- a/server/python/rpc.py
+++ b/server/python/rpc.py
@@ -62,7 +62,7 @@ class RpcProxy(object):
self.__dict__['__proxy_id'] = entry['id']
self.__dict__['__proxy_entry'] = entry
self.__dict__['__proxy_constructor'] = proxyConstructorName
- self.__dict__['__proxy_peer'] = peer
+ self.__dict__[RpcPeer.PROPERTY_PROXY_PEER] = peer
self.__dict__[RpcPeer.PROPERTY_PROXY_PROPERTIES] = proxyProps
self.__dict__['__proxy_oneway_methods'] = proxyOneWayMethods
@@ -105,17 +105,18 @@ class RpcProxy(object):
return super().__setattr__(name, value)
def __call__(self, *args, **kwargs):
- return self.__dict__['__proxy_peer'].__apply__(self.__dict__['__proxy_id'], self.__dict__['__proxy_oneway_methods'], None, args)
+ return self.__dict__[RpcPeer.PROPERTY_PROXY_PEER].__apply__(self.__dict__['__proxy_id'], self.__dict__['__proxy_oneway_methods'], None, args)
def __apply__(self, method: str, args: list):
- return self.__dict__['__proxy_peer'].__apply__(self.__dict__['__proxy_id'], self.__dict__['__proxy_oneway_methods'], method, args)
+ return self.__dict__[RpcPeer.PROPERTY_PROXY_PEER].__apply__(self.__dict__['__proxy_id'], self.__dict__['__proxy_oneway_methods'], method, args)
class RpcPeer:
RPC_RESULT_ERROR_NAME = 'RPCResultError'
PROPERTY_PROXY_PROPERTIES = '__proxy_props'
PROPERTY_JSON_COPY_SERIALIZE_CHILDREN = '__json_copy_serialize_children'
+ PROPERTY_PROXY_PEER = '__proxy_peer'
def __init__(self, send: Callable[[object, Callable[[Exception], None], Dict], None]) -> None:
self.send = send
@@ -288,7 +289,7 @@ class RpcPeer:
return ret
__proxy_id = getattr(value, '__proxy_id', None)
- __proxy_peer = getattr(value, '__proxy_peer', None)
+ __proxy_peer = getattr(value, RpcPeer.PROPERTY_PROXY_PEER, None)
if __proxy_id and __proxy_peer == self:
ret = {
'__local_proxy_id': __proxy_id,
diff --git a/server/src/scrypted-cluster-main.ts b/server/src/scrypted-cluster-main.ts
index b2bb0f2ee..a056dae99 100644
--- a/server/src/scrypted-cluster-main.ts
+++ b/server/src/scrypted-cluster-main.ts
@@ -207,6 +207,8 @@ export function startClusterClient(mainFilename: string) {
unzippedPath,
zipHash,
}, undefined);
+ runtimeWorker.stdout.on('data', data => console.log(data.toString()));
+ runtimeWorker.stderr.on('data', data => console.error(data.toString()));
const threadPeer = new RpcPeer('main', 'thread', (message, reject, serializationContext) => runtimeWorker.send(message, reject, serializationContext));
runtimeWorker.setupRpcPeer(threadPeer);
diff --git a/server/src/services/cluster-fork.ts b/server/src/services/cluster-fork.ts
index 0f5339813..ae39dc3f0 100644
--- a/server/src/services/cluster-fork.ts
+++ b/server/src/services/cluster-fork.ts
@@ -1,7 +1,6 @@
-import type { ScryptedRuntime } from "../runtime";
import { matchesClusterLabels } from "../cluster/cluster-labels";
+import type { ScryptedRuntime } from "../runtime";
import { ClusterForkOptions, ClusterForkParam, ClusterWorker, PeerLiveness } from "../scrypted-cluster-main";
-import { RpcPeer } from "../rpc";
export class ClusterFork {
constructor(public runtime: ScryptedRuntime) { }