From 4c04e9e40399dc85e317e129446c3fcbb953312f Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Wed, 31 Jul 2024 22:51:56 -0700 Subject: [PATCH] server: implement multi server clustering --- server/python/plugin_remote.py | 612 ++++++++++++++-------- server/src/cluster/cluster-hash.ts | 2 +- server/src/cluster/connect-rpc-object.ts | 3 +- server/src/plugin/plugin-remote-worker.ts | 101 ++-- 4 files changed, 465 insertions(+), 253 deletions(-) diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py index cb6015966..d1da3acf7 100644 --- a/server/python/plugin_remote.py +++ b/server/python/plugin_remote.py @@ -8,7 +8,6 @@ import multiprocessing import multiprocessing.connection import os import platform -import shutil import sys import threading import time @@ -39,11 +38,13 @@ ptpython wheel """.strip() + class ClusterObject(TypedDict): id: str + address: str port: int proxyId: str - sourcePort: int + sourceKey: str sha256: str @@ -60,7 +61,7 @@ class DeviceProxy(object): self.device: asyncio.Future[rpc.RpcProxy] = None def __getattr__(self, name): - if name == 'id': + if name == "id": return self.id if hasattr(ScryptedInterfaceProperty, name): @@ -70,28 +71,33 @@ class DeviceProxy(object): p = state.get(name) if not p: return - return p.get('value', None) + return p.get("value", None) if hasattr(ScryptedInterfaceMethods, name): return rpc.RpcProxyMethod(self, name) def __setattr__(self, name: str, value: Any) -> None: - if name == '__proxy_finalizer_id': - self.__dict__['__proxy_entry']['finalizerId'] = value + if name == "__proxy_finalizer_id": + self.__dict__["__proxy_entry"]["finalizerId"] = value return super().__setattr__(name, value) def __apply__(self, method: str, args: list): if not self.device: - self.device = asyncio.ensure_future(self.systemManager.api.getDeviceById(self.id)) + self.device = asyncio.ensure_future( + self.systemManager.api.getDeviceById(self.id) + ) async def apply(): device = await self.device return await device.__apply__(method, args) + return apply() class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager): - def __init__(self, api: Any, systemState: Mapping[str, Mapping[str, SystemDeviceState]]) -> None: + def __init__( + self, api: Any, systemState: Mapping[str, Mapping[str, SystemDeviceState]] + ) -> None: super().__init__() self.api = api self.systemState = systemState @@ -103,7 +109,9 @@ class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager): def getSystemState(self) -> Any: return self.systemState - def getDeviceById(self, idOrPluginId: str, nativeId: str = None) -> scrypted_python.scrypted_sdk.ScryptedDevice: + def getDeviceById( + self, idOrPluginId: str, nativeId: str = None + ) -> scrypted_python.scrypted_sdk.ScryptedDevice: id: str = None if self.systemState.get(idOrPluginId, None): if nativeId is not None: @@ -114,15 +122,15 @@ class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager): state = self.systemState.get(check, None) if not state: continue - pluginId = state.get('pluginId', None) + pluginId = state.get("pluginId", None) if not pluginId: continue - pluginId = pluginId.get('value', None) + pluginId = pluginId.get("value", None) if pluginId == idOrPluginId: - checkNativeId = state.get('nativeId', None) + checkNativeId = state.get("nativeId", None) if not checkNativeId: continue - checkNativeId = checkNativeId.get('value', None) + checkNativeId = checkNativeId.get("value", None) if nativeId == checkNativeId: id = idOrPluginId break @@ -140,31 +148,38 @@ class SystemManager(scrypted_python.scrypted_sdk.types.SystemManager): state = self.systemState.get(check, None) if not state: continue - checkInterfaces = state.get('interfaces', None) + checkInterfaces = state.get("interfaces", None) if not checkInterfaces: continue - interfaces = checkInterfaces.get('value', []) + interfaces = checkInterfaces.get("value", []) if ScryptedInterface.ScryptedPlugin.value in interfaces: - checkPluginId = state.get('pluginId', None) + checkPluginId = state.get("pluginId", None) if not checkPluginId: continue - pluginId = checkPluginId.get('value', None) + pluginId = checkPluginId.get("value", None) if not pluginId: continue if pluginId == name: return self.getDeviceById(check) - checkName = state.get('name', None) + checkName = state.get("name", None) if not checkName: continue - if checkName.get('value', None) == name: + if checkName.get("value", None) == name: return self.getDeviceById(check) # TODO - async def listen(self, callback: scrypted_python.scrypted_sdk.EventListener) -> scrypted_python.scrypted_sdk.EventListenerRegister: + async def listen( + self, callback: scrypted_python.scrypted_sdk.EventListener + ) -> scrypted_python.scrypted_sdk.EventListenerRegister: return super().listen(callback) # TODO - async def listenDevice(self, id: str, event: str | scrypted_python.scrypted_sdk.EventListenerOptions, callback: scrypted_python.scrypted_sdk.EventListener) -> scrypted_python.scrypted_sdk.EventListenerRegister: + async def listenDevice( + self, + id: str, + event: str | scrypted_python.scrypted_sdk.EventListenerOptions, + callback: scrypted_python.scrypted_sdk.EventListener, + ) -> scrypted_python.scrypted_sdk.EventListenerRegister: return super().listenDevice(id, event, callback) async def removeDevice(self, id: str) -> None: @@ -179,7 +194,7 @@ class MediaObject(scrypted_python.scrypted_sdk.types.MediaObject): setattr(self, rpc.RpcPeer.PROPERTY_PROXY_PROPERTIES, proxyProps) options = options or {} - options['mimeType'] = mimeType + options["mimeType"] = mimeType for key, value in options.items(): if rpc.RpcPeer.isTransportSafe(value): @@ -194,38 +209,83 @@ class MediaManager: def __init__(self, mediaManager: scrypted_python.scrypted_sdk.types.MediaManager): self.mediaManager = mediaManager - async def addConverter(self, converter: scrypted_python.scrypted_sdk.types.BufferConverter) -> None: + async def addConverter( + self, converter: scrypted_python.scrypted_sdk.types.BufferConverter + ) -> None: return await self.mediaManager.addConverter(converter) async def clearConverters(self) -> None: return await self.mediaManager.clearConverters() - async def convertMediaObject(self, mediaObject: scrypted_python.scrypted_sdk.types.MediaObject, toMimeType: str) -> Any: + async def convertMediaObject( + self, + mediaObject: scrypted_python.scrypted_sdk.types.MediaObject, + toMimeType: str, + ) -> Any: return await self.mediaManager.convertMediaObject(mediaObject, toMimeType) - async def convertMediaObjectToBuffer(self, mediaObject: scrypted_python.scrypted_sdk.types.MediaObject, toMimeType: str) -> bytearray: - return await self.mediaManager.convertMediaObjectToBuffer(mediaObject, toMimeType) + async def convertMediaObjectToBuffer( + self, + mediaObject: scrypted_python.scrypted_sdk.types.MediaObject, + toMimeType: str, + ) -> bytearray: + return await self.mediaManager.convertMediaObjectToBuffer( + mediaObject, toMimeType + ) - async def convertMediaObjectToInsecureLocalUrl(self, mediaObject: str | scrypted_python.scrypted_sdk.types.MediaObject, toMimeType: str) -> str: - return await self.mediaManager.convertMediaObjectToInsecureLocalUrl(mediaObject, toMimeType) + async def convertMediaObjectToInsecureLocalUrl( + self, + mediaObject: str | scrypted_python.scrypted_sdk.types.MediaObject, + toMimeType: str, + ) -> str: + return await self.mediaManager.convertMediaObjectToInsecureLocalUrl( + mediaObject, toMimeType + ) - async def convertMediaObjectToJSON(self, mediaObject: scrypted_python.scrypted_sdk.types.MediaObject, toMimeType: str) -> Any: + async def convertMediaObjectToJSON( + self, + mediaObject: scrypted_python.scrypted_sdk.types.MediaObject, + toMimeType: str, + ) -> Any: return await self.mediaManager.convertMediaObjectToJSON(mediaObject, toMimeType) - async def convertMediaObjectToLocalUrl(self, mediaObject: str | scrypted_python.scrypted_sdk.types.MediaObject, toMimeType: str) -> str: - return await self.mediaManager.convertMediaObjectToLocalUrl(mediaObject, toMimeType) + async def convertMediaObjectToLocalUrl( + self, + mediaObject: str | scrypted_python.scrypted_sdk.types.MediaObject, + toMimeType: str, + ) -> str: + return await self.mediaManager.convertMediaObjectToLocalUrl( + mediaObject, toMimeType + ) - async def convertMediaObjectToUrl(self, mediaObject: str | scrypted_python.scrypted_sdk.types.MediaObject, toMimeType: str) -> str: + async def convertMediaObjectToUrl( + self, + mediaObject: str | scrypted_python.scrypted_sdk.types.MediaObject, + toMimeType: str, + ) -> str: return await self.mediaManager.convertMediaObjectToUrl(mediaObject, toMimeType) - async def createFFmpegMediaObject(self, ffmpegInput: scrypted_python.scrypted_sdk.types.FFmpegInput, options: scrypted_python.scrypted_sdk.types.MediaObjectOptions = None) -> scrypted_python.scrypted_sdk.types.MediaObject: + async def createFFmpegMediaObject( + self, + ffmpegInput: scrypted_python.scrypted_sdk.types.FFmpegInput, + options: scrypted_python.scrypted_sdk.types.MediaObjectOptions = None, + ) -> scrypted_python.scrypted_sdk.types.MediaObject: return await self.mediaManager.createFFmpegMediaObject(ffmpegInput, options) - async def createMediaObject(self, data: Any, mimeType: str, options: scrypted_python.scrypted_sdk.types.MediaObjectOptions = None) -> scrypted_python.scrypted_sdk.types.MediaObject: + async def createMediaObject( + self, + data: Any, + mimeType: str, + options: scrypted_python.scrypted_sdk.types.MediaObjectOptions = None, + ) -> scrypted_python.scrypted_sdk.types.MediaObject: # return await self.createMediaObject(data, mimetypes, options) return MediaObject(data, mimeType, options) - async def createMediaObjectFromUrl(self, data: str, options: scrypted_python.scrypted_sdk.types.MediaObjectOptions = None) -> scrypted_python.scrypted_sdk.types.MediaObject: + async def createMediaObjectFromUrl( + self, + data: str, + options: scrypted_python.scrypted_sdk.types.MediaObjectOptions = None, + ) -> scrypted_python.scrypted_sdk.types.MediaObject: return await self.mediaManager.createMediaObjectFromUrl(data, options) async def getFFmpegPath(self) -> str: @@ -236,7 +296,13 @@ class MediaManager: class DeviceState(scrypted_python.scrypted_sdk.types.DeviceState): - def __init__(self, id: str, nativeId: str, systemManager: SystemManager, deviceManager: scrypted_python.scrypted_sdk.types.DeviceManager) -> None: + def __init__( + self, + id: str, + nativeId: str, + systemManager: SystemManager, + deviceManager: scrypted_python.scrypted_sdk.types.DeviceManager, + ) -> None: super().__init__() self._id = id self.nativeId = nativeId @@ -253,7 +319,7 @@ class DeviceState(scrypted_python.scrypted_sdk.types.DeviceState): sdd = deviceState.get(property, None) if not sdd: return None - return sdd.get('value', None) + return sdd.get("value", None) def setScryptedProperty(self, property: str, value: Any): if property == ScryptedInterfaceProperty.id.value: @@ -262,13 +328,14 @@ class DeviceState(scrypted_python.scrypted_sdk.types.DeviceState): raise Exception("mixins is read only") if property == ScryptedInterfaceProperty.interfaces.value: raise Exception( - "interfaces is a read only post-mixin computed property, use providedInterfaces") + "interfaces is a read only post-mixin computed property, use providedInterfaces" + ) now = int(time.time() * 1000) self.systemManager.systemState[self._id][property] = { "lastEventTime": now, "stateTime": now, - "value": value + "value": value, } self.systemManager.api.setState(self.nativeId, property, value) @@ -311,7 +378,9 @@ class DeviceStorage(Storage): class DeviceManager(scrypted_python.scrypted_sdk.types.DeviceManager): - def __init__(self, nativeIds: Mapping[str, DeviceStorage], systemManager: SystemManager) -> None: + def __init__( + self, nativeIds: Mapping[str, DeviceStorage], systemManager: SystemManager + ) -> None: super().__init__() self.nativeIds = nativeIds self.systemManager = systemManager @@ -320,7 +389,9 @@ class DeviceManager(scrypted_python.scrypted_sdk.types.DeviceManager): id = self.nativeIds[nativeId].id return DeviceState(id, nativeId, self.systemManager, self) - async def onDeviceEvent(self, nativeId: str, eventInterface: str, eventData: Any = None) -> None: + async def onDeviceEvent( + self, nativeId: str, eventInterface: str, eventData: Any = None + ) -> None: await self.systemManager.api.onDeviceEvent(nativeId, eventInterface, eventData) async def onDevicesChanged(self, devices: DeviceManifest) -> None: @@ -332,8 +403,12 @@ class DeviceManager(scrypted_python.scrypted_sdk.types.DeviceManager): async def onDeviceRemoved(self, nativeId: str) -> None: return await self.systemManager.api.onDeviceRemoved(nativeId) - async def onMixinEvent(self, id: str, mixinDevice: Any, eventInterface: str, eventData: Any) -> None: - return await self.systemManager.api.onMixinEvent(id, mixinDevice, eventInterface, eventData) + async def onMixinEvent( + self, id: str, mixinDevice: Any, eventInterface: str, eventData: Any + ) -> None: + return await self.systemManager.api.onMixinEvent( + id, mixinDevice, eventInterface, eventData + ) async def requestRestart(self) -> None: return await self.systemManager.api.requestRestart() @@ -343,7 +418,9 @@ class DeviceManager(scrypted_python.scrypted_sdk.types.DeviceManager): class PluginRemote: - def __init__(self, peer: rpc.RpcPeer, api, pluginId: str, hostInfo, loop: AbstractEventLoop): + def __init__( + self, peer: rpc.RpcPeer, api, pluginId: str, hostInfo, loop: AbstractEventLoop + ): self.systemState: Mapping[str, Mapping[str, SystemDeviceState]] = {} self.nativeIds: Mapping[str, DeviceStorage] = {} self.mediaManager: MediaManager @@ -356,166 +433,225 @@ class PluginRemote: self.hostInfo = hostInfo self.loop = loop self.replPort = None - self.__dict__['__proxy_oneway_methods'] = [ - 'notify', - 'updateDeviceState', - 'setSystemState', - 'ioEvent', - 'setNativeId', + self.__dict__["__proxy_oneway_methods"] = [ + "notify", + "updateDeviceState", + "setSystemState", + "ioEvent", + "setNativeId", ] - async def print_async(self, nativeId: str, *values: object, sep: Optional[str] = ' ', - end: Optional[str] = '\n', - flush: bool = False,): + async def print_async( + self, + nativeId: str, + *values: object, + sep: Optional[str] = " ", + end: Optional[str] = "\n", + flush: bool = False, + ): consoleFuture = self.consoles.get(nativeId) if not consoleFuture: consoleFuture = Future() self.consoles[nativeId] = consoleFuture - plugins = await self.api.getComponent('plugins') - port = await plugins.getRemoteServicePort(self.pluginId, 'console-writer') + plugins = await self.api.getComponent("plugins") + port = await plugins.getRemoteServicePort(self.pluginId, "console-writer") connection = await asyncio.open_connection(port=port) _, writer = connection if not nativeId: - nid = 'undefined' + nid = "undefined" else: nid = nativeId - nid += '\n' - writer.write(nid.encode('utf8')) + nid += "\n" + writer.write(nid.encode("utf8")) consoleFuture.set_result(connection) _, writer = await consoleFuture strio = StringIO() print(*values, sep=sep, end=end, flush=flush, file=strio) strio.seek(0) - b = strio.read().encode('utf8') + b = strio.read().encode("utf8") writer.write(b) - def print(self, nativeId: str, *values: object, sep: Optional[str] = ' ', - end: Optional[str] = '\n', - flush: bool = False,): - asyncio.run_coroutine_threadsafe(self.print_async( - nativeId, *values, sep=sep, end=end, flush=flush), self.loop) + def print( + self, + nativeId: str, + *values: object, + sep: Optional[str] = " ", + end: Optional[str] = "\n", + flush: bool = False, + ): + asyncio.run_coroutine_threadsafe( + self.print_async(nativeId, *values, sep=sep, end=end, flush=flush), + self.loop, + ) async def loadZip(self, packageJson, getZip: Any, options: dict): try: return await self.loadZipWrapped(packageJson, getZip, options) except: - print('plugin start/fork failed') + print("plugin start/fork failed") traceback.print_exc() raise async def loadZipWrapped(self, packageJson, getZip: Any, options: dict): sdk = ScryptedStatic() - clusterId = options['clusterId'] - clusterSecret = options['clusterSecret'] + clusterId = options["clusterId"] + clusterSecret = options["clusterSecret"] + SCRYPTED_CLUSTER_ADDRESS = os.environ.get("SCRYPTED_CLUSTER_ADDRESS", None) def computeClusterObjectHash(o: ClusterObject) -> str: m = hashlib.sha256() - m.update(bytes(f"{o['id']}{o['port']}{o.get('sourcePort') or ''}{o['proxyId']}{clusterSecret}", 'utf8')) - return base64.b64encode(m.digest()).decode('utf-8') + m.update( + bytes( + f"{o['id']}{o.get('address') or ''}{o['port']}{o.get('sourceKey', None) or ''}{o['proxyId']}{clusterSecret}", + "utf8", + ) + ) + return base64.b64encode(m.digest()).decode("utf-8") - def onProxySerialization(value: Any, sourcePeerPort: int = None): + def onProxySerialization(value: Any, sourceKey: str = None): properties: dict = rpc.RpcPeer.prepareProxyProperties(value) or {} - clusterEntry = properties.get('__cluster', None) - proxyId: str = (clusterEntry and clusterEntry.get('proxyId', None)) or rpc.RpcPeer.generateId() + clusterEntry = properties.get("__cluster", None) + proxyId: str = ( + clusterEntry and clusterEntry.get("proxyId", None) + ) or rpc.RpcPeer.generateId() - if clusterEntry and clusterPort == clusterEntry['port'] and sourcePeerPort != clusterEntry.get('sourcePort', None): + if ( + clusterEntry + and clusterPort == clusterEntry["port"] + and sourceKey != clusterEntry.get("sourceKey", None) + ): clusterEntry = None - if not properties.get('__cluster', None): + if not clusterEntry: clusterEntry: ClusterObject = { - 'id': clusterId, - 'proxyId': proxyId, - 'port': clusterPort, - 'sourcePort': sourcePeerPort, + "id": clusterId, + "proxyId": proxyId, + "address": SCRYPTED_CLUSTER_ADDRESS, + "port": clusterPort, + "sourceKey": sourceKey, } - clusterEntry['sha256'] = computeClusterObjectHash(clusterEntry) - properties['__cluster'] = clusterEntry + clusterEntry["sha256"] = computeClusterObjectHash(clusterEntry) + properties["__cluster"] = clusterEntry return proxyId, properties self.peer.onProxySerialization = onProxySerialization - async def resolveObject(id: str, sourcePeerPort: int): - sourcePeer: rpc.RpcPeer = self.peer if not sourcePeerPort else await rpc.maybe_await(clusterPeers.get(sourcePeerPort)) + async def resolveObject(id: str, sourceKey: str): + sourcePeer: rpc.RpcPeer = ( + self.peer + if not sourceKey + else await rpc.maybe_await(clusterPeers.get(sourceKey, None)) + ) if not sourcePeer: return return sourcePeer.localProxyMap.get(id, None) - clusterPeers: Mapping[int, asyncio.Future[rpc.RpcPeer]] = {} + clusterPeers: Mapping[str, asyncio.Future[rpc.RpcPeer]] = {} - async def handleClusterClient(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): - _, clusterPeerPort = writer.get_extra_info('peername') + def getClusterPeerKey(address: str, port: int): + return f"{address}:{port}" + + async def handleClusterClient( + reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ): + clusterPeerAddress, clusterPeerPort = writer.get_extra_info("peername") + clusterPeerKey = getClusterPeerKey(clusterPeerAddress, clusterPeerPort) rpcTransport = rpc_reader.RpcStreamTransport(reader, writer) peer: rpc.RpcPeer - peer, peerReadLoop = await rpc_reader.prepare_peer_readloop(self.loop, rpcTransport) + peer, peerReadLoop = await rpc_reader.prepare_peer_readloop( + self.loop, rpcTransport + ) peer.onProxySerialization = lambda value: onProxySerialization( - value, clusterPeerPort) + value, clusterPeerPort + ) future: asyncio.Future[rpc.RpcPeer] = asyncio.Future() future.set_result(peer) - clusterPeers[clusterPeerPort] = future + clusterPeers[clusterPeerKey] = future async def connectRPCObject(o: ClusterObject): sha256 = computeClusterObjectHash(o) - if sha256 != o['sha256']: - raise Exception('secret incorrect') - return await resolveObject(o['proxyId'], o.get('sourcePort')) + if sha256 != o["sha256"]: + raise Exception("secret incorrect") + return await resolveObject(o["proxyId"], o.get("sourceKey", None)) - peer.params['connectRPCObject'] = connectRPCObject + peer.params["connectRPCObject"] = connectRPCObject try: await peerReadLoop() except: pass finally: - clusterPeers.pop(clusterPeerPort) - peer.kill('cluster client killed') + clusterPeers.pop(clusterPeerKey) + peer.kill("cluster client killed") writer.close() - clusterRpcServer = await asyncio.start_server(handleClusterClient, '127.0.0.1', 0) + listenAddress = "0.0.0.0" if SCRYPTED_CLUSTER_ADDRESS else "127.0.0.1" + clusterRpcServer = await asyncio.start_server( + handleClusterClient, listenAddress, 0 + ) clusterPort = clusterRpcServer.sockets[0].getsockname()[1] - def ensureClusterPeer(port: int): - clusterPeerPromise = clusterPeers.get(port) - if not clusterPeerPromise: - async def connectClusterPeer(): - reader, writer = await asyncio.open_connection( - '127.0.0.1', port) - _, clusterPeerPort = writer.get_extra_info('sockname') - rpcTransport = rpc_reader.RpcStreamTransport( - reader, writer) - clusterPeer, peerReadLoop = await rpc_reader.prepare_peer_readloop(self.loop, rpcTransport) - clusterPeer.onProxySerialization = lambda value: onProxySerialization( - value, clusterPeerPort) + def ensureClusterPeer(address: str, port: int): + if not address or address == SCRYPTED_CLUSTER_ADDRESS: + address = "127.0.0.1" + clusterPeerKey = getClusterPeerKey(address, port) + clusterPeerPromise = clusterPeers.get(clusterPeerKey) + if clusterPeerPromise: + return clusterPeerPromise - async def run_loop(): - try: - await peerReadLoop() - except: - pass - finally: - clusterPeers.pop(port) - asyncio.run_coroutine_threadsafe(run_loop(), self.loop) - return clusterPeer - clusterPeerPromise = self.loop.create_task( - connectClusterPeer()) - clusterPeers[port] = clusterPeerPromise + async def connectClusterPeer(): + reader, writer = await asyncio.open_connection(address, port) + sourceAddress, sourcePort = writer.get_extra_info("sockname") + if ( + sourceAddress != SCRYPTED_CLUSTER_ADDRESS + and sourceAddress != "127.0.0.1" + ): + print("source address mismatch", sourceAddress) + sourceKey = getClusterPeerKey(sourceAddress, sourcePort) + rpcTransport = rpc_reader.RpcStreamTransport(reader, writer) + clusterPeer, peerReadLoop = await rpc_reader.prepare_peer_readloop( + self.loop, rpcTransport + ) + clusterPeer.onProxySerialization = lambda value: onProxySerialization( + value, sourceKey + ) + + async def run_loop(): + try: + await peerReadLoop() + except: + pass + finally: + clusterPeers.pop(clusterPeerKey) + + asyncio.run_coroutine_threadsafe(run_loop(), self.loop) + return clusterPeer + + clusterPeerPromise = self.loop.create_task(connectClusterPeer()) + + clusterPeers[clusterPeerKey] = clusterPeerPromise return clusterPeerPromise async def connectRPCObject(value): - clusterObject = getattr(value, '__cluster') - if type(clusterObject) is not dict: + __cluster = getattr(value, "__cluster") + if type(__cluster) is not dict: return value - if clusterObject.get('id', None) != clusterId: + clusterObject: ClusterObject = __cluster + + if clusterObject.get("id", None) != clusterId: return value - port = clusterObject['port'] - proxyId = clusterObject['proxyId'] - sourcePort = clusterObject.get('sourcePort', None) + address = clusterObject.get("address", None) + port = clusterObject["port"] + proxyId = clusterObject["proxyId"] if port == clusterPort: - return await resolveObject(proxyId, sourcePort) + return await resolveObject( + proxyId, clusterObject.get("sourceKey", None) + ) - clusterPeerPromise = ensureClusterPeer(port) + clusterPeerPromise = ensureClusterPeer(address, port) try: clusterPeer = await clusterPeerPromise @@ -524,38 +660,40 @@ class PluginRemote: if existing: return existing - peerConnectRPCObject = clusterPeer.tags.get('connectRPCObject') + peerConnectRPCObject = clusterPeer.tags.get("connectRPCObject") if not peerConnectRPCObject: - peerConnectRPCObject = await clusterPeer.getParam('connectRPCObject') - clusterPeer.tags['connectRPCObject'] = peerConnectRPCObject + peerConnectRPCObject = await clusterPeer.getParam( + "connectRPCObject" + ) + clusterPeer.tags["connectRPCObject"] = peerConnectRPCObject newValue = await peerConnectRPCObject(clusterObject) if not newValue: - raise Exception('ipc object not found?') + raise Exception("rpc object not found?") return newValue except Exception as e: return value sdk.connectRPCObject = connectRPCObject - forkMain = options and options.get('fork') - debug = options.get('debug', None) + forkMain = options and options.get("fork") + debug = options.get("debug", None) plugin_volume = pv.ensure_plugin_volume(self.pluginId) - plugin_zip_paths = pv.prep(plugin_volume, options.get('zipHash')) + plugin_zip_paths = pv.prep(plugin_volume, options.get("zipHash")) if debug: scrypted_volume = pv.get_scrypted_volume() # python debugger needs a predictable path for the plugin.zip, # as the vscode python extension doesn't seem to have a way # to read the package.json to configure the python remoteRoot. - zipPath = os.path.join(scrypted_volume, 'plugin.zip') + zipPath = os.path.join(scrypted_volume, "plugin.zip") else: - zipPath = plugin_zip_paths.get('zip_file') + zipPath = plugin_zip_paths.get("zip_file") if not os.path.exists(zipPath) or debug: os.makedirs(os.path.dirname(zipPath), exist_ok=True) zipData = await getZip() - zipPathTmp = zipPath + '.tmp' - with open(zipPathTmp, 'wb') as f: + zipPathTmp = zipPath + ".tmp" + with open(zipPathTmp, "wb") as f: f.write(zipData) try: os.remove(zipPath) @@ -566,69 +704,98 @@ class PluginRemote: zip = zipfile.ZipFile(zipPath) if not forkMain: - multiprocessing.set_start_method('spawn') + multiprocessing.set_start_method("spawn") # it's possible to run 32bit docker on aarch64, which cause pip requirements # to fail because pip only allows filtering on machine, even if running a different architeture. # this will cause prebuilt wheel installation to fail. - if platform.machine() == 'aarch64' and platform.architecture()[0] == '32bit': - print('=============================================') + if ( + platform.machine() == "aarch64" + and platform.architecture()[0] == "32bit" + ): + print("=============================================") print( - 'Python machine vs architecture mismatch detected. Plugin installation may fail.') + "Python machine vs architecture mismatch detected. Plugin installation may fail." + ) print( - 'This issue occurs if a 32bit system was upgraded to a 64bit kernel.') + "This issue occurs if a 32bit system was upgraded to a 64bit kernel." + ) print( - 'Reverting to the 32bit kernel (or reflashing as native 64 bit is recommended.') - print('https://github.com/koush/scrypted/issues/678') - print('=============================================') + "Reverting to the 32bit kernel (or reflashing as native 64 bit is recommended." + ) + print("https://github.com/koush/scrypted/issues/678") + print("=============================================") - python_version = 'python%s' % str( - sys.version_info[0])+"."+str(sys.version_info[1]) - print('python version:', python_version) - print('interpreter:', sys.executable) + python_version = ( + "python%s" % str(sys.version_info[0]) + "." + str(sys.version_info[1]) + ) + print("python version:", python_version) + print("interpreter:", sys.executable) - python_versioned_directory = '%s-%s-%s' % ( - python_version, platform.system(), platform.machine()) - SCRYPTED_PYTHON_VERSION = os.environ.get('SCRYPTED_PYTHON_VERSION') - python_versioned_directory += '-' + SCRYPTED_PYTHON_VERSION + python_versioned_directory = "%s-%s-%s" % ( + python_version, + platform.system(), + platform.machine(), + ) + SCRYPTED_PYTHON_VERSION = os.environ.get("SCRYPTED_PYTHON_VERSION") + python_versioned_directory += "-" + SCRYPTED_PYTHON_VERSION - pip_target = os.path.join( - plugin_volume, python_versioned_directory) + pip_target = os.path.join(plugin_volume, python_versioned_directory) - print('pip target: %s' % pip_target) + print("pip target: %s" % pip_target) if not os.path.exists(pip_target): os.makedirs(pip_target, exist_ok=True) - def read_requirements(filename: str) -> str: if filename in zip.namelist(): - return zip.open(filename).read().decode('utf8') - return '' + return zip.open(filename).read().decode("utf8") + return "" - str_requirements = read_requirements('requirements.txt') - str_optional_requirements = read_requirements('requirements.optional.txt') + str_requirements = read_requirements("requirements.txt") + str_optional_requirements = read_requirements("requirements.optional.txt") scrypted_requirements_basename = os.path.join( - pip_target, 'requirements.scrypted') - requirements_basename = os.path.join( - pip_target, 'requirements') + pip_target, "requirements.scrypted" + ) + requirements_basename = os.path.join(pip_target, "requirements") optional_requirements_basename = os.path.join( - pip_target, 'requirements.optional') + pip_target, "requirements.optional" + ) need_pip = True if str_requirements: need_pip = need_requirements(requirements_basename, str_requirements) if not need_pip: - need_pip = need_requirements(scrypted_requirements_basename, SCRYPTED_REQUIREMENTS) + need_pip = need_requirements( + scrypted_requirements_basename, SCRYPTED_REQUIREMENTS + ) if need_pip: remove_pip_dirs(plugin_volume) - install_with_pip(pip_target, packageJson, SCRYPTED_REQUIREMENTS, scrypted_requirements_basename, ignore_error=True) - install_with_pip(pip_target, packageJson, str_requirements, requirements_basename, ignore_error=False) - install_with_pip(pip_target, packageJson, str_optional_requirements, optional_requirements_basename, ignore_error=True) + install_with_pip( + pip_target, + packageJson, + SCRYPTED_REQUIREMENTS, + scrypted_requirements_basename, + ignore_error=True, + ) + install_with_pip( + pip_target, + packageJson, + str_requirements, + requirements_basename, + ignore_error=False, + ) + install_with_pip( + pip_target, + packageJson, + str_optional_requirements, + optional_requirements_basename, + ignore_error=True, + ) else: - print('requirements.txt (up to date)') + print("requirements.txt (up to date)") print(str_requirements) sys.path.insert(0, zipPath) @@ -653,9 +820,10 @@ class PluginRemote: def host_fork() -> PluginFork: parent_conn, child_conn = multiprocessing.Pipe() pluginFork = PluginFork() - print('new fork') + print("new fork") pluginFork.worker = multiprocessing.Process( - target=plugin_fork, args=(child_conn,), daemon=True) + target=plugin_fork, args=(child_conn,), daemon=True + ) pluginFork.worker.start() def schedule_exit_check(): @@ -664,42 +832,47 @@ class PluginRemote: pluginFork.worker.join() else: schedule_exit_check() + self.loop.call_later(2, exit_check) schedule_exit_check() async def getFork(): - rpcTransport = rpc_reader.RpcConnectionTransport( - parent_conn) - forkPeer, readLoop = await rpc_reader.prepare_peer_readloop(self.loop, rpcTransport) - forkPeer.peerName = 'thread' + rpcTransport = rpc_reader.RpcConnectionTransport(parent_conn) + forkPeer, readLoop = await rpc_reader.prepare_peer_readloop( + self.loop, rpcTransport + ) + forkPeer.peerName = "thread" async def updateStats(stats): - self.ptimeSum += stats['cpu']['user'] + self.ptimeSum += stats["cpu"]["user"] self.allMemoryStats[forkPeer] = stats - forkPeer.params['updateStats'] = updateStats + + forkPeer.params["updateStats"] = updateStats async def forkReadLoop(): try: await readLoop() except: # traceback.print_exc() - print('fork read loop exited') + print("fork read loop exited") finally: self.allMemoryStats.pop(forkPeer) parent_conn.close() rpcTransport.executor.shutdown() pluginFork.worker.kill() - asyncio.run_coroutine_threadsafe( - forkReadLoop(), loop=self.loop) - getRemote = await forkPeer.getParam('getRemote') - remote: PluginRemote = await getRemote(self.api, self.pluginId, self.hostInfo) + + asyncio.run_coroutine_threadsafe(forkReadLoop(), loop=self.loop) + getRemote = await forkPeer.getParam("getRemote") + remote: PluginRemote = await getRemote( + self.api, self.pluginId, self.hostInfo + ) await remote.setSystemState(self.systemManager.getSystemState()) for nativeId, ds in self.nativeIds.items(): await remote.setNativeId(nativeId, ds.id, ds.storage) forkOptions = options.copy() - forkOptions['fork'] = True - forkOptions['debug'] = debug + forkOptions["fork"] = True + forkOptions["debug"] = debug return await remote.loadZip(packageJson, getZip, forkOptions) pluginFork.result = asyncio.create_task(getFork()) @@ -711,14 +884,18 @@ class PluginRemote: sdk_init2(sdk) except: from scrypted_sdk import sdk_init # type: ignore - sdk_init(zip, self, self.systemManager, - self.deviceManager, self.mediaManager) + + sdk_init( + zip, self, self.systemManager, self.deviceManager, self.mediaManager + ) if not forkMain: from main import create_scrypted_plugin # type: ignore + pluginInstance = await rpc.maybe_await(create_scrypted_plugin()) try: from plugin_repl import createREPLServer + self.replPort = await createREPLServer(sdk, pluginInstance) except Exception as e: print(f"Warning: Python REPL cannot be loaded: {e}") @@ -726,6 +903,7 @@ class PluginRemote: return pluginInstance from main import fork # type: ignore + forked = await rpc.maybe_await(fork()) if type(forked) == dict: forked[rpc.RpcPeer.PROPERTY_JSON_COPY_SERIALIZE_CHILDREN] = True @@ -753,13 +931,13 @@ class PluginRemote: self.systemState[id] = state async def notify(self, id, eventDetails: EventDetails, value): - property = eventDetails.get('property') + property = eventDetails.get("property") if property: state = None if self.systemState: state = self.systemState.get(id, None) if not state: - print('state not found for %s' % id) + print("state not found for %s" % id) return state[property] = value # systemManager.events.notify(id, eventTime, eventInterface, property, value.value, changed); @@ -776,49 +954,52 @@ class PluginRemote: async def getServicePort(self, name): if name == "repl": if self.replPort is None: - raise Exception('REPL unavailable: Plugin not loaded.') + raise Exception("REPL unavailable: Plugin not loaded.") if self.replPort == 0: - raise Exception('REPL unavailable: Python REPL not available.') + raise Exception("REPL unavailable: Python REPL not available.") return self.replPort - raise Exception(f'unknown service {name}') + raise Exception(f"unknown service {name}") async def start_stats_runner(self): pong = None + async def ping(time: int): nonlocal pong - pong = pong or await self.peer.getParam('pong') + pong = pong or await self.peer.getParam("pong") await pong(time) - self.peer.params['ping'] = ping - update_stats = await self.peer.getParam('updateStats') + self.peer.params["ping"] = ping + + update_stats = await self.peer.getParam("updateStats") if not update_stats: - print('host did not provide update_stats') + print("host did not provide update_stats") return def stats_runner(): ptime = round(time.process_time() * 1000000) + self.ptimeSum try: import psutil + process = psutil.Process(os.getpid()) heapTotal = process.memory_info().rss except: try: import resource - heapTotal = resource.getrusage( - resource.RUSAGE_SELF).ru_maxrss + + heapTotal = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss except: heapTotal = 0 for _, stats in self.allMemoryStats.items(): - heapTotal += stats['memoryUsage']['heapTotal'] + heapTotal += stats["memoryUsage"]["heapTotal"] stats = { - 'cpu': { - 'user': ptime, - 'system': 0, + "cpu": { + "user": ptime, + "system": 0, }, - 'memoryUsage': { - 'heapTotal': heapTotal, + "memoryUsage": { + "heapTotal": heapTotal, }, } asyncio.run_coroutine_threadsafe(update_stats(stats), self.loop) @@ -827,11 +1008,14 @@ class PluginRemote: stats_runner() -async def plugin_async_main(loop: AbstractEventLoop, rpcTransport: rpc_reader.RpcTransport): +async def plugin_async_main( + loop: AbstractEventLoop, rpcTransport: rpc_reader.RpcTransport +): peer, readLoop = await rpc_reader.prepare_peer_readloop(loop, rpcTransport) - peer.params['print'] = print - peer.params['getRemote'] = lambda api, pluginId, hostInfo: PluginRemote( - peer, api, pluginId, hostInfo, loop) + peer.params["print"] = print + peer.params["getRemote"] = lambda api, pluginId, hostInfo: PluginRemote( + peer, api, pluginId, hostInfo, loop + ) try: await readLoop() @@ -845,6 +1029,7 @@ def main(rpcTransport: rpc_reader.RpcTransport): def gc_runner(): gc.collect() loop.call_later(10, gc_runner) + gc_runner() loop.run_until_complete(plugin_async_main(loop, rpcTransport)) @@ -864,8 +1049,10 @@ def plugin_main(rpcTransport: rpc_reader.RpcTransport): # if it does, try starting without it. try: import gi - gi.require_version('Gst', '1.0') + + gi.require_version("Gst", "1.0") from gi.repository import GLib, Gst + Gst.init(None) # can't remember why starting the glib main loop is necessary. @@ -873,8 +1060,9 @@ def plugin_main(rpcTransport: rpc_reader.RpcTransport): # seems optional on other platforms. loop = GLib.MainLoop() - worker = threading.Thread(target=main, args=( - rpcTransport,), name="asyncio-main") + worker = threading.Thread( + target=main, args=(rpcTransport,), name="asyncio-main" + ) worker.start() loop.run() diff --git a/server/src/cluster/cluster-hash.ts b/server/src/cluster/cluster-hash.ts index 232bf5108..c05bc5ce5 100644 --- a/server/src/cluster/cluster-hash.ts +++ b/server/src/cluster/cluster-hash.ts @@ -2,6 +2,6 @@ import crypto from "crypto"; import { ClusterObject } from "./connect-rpc-object"; export function computeClusterObjectHash(o: ClusterObject, clusterSecret: string) { - const sha256 = crypto.createHash('sha256').update(`${o.id}${o.port}${o.sourcePort || ''}${o.proxyId}${clusterSecret}`).digest().toString('base64'); + const sha256 = crypto.createHash('sha256').update(`${o.id}${o.address || ''}${o.port}${o.sourceKey || ''}${o.proxyId}${clusterSecret}`).digest().toString('base64'); return sha256; } diff --git a/server/src/cluster/connect-rpc-object.ts b/server/src/cluster/connect-rpc-object.ts index 906a6bb6c..fb708824c 100644 --- a/server/src/cluster/connect-rpc-object.ts +++ b/server/src/cluster/connect-rpc-object.ts @@ -1,8 +1,9 @@ export interface ClusterObject { id: string; + address: string; port: number; proxyId: string; - sourcePort: number; + sourceKey: string; sha256: string; } diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index fb65d658a..efdca5eed 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -101,7 +101,9 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe const { clusterId, clusterSecret, zipHash } = zipOptions; const { zipFile, unzippedPath } = await prepareZip(getPluginVolume(pluginId), zipHash, getZip); - const onProxySerialization = (value: any, sourcePeerPort?: number) => { + const SCRYPTED_CLUSTER_ADDRESS = process.env.SCRYPTED_CLUSTER_ADDRESS; + + const onProxySerialization = (value: any, sourceKey?: string) => { const properties = RpcPeer.prepareProxyProperties(value) || {}; let clusterEntry: ClusterObject = properties.__cluster; @@ -111,16 +113,16 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe // if the cluster entry already exists, check if it belongs to this node. // if it belongs to this node, the entry must also be for this peer. // relying on the liveness/gc of a different peer may cause race conditions. - if (clusterEntry && clusterPort === clusterEntry.port && sourcePeerPort !== clusterEntry.sourcePort) + if (clusterEntry && clusterPort === clusterEntry.port && sourceKey !== clusterEntry.sourceKey) clusterEntry = undefined; - // set the cluster identity if it does not exist. if (!clusterEntry) { clusterEntry = { id: clusterId, + address: SCRYPTED_CLUSTER_ADDRESS, port: clusterPort, proxyId, - sourcePort: sourcePeerPort, + sourceKey, sha256: null, }; clusterEntry.sha256 = computeClusterObjectHash(clusterEntry, clusterSecret); @@ -134,8 +136,10 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe } peer.onProxySerialization = onProxySerialization; - const resolveObject = async (id: string, sourcePeerPort: number) => { - const sourcePeer = sourcePeerPort ? await clusterPeers.get(sourcePeerPort) : peer; + const resolveObject = async (id: string, sourceKey: string) => { + const sourcePeer = sourceKey + ? await clusterPeers.get(sourceKey) + : peer; return sourcePeer?.localProxyMap.get(id); } @@ -143,52 +147,71 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe // on the cluster server that is listening on the actual port/ // incoming connections: use the remote random/unique port // outgoing connections: use the local random/unique port - const clusterPeers = new Map>(); + const clusterPeers = new Map>(); + function getClusterPeerKey(address: string, port: number) { + return `${address}:${port}`; + } + const clusterRpcServer = net.createServer(client => { const clusterPeer = createDuplexRpcPeer(peer.selfName, 'cluster-client', client, client); + const clusterPeerAddress = client.remoteAddress; const clusterPeerPort = client.remotePort; - clusterPeer.onProxySerialization = (value) => onProxySerialization(value, clusterPeerPort); - clusterPeers.set(clusterPeerPort, Promise.resolve(clusterPeer)); + const clusterPeerKey = getClusterPeerKey(clusterPeerAddress, clusterPeerPort); + clusterPeer.onProxySerialization = (value) => onProxySerialization(value, clusterPeerKey); + clusterPeers.set(clusterPeerKey, Promise.resolve(clusterPeer)); startPluginRemoteOptions?.onClusterPeer?.(clusterPeer); const connectRPCObject: ConnectRPCObject = async (o) => { const sha256 = computeClusterObjectHash(o, clusterSecret); if (sha256 !== o.sha256) throw new Error('secret incorrect'); - return resolveObject(o.proxyId, o.sourcePort); + return resolveObject(o.proxyId, o.sourceKey); } clusterPeer.params['connectRPCObject'] = connectRPCObject; client.on('close', () => { - clusterPeers.delete(clusterPeerPort); + clusterPeers.delete(clusterPeerKey); clusterPeer.kill('cluster socket closed'); }); }) - const clusterPort = await listenZero(clusterRpcServer, '127.0.0.1'); - const ensureClusterPeer = (connectPort: number) => { - let clusterPeerPromise = clusterPeers.get(connectPort); - if (!clusterPeerPromise) { - clusterPeerPromise = (async () => { - const socket = net.connect(connectPort, '127.0.0.1'); - socket.on('close', () => clusterPeers.delete(connectPort)); + const listenAddress = SCRYPTED_CLUSTER_ADDRESS + ? '0.0.0.0' + : '127.0.0.1'; + const clusterPort = await listenZero(clusterRpcServer, listenAddress); - try { - await once(socket, 'connect'); - // the sourcePort will be added to all rpc objects created by this peer session and used by resolveObject for later - // resolution when trying to find the peer. - const sourcePort = (socket.address() as net.AddressInfo).port; + const ensureClusterPeer = (address: string, connectPort: number) => { + if (!address || address === SCRYPTED_CLUSTER_ADDRESS) + address = '127.0.0.1'; - const clusterPeer = createDuplexRpcPeer(peer.selfName, 'cluster-server', socket, socket); - clusterPeer.onProxySerialization = (value) => onProxySerialization(value, sourcePort); - return clusterPeer; - } - catch (e) { - console.error('failure ipc connect', e); - socket.destroy(); - throw e; - } - })(); - clusterPeers.set(connectPort, clusterPeerPromise); - } + const clusterPeerKey = getClusterPeerKey(address, connectPort); + let clusterPeerPromise = clusterPeers.get(clusterPeerKey); + if (clusterPeerPromise) + return clusterPeerPromise; + + clusterPeerPromise = (async () => { + const socket = net.connect(connectPort, address); + socket.on('close', () => clusterPeers.delete(clusterPeerKey)); + + try { + await once(socket, 'connect'); + + // the sourceKey is used by peers to determine if they're already connected. + const { address: sourceAddress, port: sourcePort } = (socket.address() as net.AddressInfo); + if (sourceAddress !== SCRYPTED_CLUSTER_ADDRESS && sourceAddress !== '127.0.0.1') + console.warn("source address mismatch", sourceAddress); + const sourcePeerKey = getClusterPeerKey(sourceAddress, sourcePort); + + const clusterPeer = createDuplexRpcPeer(peer.selfName, 'cluster-server', socket, socket); + clusterPeer.onProxySerialization = (value) => onProxySerialization(value, sourcePeerKey); + return clusterPeer; + } + catch (e) { + console.error('failure ipc connect', e); + socket.destroy(); + throw e; + } + })(); + + clusterPeers.set(clusterPeerKey, clusterPeerPromise); return clusterPeerPromise; }; @@ -196,16 +219,16 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe const clusterObject: ClusterObject = value?.__cluster; if (clusterObject?.id !== clusterId) return value; - const { port, proxyId, sourcePort } = clusterObject; + const { address, port, proxyId, sourceKey } = clusterObject; // handle the case when trying to connect to an object is on this cluster node, // returning the actual object, rather than initiating a loopback connection. if (port === clusterPort) - return resolveObject(proxyId, sourcePort); + return resolveObject(proxyId, sourceKey); try { - const clusterPeerPromise = ensureClusterPeer(port); + const clusterPeerPromise = ensureClusterPeer(address, port); const clusterPeer = await clusterPeerPromise; - // the proxy id is guaranteed to be unique in all peers in a cluster + // may already have this proxy so check first. const existing = clusterPeer.remoteWeakProxies[proxyId]?.deref(); if (existing) return existing;