diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py index 98968e230..cb6015966 100644 --- a/server/python/plugin_remote.py +++ b/server/python/plugin_remote.py @@ -414,9 +414,10 @@ class PluginRemote: m.update(bytes(f"{o['id']}{o['port']}{o.get('sourcePort') or ''}{o['proxyId']}{clusterSecret}", 'utf8')) return base64.b64encode(m.digest()).decode('utf-8') - def onProxySerialization(value: Any, proxyId: str, sourcePeerPort: int = None): + def onProxySerialization(value: Any, sourcePeerPort: int = None): properties: dict = rpc.RpcPeer.prepareProxyProperties(value) or {} clusterEntry = properties.get('__cluster', None) + proxyId: str = (clusterEntry and clusterEntry.get('proxyId', None)) or rpc.RpcPeer.generateId() if clusterEntry and clusterPort == clusterEntry['port'] and sourcePeerPort != clusterEntry.get('sourcePort', None): clusterEntry = None @@ -431,7 +432,7 @@ class PluginRemote: clusterEntry['sha256'] = computeClusterObjectHash(clusterEntry) properties['__cluster'] = clusterEntry - return properties + return proxyId, properties self.peer.onProxySerialization = onProxySerialization @@ -448,8 +449,8 @@ class PluginRemote: rpcTransport = rpc_reader.RpcStreamTransport(reader, writer) peer: rpc.RpcPeer peer, peerReadLoop = await rpc_reader.prepare_peer_readloop(self.loop, rpcTransport) - peer.onProxySerialization = lambda value, proxyId: onProxySerialization( - value, proxyId, clusterPeerPort) + peer.onProxySerialization = lambda value: onProxySerialization( + value, clusterPeerPort) future: asyncio.Future[rpc.RpcPeer] = asyncio.Future() future.set_result(peer) clusterPeers[clusterPeerPort] = future @@ -483,9 +484,8 @@ class PluginRemote: rpcTransport = rpc_reader.RpcStreamTransport( reader, writer) clusterPeer, peerReadLoop = await rpc_reader.prepare_peer_readloop(self.loop, rpcTransport) - clusterPeer.tags['localPort'] = clusterPeerPort - clusterPeer.onProxySerialization = lambda value, proxyId: onProxySerialization( - value, proxyId, clusterPeerPort) + clusterPeer.onProxySerialization = lambda value: onProxySerialization( + value, clusterPeerPort) async def run_loop(): try: @@ -519,8 +519,11 @@ class PluginRemote: try: clusterPeer = await clusterPeerPromise - if clusterPeer.tags.get('localPort') == sourcePort: - return value + weakref = clusterPeer.remoteWeakProxies.get(proxyId, None) + existing = weakref() if weakref else None + if existing: + return existing + peerConnectRPCObject = clusterPeer.tags.get('connectRPCObject') if not peerConnectRPCObject: peerConnectRPCObject = await clusterPeer.getParam('connectRPCObject') diff --git a/server/python/rpc.py b/server/python/rpc.py index db0d03893..f725bc00b 100644 --- a/server/python/rpc.py +++ b/server/python/rpc.py @@ -126,7 +126,7 @@ class RpcPeer: self.pendingResults: Mapping[str, Future] = {} self.remoteWeakProxies: Mapping[str, any] = {} self.nameDeserializerMap: Mapping[str, RpcSerializer] = {} - self.onProxySerialization: Callable[[Any, str], Any] = None + self.onProxySerialization: Callable[[Any, str], tuple[str, Any]] = None self.killed = False self.tags = {} @@ -274,7 +274,7 @@ class RpcPeer: proxiedEntry = self.localProxied.get(value, None) if proxiedEntry: - proxiedEntry['finalizerId'] = self.generateId() + proxiedEntry['finalizerId'] = RpcPeer.generateId() ret = { '__remote_proxy_id': proxiedEntry['id'], '__remote_proxy_finalizer_id': proxiedEntry['finalizerId'], @@ -292,7 +292,12 @@ class RpcPeer: } return ret - proxyId = self.generateId() + if self.onProxySerialization: + proxyId, __remote_proxy_props = self.onProxySerialization(value) + else: + __remote_proxy_props = RpcPeer.prepareProxyProperties(value) + proxyId = RpcPeer.generateId() + proxiedEntry = { 'id': proxyId, 'finalizerId': proxyId, @@ -300,11 +305,6 @@ class RpcPeer: self.localProxied[value] = proxiedEntry self.localProxyMap[proxyId] = value - if self.onProxySerialization: - __remote_proxy_props = self.onProxySerialization(value, proxyId) - else: - __remote_proxy_props = RpcPeer.prepareProxyProperties(value) - ret = { '__remote_proxy_id': proxyId, '__remote_proxy_finalizer_id': proxyId, @@ -491,7 +491,7 @@ class RpcPeer: randomDigits = string.ascii_uppercase + string.ascii_lowercase + string.digits - def generateId(self): + def generateId(): return ''.join(random.choices(RpcPeer.randomDigits, k=8)) async def createPendingResult(self, cb: Callable[[str, Callable[[Exception], None]], None]): @@ -500,7 +500,7 @@ class RpcPeer: future.set_exception(RPCResultError(None, 'RpcPeer has been killed (createPendingResult)')) return future - id = self.generateId() + id = RpcPeer.generateId() self.pendingResults[id] = future await cb(id, lambda e: future.set_exception(RPCResultError(e, None))) return await future diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index 0b14f80c4..0c5009be7 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -96,10 +96,13 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe const { clusterId, clusterSecret, zipHash } = zipOptions; const { zipFile, unzippedPath } = await prepareZip(getPluginVolume(pluginId), zipHash, getZip); - const onProxySerialization = (value: any, proxyId: string, sourcePeerPort?: number) => { + const onProxySerialization = (value: any, sourcePeerPort?: number) => { const properties = RpcPeer.prepareProxyProperties(value) || {}; let clusterEntry: ClusterObject = properties.__cluster; + // ensure globally stable proxyIds. + const proxyId = clusterEntry?.proxyId || RpcPeer.generateId(); + // if the cluster entry already exists, check if it belongs to this node. // if it belongs to this node, the entry must also be for this peer. // relying on the liveness/gc of a different peer may cause race conditions. @@ -119,7 +122,10 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe properties.__cluster = clusterEntry; } - return properties; + return { + proxyId, + properties, + }; } peer.onProxySerialization = onProxySerialization; @@ -136,7 +142,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe const clusterRpcServer = net.createServer(client => { const clusterPeer = createDuplexRpcPeer(peer.selfName, 'cluster-client', client, client); const clusterPeerPort = client.remotePort; - clusterPeer.onProxySerialization = (value, proxyId) => onProxySerialization(value, proxyId, clusterPeerPort); + clusterPeer.onProxySerialization = (value) => onProxySerialization(value, clusterPeerPort); clusterPeers.set(clusterPeerPort, Promise.resolve(clusterPeer)); startPluginRemoteOptions?.onClusterPeer?.(clusterPeer); const connectRPCObject: ConnectRPCObject = async (o) => { @@ -167,8 +173,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe const sourcePort = (socket.address() as net.AddressInfo).port; const clusterPeer = createDuplexRpcPeer(peer.selfName, 'cluster-server', socket, socket); - clusterPeer.tags.localPort = sourcePort; - clusterPeer.onProxySerialization = (value, proxyId) => onProxySerialization(value, proxyId, sourcePort); + clusterPeer.onProxySerialization = (value) => onProxySerialization(value, sourcePort); return clusterPeer; } catch (e) { @@ -195,10 +200,10 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe try { const clusterPeerPromise = ensureClusterPeer(port); const clusterPeer = await clusterPeerPromise; - // if the localPort is the sourcePort, that means the rpc object already exists as it originated from this node. - // so return the existing proxy. - if (clusterPeer.tags.localPort === sourcePort) - return value; + // the proxy id is guaranteed to be unique in all peers in a cluster + const existing = clusterPeer.remoteWeakProxies[proxyId]?.deref(); + if (existing) + return existing; let peerConnectRPCObject: ConnectRPCObject = clusterPeer.tags['connectRPCObject']; if (!peerConnectRPCObject) { peerConnectRPCObject = await clusterPeer.getParam('connectRPCObject'); diff --git a/server/src/rpc.ts b/server/src/rpc.ts index 4f5b1f858..9091d1a86 100644 --- a/server/src/rpc.ts +++ b/server/src/rpc.ts @@ -304,7 +304,10 @@ export class RpcPeer { finalizers = new FinalizationRegistry(entry => this.finalize(entry as LocalProxiedEntry)); nameDeserializerMap = new Map(); onProxyTypeSerialization = new Map void>(); - onProxySerialization: (value: any, proxyId: string) => any; + onProxySerialization: (value: any) => { + proxyId: string; + properties: any; + }; constructorSerializerMap = new Map(); transportSafeArgumentTypes = RpcPeer.getDefaultTransportSafeArgumentTypes(); killed: Promise; @@ -630,7 +633,16 @@ export class RpcPeer { this.onProxyTypeSerialization.get(__remote_constructor_name)?.(value); - const __remote_proxy_id = RpcPeer.generateId(); + const { + proxyId: __remote_proxy_id, + properties: __remote_proxy_props, + } = this.onProxySerialization + ? this.onProxySerialization(value) + : { + proxyId: RpcPeer.generateId(), + properties: RpcPeer.prepareProxyProperties(value), + }; + proxiedEntry = { id: __remote_proxy_id, finalizerId: __remote_proxy_id, @@ -638,8 +650,6 @@ export class RpcPeer { this.localProxied.set(value, proxiedEntry); this.localProxyMap.set(__remote_proxy_id, value); - const __remote_proxy_props = this.onProxySerialization ? this.onProxySerialization(value, __remote_proxy_id) : RpcPeer.prepareProxyProperties(value); - const ret: RpcRemoteProxyValue = { __remote_proxy_id, __remote_proxy_finalizer_id: __remote_proxy_id,