From 6d0da449ad76998f652223844b3abb860265c07f Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Tue, 3 Sep 2024 22:31:51 -0700 Subject: [PATCH] server: simplify convoluted peer key --- server/python/plugin_remote.py | 33 +++++----- server/src/plugin/plugin-remote-worker.ts | 76 +++++++++++------------ 2 files changed, 53 insertions(+), 56 deletions(-) diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py index 4043b83cc..5a05ef8d7 100644 --- a/server/python/plugin_remote.py +++ b/server/python/plugin_remote.py @@ -601,21 +601,24 @@ class PluginRemote: return clusterPeerPromise async def connectClusterPeer(): - reader, writer = await asyncio.open_connection(address, port) - sourceAddress, sourcePort = writer.get_extra_info("sockname") - if ( - sourceAddress != SCRYPTED_CLUSTER_ADDRESS - and sourceAddress != "127.0.0.1" - ): - print("source address mismatch", sourceAddress) - sourceKey = getClusterPeerKey(sourceAddress, sourcePort) - rpcTransport = rpc_reader.RpcStreamTransport(reader, writer) - clusterPeer, peerReadLoop = await rpc_reader.prepare_peer_readloop( - self.loop, rpcTransport - ) - clusterPeer.onProxySerialization = lambda value: onProxySerialization( - value, sourceKey - ) + try: + reader, writer = await asyncio.open_connection(address, port) + sourceAddress, sourcePort = writer.get_extra_info("sockname") + if ( + sourceAddress != SCRYPTED_CLUSTER_ADDRESS + and sourceAddress != "127.0.0.1" + ): + print("source address mismatch", sourceAddress) + rpcTransport = rpc_reader.RpcStreamTransport(reader, writer) + clusterPeer, peerReadLoop = await rpc_reader.prepare_peer_readloop( + self.loop, rpcTransport + ) + clusterPeer.onProxySerialization = lambda value: onProxySerialization( + value, clusterPeerKey + ) + except: + clusterPeers.pop(clusterPeerKey) + raise async def run_loop(): try: diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index 6c687a003..6c1e54490 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -148,9 +148,13 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe const sourcePeer = sourceKey ? await clusterPeers.get(sourceKey) : peer; + if (!sourcePeer) + console.error('source peer not found', sourceKey); const ret = sourcePeer?.localProxyMap.get(id); - if (!ret) + if (!ret) { + console.error('source key not found', sourceKey, id); return; + } return ret; } @@ -164,10 +168,10 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe } const clusterRpcServer = net.createServer(client => { - const clusterPeer = createDuplexRpcPeer(peer.selfName, 'cluster-client', client, client); const clusterPeerAddress = client.remoteAddress; const clusterPeerPort = client.remotePort; const clusterPeerKey = getClusterPeerKey(clusterPeerAddress, clusterPeerPort); + const clusterPeer = createDuplexRpcPeer(peer.selfName, clusterPeerKey, client, client); // the listening peer sourceKey (client address/port) is used by the OTHER peer (the client) // to determine if it is already connected to THIS peer (the server). clusterPeer.onProxySerialization = (value) => onProxySerialization(value, clusterPeerKey); @@ -206,16 +210,12 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe try { await once(socket, 'connect'); - - // this connecting peer sourceKey (server address/port) is used by the OTHER peer (the server) - // to determine if it is already connected to THIS peer (the client). - const { address: sourceAddress, port: sourcePort } = (socket.address() as net.AddressInfo); + const { address: sourceAddress } = (socket.address() as net.AddressInfo); if (sourceAddress !== SCRYPTED_CLUSTER_ADDRESS && sourceAddress !== '127.0.0.1') console.warn("source address mismatch", sourceAddress); - const sourcePeerKey = getClusterPeerKey(sourceAddress, sourcePort); - const clusterPeer = createDuplexRpcPeer(peer.selfName, 'cluster-server', socket, socket); - clusterPeer.onProxySerialization = (value) => onProxySerialization(value, sourcePeerKey); + const clusterPeer = createDuplexRpcPeer(peer.selfName, clusterPeerKey, socket, socket); + clusterPeer.onProxySerialization = (value) => onProxySerialization(value, clusterPeerKey); return clusterPeer; } catch (e) { @@ -242,30 +242,6 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe const tidChannels = new Map>(); const tidPeers = new Map>(); - function finishTidPeerConnection(tid: number, port: worker_threads.MessagePort) { - const threadPeer = NodeThreadWorker.createRpcPeer(peer.selfName, 'thread-server', port); - // this connecting peer sourceKey (thread id) is used by the OTHER peer (the server) - // to determine if it is already connected to THIS peer (the client). - const threadPeerKey = `thread:${worker_threads.threadId}-${tid}`; - threadPeer.onProxySerialization = value => onProxySerialization(value, threadPeerKey); - - const connectRPCObject: ConnectRPCObject = async (o) => { - const sha256 = computeClusterObjectHash(o, clusterSecret); - if (sha256 !== o.sha256) - throw new Error('secret incorrect'); - return resolveObject(o.proxyId, o.sourceKey); - } - threadPeer.params['connectRPCObject'] = connectRPCObject; - function cleanup(message: string) { - tidChannels.delete(tid); - tidPeers.delete(tid); - threadPeer.kill(message); - } - port.on('close', () => cleanup('connection closed.')); - port.on('messageerror', () => cleanup('message error.')); - return threadPeer; - } - function connectTidPeer(tid: number) { let peerPromise = tidPeers.get(tid); if (peerPromise) @@ -284,17 +260,35 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe } } - function cleanup() { - clusterPeers.delete(threadPeerKey); + const threadPeerKey = `thread:${tid}`; + function peerCleanup() { clusterPeers.delete(threadPeerKey); } peerPromise = tidDeferred.promise.then(port => { - port.on('close', () => cleanup()); - port.on('messageerror', () => cleanup()); - return finishTidPeerConnection(tid, port); + port.on('close', () => peerCleanup()); + port.on('messageerror', () => peerCleanup()); + + const threadPeer = NodeThreadWorker.createRpcPeer(peer.selfName, threadPeerKey, port); + threadPeer.onProxySerialization = value => onProxySerialization(value, threadPeerKey); + + const connectRPCObject: ConnectRPCObject = async (o) => { + const sha256 = computeClusterObjectHash(o, clusterSecret); + if (sha256 !== o.sha256) + throw new Error('secret incorrect'); + return resolveObject(o.proxyId, o.sourceKey); + } + threadPeer.params['connectRPCObject'] = connectRPCObject; + + function cleanup(message: string) { + tidChannels.delete(tid); + tidPeers.delete(tid); + threadPeer.kill(message); + } + port.on('close', () => cleanup('connection closed.')); + port.on('messageerror', () => cleanup('message error.')); + return threadPeer; }); - peerPromise.catch(() => cleanup()); - const threadPeerKey = `thread:${worker_threads.threadId}-${tid}`; + peerPromise.catch(() => peerCleanup()); clusterPeers.set(threadPeerKey, peerPromise); tidPeers.set(tid, peerPromise); @@ -417,7 +411,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe return newValue; } catch (e) { - console.error('failure rpc', e); + console.error('failure rpc', clusterObject, e); return value; } }