server, client: send full ClusterObject on new eio endpoint (#1170)

* server: change connectRPCObject internal signature

* server, client: send ClusterObject + hash validation

---------

Co-authored-by: Koushik Dutta <koushd@gmail.com>
This commit is contained in:
Brett Jia
2023-11-09 12:47:18 -05:00
committed by GitHub
parent e49f26b410
commit 7dec399ed7
5 changed files with 96 additions and 69 deletions

View File

@@ -1,4 +1,3 @@
import crypto from 'crypto';
import { MediaObjectOptions, RTCConnectionManagement, RTCSignalingSession, ScryptedStatic } from "@scrypted/types";
import axios, { AxiosRequestConfig, AxiosRequestHeaders } from 'axios';
import * as eio from 'engine.io-client';
@@ -711,9 +710,9 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
.map(id => systemManager.getDeviceById(id))
.find(device => device.pluginId === '@scrypted/core' && device.nativeId === `user:${username}`);
const clusterPeers = new Map<number, Promise<{ clusterPeer: RpcPeer, clusterSecret: string }>>();
const ensureClusterPeer = (port: number) => {
let clusterPeerPromise = clusterPeers.get(port);
const clusterPeers = new Map<number, Promise<RpcPeer>>();
const ensureClusterPeer = (clusterObject: ClusterObject) => {
let clusterPeerPromise = clusterPeers.get(clusterObject.port);
if (!clusterPeerPromise) {
clusterPeerPromise = (async () => {
const eioPath = 'engine.io/connectRPCObject';
@@ -722,7 +721,7 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
path: eioEndpoint,
query: {
cacehBust,
port,
clusterObject: JSON.stringify(clusterObject),
},
withCredentials: true,
extraHeaders,
@@ -733,19 +732,15 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
const clusterPeerSocket = new eio.Socket(explicitBaseUrl, clusterPeerOptions);
let peerReady = false;
clusterPeerSocket.on('close', () => {
clusterPeers.delete(port);
clusterPeers.delete(clusterObject.port);
if (!peerReady) {
throw new Error("peer disconnected before setup completed");
}
});
try {
const clusterSecretPromise = once(clusterPeerSocket, 'message');
await once(clusterPeerSocket, 'open');
const clusterSecret = await clusterSecretPromise as any as string;
const serializer = createRpcDuplexSerializer({
write: data => clusterPeerSocket.send(data),
});
@@ -762,7 +757,7 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
serializer.setupRpcPeer(clusterPeer);
clusterPeer.tags.localPort = sourcePeerId;
peerReady = true;
return { clusterPeer, clusterSecret };
return clusterPeer;
}
catch (e) {
console.error('failure ipc connect', e);
@@ -770,13 +765,13 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
throw e;
}
})();
clusterPeers.set(port, clusterPeerPromise);
clusterPeers.set(clusterObject.port, clusterPeerPromise);
}
return clusterPeerPromise;
};
const resolveObject = async (proxyId: string, sourcePeerPort: number) => {
const sourcePeer = (await clusterPeers.get(sourcePeerPort))?.clusterPeer;
const sourcePeer = await clusterPeers.get(sourcePeerPort);
if (sourcePeer?.remoteWeakProxies) {
return Object.values(sourcePeer.remoteWeakProxies).find(
v => v.deref()?.__cluster?.proxyId == proxyId
@@ -791,7 +786,7 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
return value;
}
const { port, proxyId, source } = clusterObject;
const { port, proxyId } = clusterObject;
// check if object is already connected
const resolved = await resolveObject(proxyId, port);
@@ -800,11 +795,10 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
}
try {
const clusterPeerPromise = ensureClusterPeer(port);
const { clusterPeer, clusterSecret } = await clusterPeerPromise;
const clusterPeerPromise = ensureClusterPeer(clusterObject);
const clusterPeer = await clusterPeerPromise;
const connectRPCObject: ConnectRPCObject = await clusterPeer.getParam('connectRPCObject');
const portSecret = crypto.createHash('sha256').update(`${port}${clusterSecret}`).digest().toString('hex');
const newValue = await connectRPCObject(proxyId, portSecret, source);
const newValue = await connectRPCObject(clusterObject);
if (!newValue)
throw new Error('ipc object not found?');
return newValue;

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import asyncio
import base64
import gc
import os
import platform
@@ -40,6 +41,14 @@ import rpc
import rpc_reader
class ClusterObject(TypedDict):
id: str
port: int
proxyId: str
sourcePort: str
sha256: str
class SystemDeviceState(TypedDict):
lastEventTime: int
stateTime: int
@@ -389,16 +398,22 @@ class PluginRemote:
clusterId = options['clusterId']
clusterSecret = options['clusterSecret']
def computeClusterObjectHash(o: ClusterObject) -> str:
m = hashlib.sha256()
m.update(bytes(f"{o['id']}{o['port']}{o.get('sourcePort', '')}{o['proxyId']}{clusterSecret}", 'utf8'))
return base64.b64encode(m.digest()).decode('utf-8')
def onProxySerialization(value: Any, proxyId: str, source: int = None):
properties: dict = rpc.RpcPeer.prepareProxyProperties(value) or {}
clusterEntry = properties.get('__cluster', None)
if not properties.get('__cluster', None):
clusterEntry = {
clusterEntry: ClusterObject = {
'id': clusterId,
'proxyId': proxyId,
'port': clusterPort,
'source': source,
}
clusterEntry['sha256'] = computeClusterObjectHash(clusterEntry)
properties['__cluster'] = clusterEntry
# clusterEntry['proxyId'] = proxyId
@@ -426,13 +441,11 @@ class PluginRemote:
future.set_result(peer)
clusterPeers[clusterPeerPort] = future
async def connectRPCObject(id: str, secret: str, sourcePeerPort: int = None):
m = hashlib.sha256()
m.update(bytes('%s%s' % (clusterPort, clusterSecret), 'utf8'))
portSecret = m.hexdigest()
if secret != portSecret:
async def connectRPCObject(o: ClusterObject):
sha256 = computeClusterObjectHash(o)
if sha256 != o['sha256']:
raise Exception('secret incorrect')
return await resolveObject(id, sourcePeerPort)
return await resolveObject(o['proxyId'], o.get('sourcePort'))
peer.params['connectRPCObject'] = connectRPCObject
try:
@@ -496,10 +509,7 @@ class PluginRemote:
if clusterPeer.tags.get('localPort') == source:
return value
c = await clusterPeer.getParam('connectRPCObject')
m = hashlib.sha256()
m.update(bytes('%s%s' % (port, clusterSecret), 'utf8'))
portSecret = m.hexdigest()
newValue = await c(proxyId, portSecret, source)
newValue = await c(clusterObject)
if not newValue:
raise Exception('ipc object not found?')
return newValue

View File

@@ -1,3 +1,4 @@
import crypto from "crypto";
import net from "net";
import { Socket } from "engine.io";
import { IOSocket } from "../io";
@@ -6,25 +7,30 @@ export interface ClusterObject {
id: string;
port: number;
proxyId: string;
source: number;
sourcePort: number;
sha256: string;
}
export type ConnectRPCObject = (id: string, secret: string, sourcePeerPort: number) => Promise<any>;
export type ConnectRPCObject = (o: ClusterObject) => Promise<any>;
/*
* Handle incoming connections that will be
* proxied to a connectRPCObject socket.
*
* It is the responsibility of the caller of
* this function to verify the signature of
* clusterObject using the clusterSecret.
*/
export function setupConnectRPCObjectProxy(clusterSecret: string, port: number, connection: Socket & IOSocket) {
if (!port) {
throw new Error("invalid port");
}
connection.send(clusterSecret);
const socket = net.connect(port, '127.0.0.1');
export function setupConnectRPCObjectProxy(clusterObject: ClusterObject, connection: Socket & IOSocket) {
const socket = net.connect(clusterObject.port, '127.0.0.1');
socket.on('close', () => connection.close());
socket.on('data', data => connection.send(data));
connection.on('close', () => socket.destroy());
connection.on('message', message => socket.write(message));
};
export function computeClusterObjectHash(o: ClusterObject, clusterSecret: string) {
const sha256 = crypto.createHash('sha256').update(`${o.id}${o.port}${o.sourcePort || ''}${o.proxyId}${clusterSecret}`).digest().toString('base64');
return sha256;
}

View File

@@ -1,5 +1,6 @@
import { ScryptedStatic, SystemManager } from '@scrypted/types';
import AdmZip from 'adm-zip';
import crypto from 'crypto';
import { once } from 'events';
import fs from 'fs';
import { Volume } from 'memfs';
@@ -9,16 +10,15 @@ import { install as installSourceMapSupport } from 'source-map-support';
import { listenZero } from '../listen-zero';
import { RpcMessage, RpcPeer } from '../rpc';
import { createDuplexRpcPeer } from '../rpc-serializer';
import { ClusterObject, ConnectRPCObject, computeClusterObjectHash } from './connect-rpc-object';
import { MediaManagerImpl } from './media';
import { PluginAPI, PluginAPIProxy, PluginRemote, PluginRemoteLoadZipOptions } from './plugin-api';
import { prepareConsoles } from './plugin-console';
import { getPluginNodePath, installOptionalDependencies } from './plugin-npm-dependencies';
import { attachPluginRemote, DeviceManagerImpl, PluginReader, setupPluginRemote } from './plugin-remote';
import { DeviceManagerImpl, PluginReader, attachPluginRemote, setupPluginRemote } from './plugin-remote';
import { PluginStats, startStatsUpdater } from './plugin-remote-stats';
import { createREPLServer } from './plugin-repl';
import { NodeThreadWorker } from './runtime/node-thread-worker';
import { ClusterObject, ConnectRPCObject } from './connect-rpc-object';
import crypto from 'crypto';
const { link } = require('linkfs');
const serverVersion = require('../../package.json').version;
@@ -79,7 +79,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
async onLoadZip(scrypted: ScryptedStatic, params: any, packageJson: any, zipData: Buffer | string, zipOptions: PluginRemoteLoadZipOptions) {
const { clusterId, clusterSecret } = zipOptions;
const onProxySerialization = (value: any, proxyId: string, source?: number) => {
const onProxySerialization = (value: any, proxyId: string, sourcePeerPort?: number) => {
const properties = RpcPeer.prepareProxyProperties(value) || {};
let clusterEntry: ClusterObject = properties.__cluster;
@@ -89,8 +89,10 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
id: clusterId,
port: clusterPort,
proxyId,
source,
sourcePort: sourcePeerPort,
sha256: null,
};
clusterEntry.sha256 = computeClusterObjectHash(clusterEntry, clusterSecret);
properties.__cluster = clusterEntry;
}
// always reassign the id and source.
@@ -118,11 +120,11 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
clusterPeer.onProxySerialization = (value, proxyId) => onProxySerialization(value, proxyId, clusterPeerPort);
clusterPeers.set(clusterPeerPort, Promise.resolve(clusterPeer));
startPluginRemoteOptions?.onClusterPeer?.(clusterPeer);
const portSecret = crypto.createHash('sha256').update(`${clusterPort}${clusterSecret}`).digest().toString('hex');
const connectRPCObject: ConnectRPCObject = async (id, secret, sourcePeerPort) => {
if (secret !== portSecret)
const connectRPCObject: ConnectRPCObject = async (o) => {
const sha256 = computeClusterObjectHash(o, clusterSecret);
if (sha256 !== o.sha256)
throw new Error('secret incorrect');
return resolveObject(id, sourcePeerPort);
return resolveObject(o.proxyId, o.sourcePort);
}
clusterPeer.params['connectRPCObject'] = connectRPCObject;
client.on('close', () => {
@@ -130,22 +132,24 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
clusterPeer.kill('cluster socket closed');
});
})
const clusterPort = await listenZero(clusterRpcServer);
const clusterPort = await listenZero(clusterRpcServer, '127.0.0.1');
const ensureClusterPeer = (port: number) => {
let clusterPeerPromise = clusterPeers.get(port);
const ensureClusterPeer = (connectPort: number) => {
let clusterPeerPromise = clusterPeers.get(connectPort);
if (!clusterPeerPromise) {
clusterPeerPromise = (async () => {
const socket = net.connect(port, '127.0.0.1');
socket.on('close', () => clusterPeers.delete(port));
const socket = net.connect(connectPort, '127.0.0.1');
socket.on('close', () => clusterPeers.delete(connectPort));
try {
await once(socket, 'connect');
const clusterPeerPort = (socket.address() as net.AddressInfo).port;
// the sourcePort will be added to all rpc objects created by this peer session and used by resolveObject for later
// resolution when trying to find the peer.
const sourcePort = (socket.address() as net.AddressInfo).port;
const clusterPeer = createDuplexRpcPeer(peer.selfName, 'cluster-server', socket, socket);
clusterPeer.tags.localPort = clusterPeerPort;
clusterPeer.onProxySerialization = (value, proxyId) => onProxySerialization(value, proxyId, clusterPeerPort);
clusterPeer.tags.localPort = sourcePort;
clusterPeer.onProxySerialization = (value, proxyId) => onProxySerialization(value, proxyId, sourcePort);
return clusterPeer;
}
catch (e) {
@@ -154,7 +158,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
throw e;
}
})();
clusterPeers.set(port, clusterPeerPromise);
clusterPeers.set(connectPort, clusterPeerPromise);
}
return clusterPeerPromise;
};
@@ -163,25 +167,27 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
const clusterObject: ClusterObject = value?.__cluster;
if (clusterObject?.id !== clusterId)
return value;
const { port, proxyId, source } = clusterObject;
const { port, proxyId, sourcePort } = clusterObject;
// handle the case when trying to connect to an object is on this cluster node,
// returning the actual object, rather than initiating a loopback connection.
if (port === clusterPort)
return resolveObject(proxyId, source);
return resolveObject(proxyId, sourcePort);
try {
const clusterPeerPromise = ensureClusterPeer(port);
const clusterPeer = await clusterPeerPromise;
// this object is already connected
if (clusterPeer.tags.localPort === source)
// if the localPort is the sourcePort, that means the rpc object already exists as it originated from this node.
// so return the existing proxy.
if (clusterPeer.tags.localPort === sourcePort)
return value;
const connectRPCObject: ConnectRPCObject = await clusterPeer.getParam('connectRPCObject');
const portSecret = crypto.createHash('sha256').update(`${port}${clusterSecret}`).digest().toString('hex');
const newValue = await connectRPCObject(proxyId, portSecret, source);
const newValue = await connectRPCObject(clusterObject);
if (!newValue)
throw new Error('ipc object not found?');
throw new Error('rpc object not found?');
return newValue;
}
catch (e) {
console.error('failure ipc', e);
console.error('failure rpc', e);
return value;
}
}

View File

@@ -34,7 +34,7 @@ import { getPluginVolume } from './plugin/plugin-volume';
import { NodeForkWorker } from './plugin/runtime/node-fork-worker';
import { PythonRuntimeWorker } from './plugin/runtime/python-worker';
import { RuntimeWorker, RuntimeWorkerOptions } from './plugin/runtime/runtime-worker';
import { setupConnectRPCObjectProxy } from './plugin/connect-rpc-object';
import { ClusterObject, computeClusterObjectHash, setupConnectRPCObjectProxy } from './plugin/connect-rpc-object';
import { getIpAddress, SCRYPTED_INSECURE_PORT, SCRYPTED_SECURE_PORT } from './server-settings';
import { AddressSettings } from './services/addresses';
import { Alerts } from './services/alerts';
@@ -163,7 +163,18 @@ export class ScryptedRuntime extends PluginHttp<HttpPluginData> {
res.end();
return;
}
if (!req.query.port) {
if (!req.query.clusterObject) {
res.writeHead(404);
res.end();
return;
}
try {
const clusterObject: ClusterObject = JSON.parse(req.query.clusterObject as string);
const sha256 = computeClusterObjectHash(clusterObject, this.clusterSecret);
if (sha256 != clusterObject.sha256) {
throw Error("invalid signature");
}
} catch {
res.writeHead(404);
res.end();
return;
@@ -173,8 +184,8 @@ export class ScryptedRuntime extends PluginHttp<HttpPluginData> {
this.connectRPCObjectIO.on('connection', connection => {
try {
const clusterObjectPortHeader = (connection.request as Request).query.port as string;
setupConnectRPCObjectProxy(this.clusterSecret, parseInt(clusterObjectPortHeader), connection);
const clusterObject: ClusterObject = JSON.parse((connection.request as Request).query.clusterObject as string);
setupConnectRPCObjectProxy(clusterObject, connection);
} catch {
connection.close();
}