diff --git a/packages/client/src/index.ts b/packages/client/src/index.ts index 18bedabc3..1e87d470f 100644 --- a/packages/client/src/index.ts +++ b/packages/client/src/index.ts @@ -1,4 +1,3 @@ -import crypto from 'crypto'; import { MediaObjectOptions, RTCConnectionManagement, RTCSignalingSession, ScryptedStatic } from "@scrypted/types"; import axios, { AxiosRequestConfig, AxiosRequestHeaders } from 'axios'; import * as eio from 'engine.io-client'; @@ -711,9 +710,9 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro .map(id => systemManager.getDeviceById(id)) .find(device => device.pluginId === '@scrypted/core' && device.nativeId === `user:${username}`); - const clusterPeers = new Map>(); - const ensureClusterPeer = (port: number) => { - let clusterPeerPromise = clusterPeers.get(port); + const clusterPeers = new Map>(); + const ensureClusterPeer = (clusterObject: ClusterObject) => { + let clusterPeerPromise = clusterPeers.get(clusterObject.port); if (!clusterPeerPromise) { clusterPeerPromise = (async () => { const eioPath = 'engine.io/connectRPCObject'; @@ -722,7 +721,7 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro path: eioEndpoint, query: { cacehBust, - port, + clusterObject: JSON.stringify(clusterObject), }, withCredentials: true, extraHeaders, @@ -733,19 +732,15 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro const clusterPeerSocket = new eio.Socket(explicitBaseUrl, clusterPeerOptions); let peerReady = false; clusterPeerSocket.on('close', () => { - clusterPeers.delete(port); + clusterPeers.delete(clusterObject.port); if (!peerReady) { throw new Error("peer disconnected before setup completed"); } }); try { - const clusterSecretPromise = once(clusterPeerSocket, 'message'); - await once(clusterPeerSocket, 'open'); - const clusterSecret = await clusterSecretPromise as any as string; - const serializer = createRpcDuplexSerializer({ write: data => clusterPeerSocket.send(data), }); @@ -762,7 +757,7 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro serializer.setupRpcPeer(clusterPeer); clusterPeer.tags.localPort = sourcePeerId; peerReady = true; - return { clusterPeer, clusterSecret }; + return clusterPeer; } catch (e) { console.error('failure ipc connect', e); @@ -770,13 +765,13 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro throw e; } })(); - clusterPeers.set(port, clusterPeerPromise); + clusterPeers.set(clusterObject.port, clusterPeerPromise); } return clusterPeerPromise; }; const resolveObject = async (proxyId: string, sourcePeerPort: number) => { - const sourcePeer = (await clusterPeers.get(sourcePeerPort))?.clusterPeer; + const sourcePeer = await clusterPeers.get(sourcePeerPort); if (sourcePeer?.remoteWeakProxies) { return Object.values(sourcePeer.remoteWeakProxies).find( v => v.deref()?.__cluster?.proxyId == proxyId @@ -791,7 +786,7 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro return value; } - const { port, proxyId, source } = clusterObject; + const { port, proxyId } = clusterObject; // check if object is already connected const resolved = await resolveObject(proxyId, port); @@ -800,11 +795,10 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro } try { - const clusterPeerPromise = ensureClusterPeer(port); - const { clusterPeer, clusterSecret } = await clusterPeerPromise; + const clusterPeerPromise = ensureClusterPeer(clusterObject); + const clusterPeer = await clusterPeerPromise; const connectRPCObject: ConnectRPCObject = await clusterPeer.getParam('connectRPCObject'); - const portSecret = crypto.createHash('sha256').update(`${port}${clusterSecret}`).digest().toString('hex'); - const newValue = await connectRPCObject(proxyId, portSecret, source); + const newValue = await connectRPCObject(clusterObject); if (!newValue) throw new Error('ipc object not found?'); return newValue; diff --git a/server/python/plugin_remote.py b/server/python/plugin_remote.py index 72c54283a..705681bfa 100644 --- a/server/python/plugin_remote.py +++ b/server/python/plugin_remote.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import base64 import gc import os import platform @@ -40,6 +41,14 @@ import rpc import rpc_reader +class ClusterObject(TypedDict): + id: str + port: int + proxyId: str + sourcePort: str + sha256: str + + class SystemDeviceState(TypedDict): lastEventTime: int stateTime: int @@ -389,16 +398,22 @@ class PluginRemote: clusterId = options['clusterId'] clusterSecret = options['clusterSecret'] + def computeClusterObjectHash(o: ClusterObject) -> str: + m = hashlib.sha256() + m.update(bytes(f"{o['id']}{o['port']}{o.get('sourcePort', '')}{o['proxyId']}{clusterSecret}", 'utf8')) + return base64.b64encode(m.digest()).decode('utf-8') + def onProxySerialization(value: Any, proxyId: str, source: int = None): properties: dict = rpc.RpcPeer.prepareProxyProperties(value) or {} clusterEntry = properties.get('__cluster', None) if not properties.get('__cluster', None): - clusterEntry = { + clusterEntry: ClusterObject = { 'id': clusterId, 'proxyId': proxyId, 'port': clusterPort, 'source': source, } + clusterEntry['sha256'] = computeClusterObjectHash(clusterEntry) properties['__cluster'] = clusterEntry # clusterEntry['proxyId'] = proxyId @@ -426,13 +441,11 @@ class PluginRemote: future.set_result(peer) clusterPeers[clusterPeerPort] = future - async def connectRPCObject(id: str, secret: str, sourcePeerPort: int = None): - m = hashlib.sha256() - m.update(bytes('%s%s' % (clusterPort, clusterSecret), 'utf8')) - portSecret = m.hexdigest() - if secret != portSecret: + async def connectRPCObject(o: ClusterObject): + sha256 = computeClusterObjectHash(o) + if sha256 != o['sha256']: raise Exception('secret incorrect') - return await resolveObject(id, sourcePeerPort) + return await resolveObject(o['proxyId'], o.get('sourcePort')) peer.params['connectRPCObject'] = connectRPCObject try: @@ -496,10 +509,7 @@ class PluginRemote: if clusterPeer.tags.get('localPort') == source: return value c = await clusterPeer.getParam('connectRPCObject') - m = hashlib.sha256() - m.update(bytes('%s%s' % (port, clusterSecret), 'utf8')) - portSecret = m.hexdigest() - newValue = await c(proxyId, portSecret, source) + newValue = await c(clusterObject) if not newValue: raise Exception('ipc object not found?') return newValue diff --git a/server/src/plugin/connect-rpc-object.ts b/server/src/plugin/connect-rpc-object.ts index 3486af81c..ac4511084 100644 --- a/server/src/plugin/connect-rpc-object.ts +++ b/server/src/plugin/connect-rpc-object.ts @@ -1,3 +1,4 @@ +import crypto from "crypto"; import net from "net"; import { Socket } from "engine.io"; import { IOSocket } from "../io"; @@ -6,25 +7,30 @@ export interface ClusterObject { id: string; port: number; proxyId: string; - source: number; + sourcePort: number; + sha256: string; } -export type ConnectRPCObject = (id: string, secret: string, sourcePeerPort: number) => Promise; +export type ConnectRPCObject = (o: ClusterObject) => Promise; /* * Handle incoming connections that will be * proxied to a connectRPCObject socket. + * + * It is the responsibility of the caller of + * this function to verify the signature of + * clusterObject using the clusterSecret. */ -export function setupConnectRPCObjectProxy(clusterSecret: string, port: number, connection: Socket & IOSocket) { - if (!port) { - throw new Error("invalid port"); - } - - connection.send(clusterSecret); - - const socket = net.connect(port, '127.0.0.1'); +export function setupConnectRPCObjectProxy(clusterObject: ClusterObject, connection: Socket & IOSocket) { + const socket = net.connect(clusterObject.port, '127.0.0.1'); socket.on('close', () => connection.close()); socket.on('data', data => connection.send(data)); connection.on('close', () => socket.destroy()); connection.on('message', message => socket.write(message)); }; + + +export function computeClusterObjectHash(o: ClusterObject, clusterSecret: string) { + const sha256 = crypto.createHash('sha256').update(`${o.id}${o.port}${o.sourcePort || ''}${o.proxyId}${clusterSecret}`).digest().toString('base64'); + return sha256; +} diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index f9fabfa67..57263e53a 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -1,5 +1,6 @@ import { ScryptedStatic, SystemManager } from '@scrypted/types'; import AdmZip from 'adm-zip'; +import crypto from 'crypto'; import { once } from 'events'; import fs from 'fs'; import { Volume } from 'memfs'; @@ -9,16 +10,15 @@ import { install as installSourceMapSupport } from 'source-map-support'; import { listenZero } from '../listen-zero'; import { RpcMessage, RpcPeer } from '../rpc'; import { createDuplexRpcPeer } from '../rpc-serializer'; +import { ClusterObject, ConnectRPCObject, computeClusterObjectHash } from './connect-rpc-object'; import { MediaManagerImpl } from './media'; import { PluginAPI, PluginAPIProxy, PluginRemote, PluginRemoteLoadZipOptions } from './plugin-api'; import { prepareConsoles } from './plugin-console'; import { getPluginNodePath, installOptionalDependencies } from './plugin-npm-dependencies'; -import { attachPluginRemote, DeviceManagerImpl, PluginReader, setupPluginRemote } from './plugin-remote'; +import { DeviceManagerImpl, PluginReader, attachPluginRemote, setupPluginRemote } from './plugin-remote'; import { PluginStats, startStatsUpdater } from './plugin-remote-stats'; import { createREPLServer } from './plugin-repl'; import { NodeThreadWorker } from './runtime/node-thread-worker'; -import { ClusterObject, ConnectRPCObject } from './connect-rpc-object'; -import crypto from 'crypto'; const { link } = require('linkfs'); const serverVersion = require('../../package.json').version; @@ -79,7 +79,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe async onLoadZip(scrypted: ScryptedStatic, params: any, packageJson: any, zipData: Buffer | string, zipOptions: PluginRemoteLoadZipOptions) { const { clusterId, clusterSecret } = zipOptions; - const onProxySerialization = (value: any, proxyId: string, source?: number) => { + const onProxySerialization = (value: any, proxyId: string, sourcePeerPort?: number) => { const properties = RpcPeer.prepareProxyProperties(value) || {}; let clusterEntry: ClusterObject = properties.__cluster; @@ -89,8 +89,10 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe id: clusterId, port: clusterPort, proxyId, - source, + sourcePort: sourcePeerPort, + sha256: null, }; + clusterEntry.sha256 = computeClusterObjectHash(clusterEntry, clusterSecret); properties.__cluster = clusterEntry; } // always reassign the id and source. @@ -118,11 +120,11 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe clusterPeer.onProxySerialization = (value, proxyId) => onProxySerialization(value, proxyId, clusterPeerPort); clusterPeers.set(clusterPeerPort, Promise.resolve(clusterPeer)); startPluginRemoteOptions?.onClusterPeer?.(clusterPeer); - const portSecret = crypto.createHash('sha256').update(`${clusterPort}${clusterSecret}`).digest().toString('hex'); - const connectRPCObject: ConnectRPCObject = async (id, secret, sourcePeerPort) => { - if (secret !== portSecret) + const connectRPCObject: ConnectRPCObject = async (o) => { + const sha256 = computeClusterObjectHash(o, clusterSecret); + if (sha256 !== o.sha256) throw new Error('secret incorrect'); - return resolveObject(id, sourcePeerPort); + return resolveObject(o.proxyId, o.sourcePort); } clusterPeer.params['connectRPCObject'] = connectRPCObject; client.on('close', () => { @@ -130,22 +132,24 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe clusterPeer.kill('cluster socket closed'); }); }) - const clusterPort = await listenZero(clusterRpcServer); + const clusterPort = await listenZero(clusterRpcServer, '127.0.0.1'); - const ensureClusterPeer = (port: number) => { - let clusterPeerPromise = clusterPeers.get(port); + const ensureClusterPeer = (connectPort: number) => { + let clusterPeerPromise = clusterPeers.get(connectPort); if (!clusterPeerPromise) { clusterPeerPromise = (async () => { - const socket = net.connect(port, '127.0.0.1'); - socket.on('close', () => clusterPeers.delete(port)); + const socket = net.connect(connectPort, '127.0.0.1'); + socket.on('close', () => clusterPeers.delete(connectPort)); try { await once(socket, 'connect'); - const clusterPeerPort = (socket.address() as net.AddressInfo).port; + // the sourcePort will be added to all rpc objects created by this peer session and used by resolveObject for later + // resolution when trying to find the peer. + const sourcePort = (socket.address() as net.AddressInfo).port; const clusterPeer = createDuplexRpcPeer(peer.selfName, 'cluster-server', socket, socket); - clusterPeer.tags.localPort = clusterPeerPort; - clusterPeer.onProxySerialization = (value, proxyId) => onProxySerialization(value, proxyId, clusterPeerPort); + clusterPeer.tags.localPort = sourcePort; + clusterPeer.onProxySerialization = (value, proxyId) => onProxySerialization(value, proxyId, sourcePort); return clusterPeer; } catch (e) { @@ -154,7 +158,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe throw e; } })(); - clusterPeers.set(port, clusterPeerPromise); + clusterPeers.set(connectPort, clusterPeerPromise); } return clusterPeerPromise; }; @@ -163,25 +167,27 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe const clusterObject: ClusterObject = value?.__cluster; if (clusterObject?.id !== clusterId) return value; - const { port, proxyId, source } = clusterObject; + const { port, proxyId, sourcePort } = clusterObject; + // handle the case when trying to connect to an object is on this cluster node, + // returning the actual object, rather than initiating a loopback connection. if (port === clusterPort) - return resolveObject(proxyId, source); + return resolveObject(proxyId, sourcePort); try { const clusterPeerPromise = ensureClusterPeer(port); const clusterPeer = await clusterPeerPromise; - // this object is already connected - if (clusterPeer.tags.localPort === source) + // if the localPort is the sourcePort, that means the rpc object already exists as it originated from this node. + // so return the existing proxy. + if (clusterPeer.tags.localPort === sourcePort) return value; const connectRPCObject: ConnectRPCObject = await clusterPeer.getParam('connectRPCObject'); - const portSecret = crypto.createHash('sha256').update(`${port}${clusterSecret}`).digest().toString('hex'); - const newValue = await connectRPCObject(proxyId, portSecret, source); + const newValue = await connectRPCObject(clusterObject); if (!newValue) - throw new Error('ipc object not found?'); + throw new Error('rpc object not found?'); return newValue; } catch (e) { - console.error('failure ipc', e); + console.error('failure rpc', e); return value; } } diff --git a/server/src/runtime.ts b/server/src/runtime.ts index 83964da76..b44efefff 100644 --- a/server/src/runtime.ts +++ b/server/src/runtime.ts @@ -34,7 +34,7 @@ import { getPluginVolume } from './plugin/plugin-volume'; import { NodeForkWorker } from './plugin/runtime/node-fork-worker'; import { PythonRuntimeWorker } from './plugin/runtime/python-worker'; import { RuntimeWorker, RuntimeWorkerOptions } from './plugin/runtime/runtime-worker'; -import { setupConnectRPCObjectProxy } from './plugin/connect-rpc-object'; +import { ClusterObject, computeClusterObjectHash, setupConnectRPCObjectProxy } from './plugin/connect-rpc-object'; import { getIpAddress, SCRYPTED_INSECURE_PORT, SCRYPTED_SECURE_PORT } from './server-settings'; import { AddressSettings } from './services/addresses'; import { Alerts } from './services/alerts'; @@ -163,7 +163,18 @@ export class ScryptedRuntime extends PluginHttp { res.end(); return; } - if (!req.query.port) { + if (!req.query.clusterObject) { + res.writeHead(404); + res.end(); + return; + } + try { + const clusterObject: ClusterObject = JSON.parse(req.query.clusterObject as string); + const sha256 = computeClusterObjectHash(clusterObject, this.clusterSecret); + if (sha256 != clusterObject.sha256) { + throw Error("invalid signature"); + } + } catch { res.writeHead(404); res.end(); return; @@ -173,8 +184,8 @@ export class ScryptedRuntime extends PluginHttp { this.connectRPCObjectIO.on('connection', connection => { try { - const clusterObjectPortHeader = (connection.request as Request).query.port as string; - setupConnectRPCObjectProxy(this.clusterSecret, parseInt(clusterObjectPortHeader), connection); + const clusterObject: ClusterObject = JSON.parse((connection.request as Request).query.clusterObject as string); + setupConnectRPCObjectProxy(clusterObject, connection); } catch { connection.close(); }