diff --git a/server/package-lock.json b/server/package-lock.json index 1308a3447..e04c92588 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/server", - "version": "0.117.1", + "version": "0.117.2", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@scrypted/server", - "version": "0.117.1", + "version": "0.117.2", "hasInstallScript": true, "license": "ISC", "dependencies": { diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py index b841aa9cf..4dd3b68d3 100644 --- a/server/python/plugin_remote.py +++ b/server/python/plugin_remote.py @@ -515,12 +515,17 @@ class PluginRemote: def isClusterAddress(address: str): return not address or address == SCRYPTED_CLUSTER_ADDRESS - def onProxySerialization(value: Any, sourceKey: str = None): + def onProxySerialization(peer: rpc.RpcPeer, value: Any, sourceKey: str = None): properties: dict = rpc.RpcPeer.prepareProxyProperties(value) or {} clusterEntry = properties.get("__cluster", None) - proxyId: str = ( - clusterEntry and clusterEntry.get("proxyId", None) - ) or rpc.RpcPeer.generateId() + proxyId: str + existing = peer.localProxied.get(value, None) + if existing: + proxyId = existing["id"] + else: + proxyId = ( + clusterEntry and clusterEntry.get("proxyId", None) + ) or rpc.RpcPeer.generateId() if clusterEntry: if ( @@ -543,7 +548,9 @@ class PluginRemote: return proxyId, properties - self.peer.onProxySerialization = onProxySerialization + self.peer.onProxySerialization = lambda value: onProxySerialization( + self.peer, value, None + ) async def resolveObject(id: str, sourceKey: str): sourcePeer: rpc.RpcPeer = ( @@ -571,7 +578,7 @@ class PluginRemote: self.loop, rpcTransport ) peer.onProxySerialization = lambda value: onProxySerialization( - value, clusterPeerPort + peer, value, clusterPeerKey ) future: asyncio.Future[rpc.RpcPeer] = asyncio.Future() future.set_result(peer) @@ -621,7 +628,9 @@ class PluginRemote: self.loop, rpcTransport ) clusterPeer.onProxySerialization = ( - lambda value: onProxySerialization(value, clusterPeerKey) + lambda value: onProxySerialization( + clusterPeer, value, clusterPeerKey + ) ) except: clusterPeers.pop(clusterPeerKey) diff --git a/server/python/rpc.py b/server/python/rpc.py index 80424d45d..0274847c3 100644 --- a/server/python/rpc.py +++ b/server/python/rpc.py @@ -268,12 +268,21 @@ class RpcPeer: proxiedEntry = self.localProxied.get(value, None) if proxiedEntry: + if self.onProxySerialization: + proxyId, __remote_proxy_props = self.onProxySerialization(value) + else: + __remote_proxy_props = RpcPeer.prepareProxyProperties(value) + proxyId = proxiedEntry['id'] + + if proxyId != proxiedEntry['id']: + raise Exception('onProxySerialization proxy id mismatch') + proxiedEntry['finalizerId'] = RpcPeer.generateId() ret = { - '__remote_proxy_id': proxiedEntry['id'], + '__remote_proxy_id': proxyId, '__remote_proxy_finalizer_id': proxiedEntry['finalizerId'], '__remote_constructor_name': __remote_constructor_name, - '__remote_proxy_props': RpcPeer.prepareProxyProperties(value), + '__remote_proxy_props': __remote_proxy_props, '__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None), } return ret diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index bcbdf1b52..25c99fa04 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -114,13 +114,13 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe return !address || address === SCRYPTED_CLUSTER_ADDRESS; } - const onProxySerialization = (value: any, sourceKey?: string) => { + const onProxySerialization = (peer: RpcPeer, value: any, sourceKey: string) => { const properties = RpcPeer.prepareProxyProperties(value) || {}; let clusterEntry: ClusterObject = properties.__cluster; // ensure globally stable proxyIds. // worker threads will embed their pid and tid in the proxy id for cross worker fast path. - const proxyId = clusterEntry?.proxyId || `n-${process.pid}-${worker_threads.threadId}-${RpcPeer.generateId()}`; + const proxyId = peer.localProxied.get(value)?.id || clusterEntry?.proxyId || `n-${process.pid}-${worker_threads.threadId}-${RpcPeer.generateId()}`; // if the cluster entry already exists, check if it belongs to this node. // if it belongs to this node, the entry must also be for this peer. @@ -148,7 +148,8 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe properties, }; } - peer.onProxySerialization = onProxySerialization; + + peer.onProxySerialization = value => onProxySerialization(peer, value, undefined); const resolveObject = async (id: string, sourceKey: string) => { const sourcePeer = sourceKey @@ -180,7 +181,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe const clusterPeer = createDuplexRpcPeer(peer.selfName, clusterPeerKey, client, client); // the listening peer sourceKey (client address/port) is used by the OTHER peer (the client) // to determine if it is already connected to THIS peer (the server). - clusterPeer.onProxySerialization = (value) => onProxySerialization(value, clusterPeerKey); + clusterPeer.onProxySerialization = (value) => onProxySerialization(clusterPeer, value, clusterPeerKey); clusterPeers.set(clusterPeerKey, Promise.resolve(clusterPeer)); startPluginRemoteOptions?.onClusterPeer?.(clusterPeer); const connectRPCObject: ConnectRPCObject = async (o) => { @@ -221,7 +222,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe console.warn("source address mismatch", sourceAddress); const clusterPeer = createDuplexRpcPeer(peer.selfName, clusterPeerKey, socket, socket); - clusterPeer.onProxySerialization = (value) => onProxySerialization(value, clusterPeerKey); + clusterPeer.onProxySerialization = (value) => onProxySerialization(clusterPeer, value, clusterPeerKey); return clusterPeer; } catch (e) { @@ -272,7 +273,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe } peerPromise = tidDeferred.promise.then(port => { const threadPeer = NodeThreadWorker.createRpcPeer(peer.selfName, threadPeerKey, port); - threadPeer.onProxySerialization = value => onProxySerialization(value, threadPeerKey); + threadPeer.onProxySerialization = value => onProxySerialization(threadPeer, value, threadPeerKey); const connectRPCObject: ConnectRPCObject = async (o) => { const sha256 = computeClusterObjectHash(o, clusterSecret); diff --git a/server/src/rpc.ts b/server/src/rpc.ts index 08ea16d80..8b687b052 100644 --- a/server/src/rpc.ts +++ b/server/src/rpc.ts @@ -594,13 +594,25 @@ export class RpcPeer { let proxiedEntry = this.localProxied.get(value); if (proxiedEntry) { + const { + proxyId: __remote_proxy_id, + properties: __remote_proxy_props, + } = this.onProxySerialization?.(value) + || { + proxyId: proxiedEntry.id, + properties: RpcPeer.prepareProxyProperties(value), + }; + + if (__remote_proxy_id !== proxiedEntry.id) + throw new Error('onProxySerialization proxy id mismatch'); + const __remote_proxy_finalizer_id = RpcPeer.generateId(); proxiedEntry.finalizerId = __remote_proxy_finalizer_id; const ret: RpcRemoteProxyValue = { - __remote_proxy_id: proxiedEntry.id, + __remote_proxy_id, __remote_proxy_finalizer_id, __remote_constructor_name, - __remote_proxy_props: RpcPeer.prepareProxyProperties(value), + __remote_proxy_props, __remote_proxy_oneway_methods: value?.[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS], } return ret;