diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py index 5a05ef8d7..b841aa9cf 100644 --- a/server/python/plugin_remote.py +++ b/server/python/plugin_remote.py @@ -26,12 +26,15 @@ import rpc_reader import scrypted_python.scrypted_sdk.types from plugin_pip import install_with_pip, need_requirements, remove_pip_dirs from scrypted_python.scrypted_sdk import PluginFork, ScryptedStatic -from scrypted_python.scrypted_sdk.types import (Device, DeviceManifest, - EventDetails, - ScryptedInterface, - ScryptedInterfaceMethods, - ScryptedInterfaceProperty, - Storage) +from scrypted_python.scrypted_sdk.types import ( + Device, + DeviceManifest, + EventDetails, + ScryptedInterface, + ScryptedInterfaceMethods, + ScryptedInterfaceProperty, + Storage, +) SCRYPTED_REQUIREMENTS = """ ptpython @@ -509,6 +512,9 @@ class PluginRemote: ) return base64.b64encode(m.digest()).decode("utf-8") + def isClusterAddress(address: str): + return not address or address == SCRYPTED_CLUSTER_ADDRESS + def onProxySerialization(value: Any, sourceKey: str = None): properties: dict = rpc.RpcPeer.prepareProxyProperties(value) or {} clusterEntry = properties.get("__cluster", None) @@ -516,12 +522,13 @@ class PluginRemote: clusterEntry and clusterEntry.get("proxyId", None) ) or rpc.RpcPeer.generateId() - if ( - clusterEntry - and clusterPort == clusterEntry["port"] - and sourceKey != clusterEntry.get("sourceKey", None) - ): - clusterEntry = None + if clusterEntry: + if ( + isClusterAddress(clusterEntry.get("address", None)) + and clusterPort == clusterEntry["port"] + and sourceKey != clusterEntry.get("sourceKey", None) + ): + clusterEntry = None if not clusterEntry: clusterEntry: ClusterObject = { @@ -593,7 +600,7 @@ class PluginRemote: clusterPort = clusterRpcServer.sockets[0].getsockname()[1] def ensureClusterPeer(address: str, port: int): - if not address or address == SCRYPTED_CLUSTER_ADDRESS: + if isClusterAddress(address): address = "127.0.0.1" clusterPeerKey = getClusterPeerKey(address, port) clusterPeerPromise = clusterPeers.get(clusterPeerKey) @@ -613,8 +620,8 @@ class PluginRemote: clusterPeer, peerReadLoop = await rpc_reader.prepare_peer_readloop( self.loop, rpcTransport ) - clusterPeer.onProxySerialization = lambda value: onProxySerialization( - value, clusterPeerKey + clusterPeer.onProxySerialization = ( + lambda value: onProxySerialization(value, clusterPeerKey) ) except: clusterPeers.pop(clusterPeerKey) diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index 6c1e54490..bcbdf1b52 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -110,6 +110,10 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe const SCRYPTED_CLUSTER_ADDRESS = process.env.SCRYPTED_CLUSTER_ADDRESS; + function isClusterAddress(address: string) { + return !address || address === SCRYPTED_CLUSTER_ADDRESS; + } + const onProxySerialization = (value: any, sourceKey?: string) => { const properties = RpcPeer.prepareProxyProperties(value) || {}; let clusterEntry: ClusterObject = properties.__cluster; @@ -121,8 +125,10 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe // if the cluster entry already exists, check if it belongs to this node. // if it belongs to this node, the entry must also be for this peer. // relying on the liveness/gc of a different peer may cause race conditions. - if (clusterEntry && clusterPort === clusterEntry.port && sourceKey !== clusterEntry.sourceKey) - clusterEntry = undefined; + if (clusterEntry) { + if (isClusterAddress(clusterEntry?.address) && clusterPort === clusterEntry.port && sourceKey !== clusterEntry.sourceKey) + clusterEntry = undefined; + } if (!clusterEntry) { clusterEntry = { @@ -196,7 +202,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe const clusterPort = await listenZero(clusterRpcServer, listenAddress); const ensureClusterPeer = (address: string, connectPort: number) => { - if (!address || address === SCRYPTED_CLUSTER_ADDRESS) + if (isClusterAddress(address)) address = '127.0.0.1'; const clusterPeerKey = getClusterPeerKey(address, connectPort); @@ -265,9 +271,6 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe clusterPeers.delete(threadPeerKey); } peerPromise = tidDeferred.promise.then(port => { - port.on('close', () => peerCleanup()); - port.on('messageerror', () => peerCleanup()); - const threadPeer = NodeThreadWorker.createRpcPeer(peer.selfName, threadPeerKey, port); threadPeer.onProxySerialization = value => onProxySerialization(value, threadPeerKey); @@ -280,6 +283,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe threadPeer.params['connectRPCObject'] = connectRPCObject; function cleanup(message: string) { + peerCleanup(); tidChannels.delete(tid); tidPeers.delete(tid); threadPeer.kill(message);