sdk: add cluster manager

Koushik Dutta
2024-11-20 10:10:47 -08:00
parent 459b95a0e2
commit 347a957cd3
15 changed files with 144 additions and 48 deletions

View File

@@ -164,7 +164,7 @@ class ClusterSetup():
         for key, value in self.peer.params.items():
             clusterPeer.params[key] = value
         clusterPeer.onProxySerialization = (
-            lambda value: self.clusterSetup.onProxySerialization(
+            lambda value: self.onProxySerialization(
                 clusterPeer, value, clusterPeerKey
             )
         )
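Note: the lambda now calls self.onProxySerialization directly because this code lives on ClusterSetup itself; the old self.clusterSetup indirection was a leftover. A hypothetical standalone rendering of the binding; the setup object, peer attribute, and hook signature are assumptions taken from this hunk:

def bind_proxy_serialization(setup, cluster_peer, cluster_peer_key):
    # Capture this particular peer and its key in the closure so that every
    # value the peer serializes is reported with its owning peer.
    cluster_peer.onProxySerialization = (
        lambda value: setup.onProxySerialization(cluster_peer, value, cluster_peer_key)
    )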

View File

@@ -0,0 +1,8 @@
+import typing
+
+async def writeWorkerGenerator(gen, out: typing.TextIO):
+    try:
+        async for item in gen:
+            out.buffer.write(item)
+    except Exception as e:
+        pass
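Note: this new helper pumps an async generator of bytes into a text stream's underlying binary buffer, and quietly stops when the generator errors out (e.g. the remote worker exits). It is used later in this commit to mirror a cluster fork worker's stdout and stderr; a minimal sketch of that usage, with remoteDict taken from the plugin_remote.py hunks below:

import asyncio
import sys

import plugin_console

async def mirror_worker_output(remoteDict):
    # remoteDict["stdout"] / remoteDict["stderr"] are async generators that
    # yield bytes produced by the remote cluster fork worker.
    asyncio.ensure_future(plugin_console.writeWorkerGenerator(remoteDict["stdout"], sys.stdout))
    asyncio.ensure_future(plugin_console.writeWorkerGenerator(remoteDict["stderr"], sys.stderr))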

View File

@@ -18,7 +18,7 @@ from asyncio.streams import StreamReader, StreamWriter
 from collections.abc import Mapping
 from io import StringIO
 from typing import Any, Callable, Coroutine, Optional, Set, Tuple, TypedDict
+import plugin_console
 import plugin_volume as pv
 import rpc
 import rpc_reader
@@ -622,17 +622,17 @@ class PluginRemote:
             traceback.print_exc()
             raise

-    async def loadZipWrapped(self, packageJson, zipAPI: Any, options: dict):
-        await self.clusterSetup.initializeCluster(options)
+    async def loadZipWrapped(self, packageJson, zipAPI: Any, zipOptions: dict):
+        await self.clusterSetup.initializeCluster(zipOptions)

         sdk = ScryptedStatic()
         sdk.connectRPCObject = lambda v: self.clusterSetup.connectRPCObject(v)

-        forkMain = options and options.get("fork")
-        debug = options.get("debug", None)
+        forkMain = zipOptions and zipOptions.get("fork")
+        debug = zipOptions.get("debug", None)
         plugin_volume = pv.ensure_plugin_volume(self.pluginId)
-        zipHash = options.get("zipHash")
+        zipHash = zipOptions.get("zipHash")
         plugin_zip_paths = pv.prep(plugin_volume, zipHash)

         if debug:
@@ -660,6 +660,13 @@ class PluginRemote:
         if not forkMain:
             multiprocessing.set_start_method("spawn")

+        # forkMain may be set to true, but the environment may not be initialized
+        # if the plugin is loaded in another cluster worker.
+        # instead rely on an environment variable that will be passed to
+        # child processes.
+        if not os.environ.get("SCRYPTED_PYTHON_INITIALIZED", None):
+            os.environ["SCRYPTED_PYTHON_INITIALIZED"] = "1"

             # it's possible to run 32bit docker on aarch64, which causes pip requirements
             # to fail because pip only allows filtering on machine, even if running a different architecture.
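Note: a brief sketch of the guard pattern added above. Environment variables are inherited by spawned children, so the first interpreter in the process tree marks itself and one-time setup is skipped in cluster workers forked later. The guarded body here is illustrative, not the commit's exact code:

import multiprocessing
import os

# Once-per-process-tree guard: children spawned by multiprocessing inherit
# os.environ, so they see the variable and skip this branch.
if not os.environ.get("SCRYPTED_PYTHON_INITIALIZED", None):
    os.environ["SCRYPTED_PYTHON_INITIALIZED"] = "1"
    multiprocessing.set_start_method("spawn")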
@@ -763,8 +770,6 @@ class PluginRemote:
             self.mediaManager = MediaManager(await self.api.getMediaManager())

         try:
-            from scrypted_sdk import sdk_init2  # type: ignore
-
             sdk.systemManager = self.systemManager
             sdk.deviceManager = self.deviceManager
             sdk.mediaManager = self.mediaManager
@@ -773,12 +778,32 @@ class PluginRemote:
sdk.zip = zip
def host_fork(options: dict = None) -> PluginFork:
async def finishFork(forkPeer: rpc.RpcPeer):
getRemote = await forkPeer.getParam("getRemote")
remote: PluginRemote = await getRemote(
self.api, self.pluginId, self.hostInfo
)
await remote.setSystemState(self.systemManager.getSystemState())
for nativeId, ds in self.nativeIds.items():
await remote.setNativeId(nativeId, ds.id, ds.storage)
forkOptions = zipOptions.copy()
forkOptions["fork"] = True
forkOptions["debug"] = debug
class PluginZipAPI:
async def getZip(self):
return await zipAPI.getZip()
return await remote.loadZip(packageJson, PluginZipAPI(), forkOptions)
if cluster_labels.needs_cluster_fork_worker(options):
peerLiveness = PeerLiveness(self.loop)
async def startClusterFork():
async def getClusterFork():
forkComponent = await self.api.getComponent("cluster-fork")
sanitizedOptions = options.copy()
sanitizedOptions["runtime"] = sanitizedOptions.get("runtime", "python")
sanitizedOptions["zipHash"] = zipHash
clusterForkResult = await forkComponent.fork(peerLiveness, sanitizedOptions, packageJson, zipHash, lambda: zipAPI.getZip())
async def waitPeerLiveness():
@@ -799,9 +824,22 @@ class PluginRemote:
                                 peerLiveness.killed.set_result(None)

                         asyncio.ensure_future(waitClusterForkResult(), loop=self.loop)

-                    result = asyncio.ensure_future(startClusterFork(), loop=self.loop)
+                        clusterGetRemote = await self.clusterSetup.connectRPCObject(await clusterForkResult.getResult())
+                        remoteDict = await clusterGetRemote()
+                        asyncio.ensure_future(plugin_console.writeWorkerGenerator(remoteDict["stdout"], sys.stdout))
+                        asyncio.ensure_future(plugin_console.writeWorkerGenerator(remoteDict["stderr"], sys.stderr))
+
+                        getRemote = remoteDict["getRemote"]
+                        directGetRemote = await self.clusterSetup.connectRPCObject(getRemote)
+                        if directGetRemote is getRemote:
+                            raise Exception("cluster fork peer not direct connected")
+                        forkPeer = getattr(directGetRemote, rpc.RpcPeer.PROPERTY_PROXY_PEER)
+                        return await finishFork(forkPeer)

                 pluginFork = PluginFork()
-                pluginFork.result = result
+                pluginFork.result = asyncio.create_task(getClusterFork())
                 pluginFork.terminate = lambda: peerLiveness.killed.set_result(None)
                 return pluginFork
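Note: PeerLiveness itself is not defined in this diff. From its use above, it appears to carry a "killed" future on the plugin's event loop: terminate() resolves it, and waiters such as waitPeerLiveness() await it so the cluster-fork component can observe worker death. A guessed minimal shape, under that assumption:

import asyncio

# Hypothetical sketch of the PeerLiveness contract inferred from this hunk;
# the real class is not shown in this commit.
class PeerLivenessSketch:
    def __init__(self, loop: asyncio.AbstractEventLoop):
        # terminate() is wired to killed.set_result(None); waiters await it.
        self.killed = loop.create_future()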
@@ -819,6 +857,7 @@ class PluginRemote:
                     target=plugin_fork, args=(child_conn,), daemon=True
                 )
                 pluginFork.worker.start()
+                pluginFork.terminate = lambda: pluginFork.worker.kill()

                 def schedule_exit_check():
                     def exit_check():
@@ -847,26 +886,11 @@ class PluginRemote:
                         finally:
                             parent_conn.close()
                             rpcTransport.executor.shutdown()
-                            pluginFork.worker.kill()
+                            pluginFork.terminate()

                 asyncio.run_coroutine_threadsafe(forkReadLoop(), loop=self.loop)

-                getRemote = await forkPeer.getParam("getRemote")
-                remote: PluginRemote = await getRemote(
-                    self.api, self.pluginId, self.hostInfo
-                )
-                await remote.setSystemState(self.systemManager.getSystemState())
-                for nativeId, ds in self.nativeIds.items():
-                    await remote.setNativeId(nativeId, ds.id, ds.storage)
-
-                forkOptions = options.copy()
-                forkOptions["fork"] = True
-                forkOptions["debug"] = debug
-
-                class PluginZipAPI:
-                    async def getZip(self):
-                        return await zipAPI.getZip()
-
-                return await remote.loadZip(packageJson, PluginZipAPI(), forkOptions)
+                return await finishFork(forkPeer)

             pluginFork.result = asyncio.create_task(getFork())
             return pluginFork
@@ -874,6 +898,7 @@ class PluginRemote:
             sdk.fork = host_fork
             # sdk.

+            from scrypted_sdk import sdk_init2  # type: ignore
             sdk_init2(sdk)
         except:
             from scrypted_sdk import sdk_init  # type: ignore

View File

@@ -62,7 +62,7 @@ class RpcProxy(object):
         self.__dict__['__proxy_id'] = entry['id']
         self.__dict__['__proxy_entry'] = entry
         self.__dict__['__proxy_constructor'] = proxyConstructorName
-        self.__dict__['__proxy_peer'] = peer
+        self.__dict__[RpcPeer.PROPERTY_PROXY_PEER] = peer
         self.__dict__[RpcPeer.PROPERTY_PROXY_PROPERTIES] = proxyProps
         self.__dict__['__proxy_oneway_methods'] = proxyOneWayMethods
@@ -105,17 +105,18 @@ class RpcProxy(object):
             return super().__setattr__(name, value)

     def __call__(self, *args, **kwargs):
-        return self.__dict__['__proxy_peer'].__apply__(self.__dict__['__proxy_id'], self.__dict__['__proxy_oneway_methods'], None, args)
+        return self.__dict__[RpcPeer.PROPERTY_PROXY_PEER].__apply__(self.__dict__['__proxy_id'], self.__dict__['__proxy_oneway_methods'], None, args)

     def __apply__(self, method: str, args: list):
-        return self.__dict__['__proxy_peer'].__apply__(self.__dict__['__proxy_id'], self.__dict__['__proxy_oneway_methods'], method, args)
+        return self.__dict__[RpcPeer.PROPERTY_PROXY_PEER].__apply__(self.__dict__['__proxy_id'], self.__dict__['__proxy_oneway_methods'], method, args)

 class RpcPeer:
     RPC_RESULT_ERROR_NAME = 'RPCResultError'
     PROPERTY_PROXY_PROPERTIES = '__proxy_props'
     PROPERTY_JSON_COPY_SERIALIZE_CHILDREN = '__json_copy_serialize_children'
+    PROPERTY_PROXY_PEER = '__proxy_peer'

     def __init__(self, send: Callable[[object, Callable[[Exception], None], Dict], None]) -> None:
         self.send = send
@@ -288,7 +289,7 @@ class RpcPeer:
             return ret

         __proxy_id = getattr(value, '__proxy_id', None)
-        __proxy_peer = getattr(value, '__proxy_peer', None)
+        __proxy_peer = getattr(value, RpcPeer.PROPERTY_PROXY_PEER, None)
         if __proxy_id and __proxy_peer == self:
             ret = {
                 '__local_proxy_id': __proxy_id,
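Note: promoting the '__proxy_peer' string literal to the RpcPeer.PROPERTY_PROXY_PEER constant lets code outside rpc.py unwrap the peer behind a proxy without hard-coding the attribute name, which is exactly what the new cluster fork path in plugin_remote.py does. A hypothetical helper showing the same lookup; unwrap_peer is illustrative, not part of the commit:

import rpc

def unwrap_peer(proxy) -> rpc.RpcPeer:
    # Recover the RpcPeer that owns a connected RpcProxy. This works because
    # RpcProxy stores its peer in __dict__ under RpcPeer.PROPERTY_PROXY_PEER.
    return getattr(proxy, rpc.RpcPeer.PROPERTY_PROXY_PEER)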