server: implement multi server clustering

This commit is contained in:
Koushik Dutta
2024-07-31 22:51:56 -07:00
parent de44217f65
commit 4c04e9e403
4 changed files with 465 additions and 253 deletions

View File

@@ -2,6 +2,6 @@ import crypto from "crypto";
import { ClusterObject } from "./connect-rpc-object";
export function computeClusterObjectHash(o: ClusterObject, clusterSecret: string) {
const sha256 = crypto.createHash('sha256').update(`${o.id}${o.port}${o.sourcePort || ''}${o.proxyId}${clusterSecret}`).digest().toString('base64');
const sha256 = crypto.createHash('sha256').update(`${o.id}${o.address || ''}${o.port}${o.sourceKey || ''}${o.proxyId}${clusterSecret}`).digest().toString('base64');
return sha256;
}

View File

@@ -1,8 +1,9 @@
export interface ClusterObject {
id: string;
address: string;
port: number;
proxyId: string;
sourcePort: number;
sourceKey: string;
sha256: string;
}

View File

@@ -101,7 +101,9 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
const { clusterId, clusterSecret, zipHash } = zipOptions;
const { zipFile, unzippedPath } = await prepareZip(getPluginVolume(pluginId), zipHash, getZip);
const onProxySerialization = (value: any, sourcePeerPort?: number) => {
const SCRYPTED_CLUSTER_ADDRESS = process.env.SCRYPTED_CLUSTER_ADDRESS;
const onProxySerialization = (value: any, sourceKey?: string) => {
const properties = RpcPeer.prepareProxyProperties(value) || {};
let clusterEntry: ClusterObject = properties.__cluster;
@@ -111,16 +113,16 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
// if the cluster entry already exists, check if it belongs to this node.
// if it belongs to this node, the entry must also be for this peer.
// relying on the liveness/gc of a different peer may cause race conditions.
if (clusterEntry && clusterPort === clusterEntry.port && sourcePeerPort !== clusterEntry.sourcePort)
if (clusterEntry && clusterPort === clusterEntry.port && sourceKey !== clusterEntry.sourceKey)
clusterEntry = undefined;
// set the cluster identity if it does not exist.
if (!clusterEntry) {
clusterEntry = {
id: clusterId,
address: SCRYPTED_CLUSTER_ADDRESS,
port: clusterPort,
proxyId,
sourcePort: sourcePeerPort,
sourceKey,
sha256: null,
};
clusterEntry.sha256 = computeClusterObjectHash(clusterEntry, clusterSecret);
@@ -134,8 +136,10 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
}
peer.onProxySerialization = onProxySerialization;
const resolveObject = async (id: string, sourcePeerPort: number) => {
const sourcePeer = sourcePeerPort ? await clusterPeers.get(sourcePeerPort) : peer;
const resolveObject = async (id: string, sourceKey: string) => {
const sourcePeer = sourceKey
? await clusterPeers.get(sourceKey)
: peer;
return sourcePeer?.localProxyMap.get(id);
}
@@ -143,52 +147,71 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
// on the cluster server that is listening on the actual port/
// incoming connections: use the remote random/unique port
// outgoing connections: use the local random/unique port
const clusterPeers = new Map<number, Promise<RpcPeer>>();
const clusterPeers = new Map<string, Promise<RpcPeer>>();
function getClusterPeerKey(address: string, port: number) {
return `${address}:${port}`;
}
const clusterRpcServer = net.createServer(client => {
const clusterPeer = createDuplexRpcPeer(peer.selfName, 'cluster-client', client, client);
const clusterPeerAddress = client.remoteAddress;
const clusterPeerPort = client.remotePort;
clusterPeer.onProxySerialization = (value) => onProxySerialization(value, clusterPeerPort);
clusterPeers.set(clusterPeerPort, Promise.resolve(clusterPeer));
const clusterPeerKey = getClusterPeerKey(clusterPeerAddress, clusterPeerPort);
clusterPeer.onProxySerialization = (value) => onProxySerialization(value, clusterPeerKey);
clusterPeers.set(clusterPeerKey, Promise.resolve(clusterPeer));
startPluginRemoteOptions?.onClusterPeer?.(clusterPeer);
const connectRPCObject: ConnectRPCObject = async (o) => {
const sha256 = computeClusterObjectHash(o, clusterSecret);
if (sha256 !== o.sha256)
throw new Error('secret incorrect');
return resolveObject(o.proxyId, o.sourcePort);
return resolveObject(o.proxyId, o.sourceKey);
}
clusterPeer.params['connectRPCObject'] = connectRPCObject;
client.on('close', () => {
clusterPeers.delete(clusterPeerPort);
clusterPeers.delete(clusterPeerKey);
clusterPeer.kill('cluster socket closed');
});
})
const clusterPort = await listenZero(clusterRpcServer, '127.0.0.1');
const ensureClusterPeer = (connectPort: number) => {
let clusterPeerPromise = clusterPeers.get(connectPort);
if (!clusterPeerPromise) {
clusterPeerPromise = (async () => {
const socket = net.connect(connectPort, '127.0.0.1');
socket.on('close', () => clusterPeers.delete(connectPort));
const listenAddress = SCRYPTED_CLUSTER_ADDRESS
? '0.0.0.0'
: '127.0.0.1';
const clusterPort = await listenZero(clusterRpcServer, listenAddress);
try {
await once(socket, 'connect');
// the sourcePort will be added to all rpc objects created by this peer session and used by resolveObject for later
// resolution when trying to find the peer.
const sourcePort = (socket.address() as net.AddressInfo).port;
const ensureClusterPeer = (address: string, connectPort: number) => {
if (!address || address === SCRYPTED_CLUSTER_ADDRESS)
address = '127.0.0.1';
const clusterPeer = createDuplexRpcPeer(peer.selfName, 'cluster-server', socket, socket);
clusterPeer.onProxySerialization = (value) => onProxySerialization(value, sourcePort);
return clusterPeer;
}
catch (e) {
console.error('failure ipc connect', e);
socket.destroy();
throw e;
}
})();
clusterPeers.set(connectPort, clusterPeerPromise);
}
const clusterPeerKey = getClusterPeerKey(address, connectPort);
let clusterPeerPromise = clusterPeers.get(clusterPeerKey);
if (clusterPeerPromise)
return clusterPeerPromise;
clusterPeerPromise = (async () => {
const socket = net.connect(connectPort, address);
socket.on('close', () => clusterPeers.delete(clusterPeerKey));
try {
await once(socket, 'connect');
// the sourceKey is used by peers to determine if they're already connected.
const { address: sourceAddress, port: sourcePort } = (socket.address() as net.AddressInfo);
if (sourceAddress !== SCRYPTED_CLUSTER_ADDRESS && sourceAddress !== '127.0.0.1')
console.warn("source address mismatch", sourceAddress);
const sourcePeerKey = getClusterPeerKey(sourceAddress, sourcePort);
const clusterPeer = createDuplexRpcPeer(peer.selfName, 'cluster-server', socket, socket);
clusterPeer.onProxySerialization = (value) => onProxySerialization(value, sourcePeerKey);
return clusterPeer;
}
catch (e) {
console.error('failure ipc connect', e);
socket.destroy();
throw e;
}
})();
clusterPeers.set(clusterPeerKey, clusterPeerPromise);
return clusterPeerPromise;
};
@@ -196,16 +219,16 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
const clusterObject: ClusterObject = value?.__cluster;
if (clusterObject?.id !== clusterId)
return value;
const { port, proxyId, sourcePort } = clusterObject;
const { address, port, proxyId, sourceKey } = clusterObject;
// handle the case when trying to connect to an object is on this cluster node,
// returning the actual object, rather than initiating a loopback connection.
if (port === clusterPort)
return resolveObject(proxyId, sourcePort);
return resolveObject(proxyId, sourceKey);
try {
const clusterPeerPromise = ensureClusterPeer(port);
const clusterPeerPromise = ensureClusterPeer(address, port);
const clusterPeer = await clusterPeerPromise;
// the proxy id is guaranteed to be unique in all peers in a cluster
// may already have this proxy so check first.
const existing = clusterPeer.remoteWeakProxies[proxyId]?.deref();
if (existing)
return existing;