From ebe6bcc58fd39a4a7b39de65fbe66c215f0dee52 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Tue, 16 Dec 2025 12:22:03 -0800 Subject: [PATCH] client: add worker/fork support to web client --- packages/client/src/index.ts | 632 ++++++++++++++++++++++++----------- 1 file changed, 432 insertions(+), 200 deletions(-) diff --git a/packages/client/src/index.ts b/packages/client/src/index.ts index 27f57b15c..65936a1f9 100644 --- a/packages/client/src/index.ts +++ b/packages/client/src/index.ts @@ -1,19 +1,17 @@ -import { ConnectRPCObjectOptions, MediaObjectCreateOptions, ScryptedStatic } from "@scrypted/types"; +import { ConnectRPCObjectOptions, ForkOptions, ForkWorker, MediaObjectCreateOptions, PluginFork, ScryptedInterface, ScryptedInterfaceProperty, ScryptedStatic } from "@scrypted/types"; import * as eio from 'engine.io-client'; -import { SocketOptions } from 'engine.io-client'; import { timeoutPromise } from "../../../common/src/promise-utils"; import type { ClusterObject, ConnectRPCObject } from '../../../server/src/cluster/connect-rpc-object'; +import { domFetch } from "../../../server/src/fetch"; +import { httpFetch } from '../../../server/src/fetch/http-fetch'; import type { IOSocket } from '../../../server/src/io'; import { MediaObject } from '../../../server/src/plugin/mediaobject'; +import { PluginAPIProxy, PluginRemote } from "../../../server/src/plugin/plugin-api"; import { attachPluginRemote } from '../../../server/src/plugin/plugin-remote'; import { RpcPeer } from '../../../server/src/rpc'; import { createRpcDuplexSerializer, createRpcSerializer } from '../../../server/src/rpc-serializer'; import packageJson from '../package.json'; import { isIPAddress } from "./ip"; - -import { domFetch } from "../../../server/src/fetch"; -import { httpFetch } from '../../../server/src/fetch/http-fetch'; - export * as rpc from '../../../server/src/rpc'; export * as rpc_serializer from '../../../server/src/rpc-serializer'; @@ -33,6 +31,15 @@ const sourcePeerId = RpcPeer.generateId(); type IOClientSocket = eio.Socket & IOSocket; +interface InternalFork extends Pick { + extraHeaders: { + [header: string]: string, + }; + transports?: string[] | undefined; + clientName?: string; + admin: boolean; +}; + function once(socket: IOClientSocket, event: 'open' | 'message') { return new Promise((resolve, reject) => { const err = (e: any) => { @@ -70,6 +77,7 @@ export interface ScryptedClientStatic extends ScryptedStatic { connectionType: ScryptedClientConnectionType; rpcPeer: RpcPeer; loginResult: ScryptedClientLoginResult; + fork(options: ForkOptions & { worker: Worker }): PluginFork; } export interface ScryptedConnectionOptions { @@ -428,11 +436,10 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro const eioPath = `endpoint/${pluginId}/engine.io/api`; const eioEndpoint = baseUrl ? new URL(eioPath, baseUrl).pathname : '/' + eioPath; // https://github.com/socketio/engine.io/issues/690 - const cacheBust = Math.random().toString(36).substring(3, 10); - const eioOptions: Partial = { + const eioOptions: eio.SocketOptions = { path: eioEndpoint, query: { - cacheBust, + cacheBust: Math.random().toString(36).substring(3, 10), }, withCredentials: true, extraHeaders, @@ -470,7 +477,7 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro tryLocalAddressess: tryAddresses, }); - const localEioOptions: Partial = { + const localEioOptions: eio.SocketOptions = { ...eioOptions, extraHeaders: { ...eioOptions.extraHeaders, @@ -586,6 +593,8 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro endpointManager, mediaManager, clusterManager, + pluginHostAPI, + pluginRemoteAPI, } = scrypted; console.log('api attached', Date.now() - start); @@ -612,201 +621,131 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro .map(id => systemManager.getDeviceById(id)) .find(device => device.pluginId === '@scrypted/core' && device.nativeId === `user:${username}`); - const clusterPeers = new Map>(); - const finalizationRegistry = new FinalizationRegistry((clusterPeer: RpcPeer) => { - clusterPeer.kill('object finalized'); - }); - const ensureClusterPeer = (clusterObject: ClusterObject, connectRPCObjectOptions?: ConnectRPCObjectOptions) => { - // If dedicatedTransport is true, don't reuse existing cluster peers - if (!connectRPCObjectOptions?.dedicatedTransport) { - let clusterPeerPromise = clusterPeers.get(clusterObject.port); - if (clusterPeerPromise) - return clusterPeerPromise; - } + const connectRPCObject = clusterSetup(address, connectionType, queryToken, extraHeaders, options?.transports, sourcePeerId, clientName); - const clusterPeerPromise = (async () => { - const eioPath = 'engine.io/connectRPCObject'; - const eioEndpoint = new URL(eioPath, address).pathname; - const eioQueryToken = connectionType === 'http' ? undefined : queryToken; - const clusterPeerOptions = { - path: eioEndpoint, - query: { - cacheBust, - clusterObject: JSON.stringify(clusterObject), - ...eioQueryToken, - }, - withCredentials: true, - extraHeaders, - rejectUnauthorized: false, - transports: options?.transports, - }; + const loginResult: ScryptedClientLoginResult = { + username, + token, + directAddress, + localAddresses, + externalAddresses, + scryptedCloud, + queryToken, + authorization, + cloudAddress, + hostname, + serverId, + }; - const clusterPeerSocket = new eio.Socket(address, clusterPeerOptions); - let peerReady = false; + type ForkType = ScryptedClientStatic['fork']; + const fork: ForkType = (forkOptions) => { + const { worker } = forkOptions; - // Timeout handling for dedicated transports - let receiveTimeout: NodeJS.Timeout | undefined; - let sendTimeout: NodeJS.Timeout | undefined; - let clusterPeer: RpcPeer | undefined; + const serializer = createRpcSerializer({ + sendMessageBuffer: buffer => worker.postMessage(buffer), + sendMessageFinish: message => worker.postMessage(JSON.stringify(message)), + }); - const clearTimers = () => { - if (receiveTimeout) { - clearTimeout(receiveTimeout); - receiveTimeout = undefined; - } - if (sendTimeout) { - clearTimeout(sendTimeout); - sendTimeout = undefined; - } - }; + const threadPeer = new RpcPeer("main-client", 'thread', (message, reject, serializationContext) => { + try { + serializer.sendMessage(message, reject, serializationContext); + } + catch (e) { + reject?.(e as Error); + } + }); - const resetReceiveTimeout = connectRPCObjectOptions?.dedicatedTransport?.receiveTimeout ? () => { - if (receiveTimeout) { - clearTimeout(receiveTimeout); - } - receiveTimeout = setTimeout(() => { - if (clusterPeer) { - clusterPeer.kill('receive timeout'); + rpcPeer.killed.finally(() => threadPeer.kill('main rpc peer killed')); + + worker.addEventListener('message', async event => { + if (event.data instanceof Uint8Array) { + serializer.onMessageBuffer(Buffer.from(event.data)); + } + else { + serializer.onMessageFinish(JSON.parse(event.data)); + } + }); + + serializer.setupRpcPeer(threadPeer); + + // there is no worker close event? + const forkApi = new PluginAPIProxy(pluginHostAPI, mediaManager); + threadPeer.killed.finally(() => { + forkApi.removeListeners(); + worker.terminate(); + }); + + const internalFork: InternalFork = { + loginResult, + username, + address, + connectionType, + extraHeaders, + transports: options?.transports, + clientName, + admin, + }; + + threadPeer.params['client'] = internalFork; + + const result = (async () => { + const getRemote = await threadPeer.getParam('getRemote'); + const remote = await getRemote(forkApi, pluginId, { + serverVersion + }) as PluginRemote; + + await remote.setSystemState(systemManager.getSystemState()); + forkApi.listen((id, eventDetails, eventData) => { + // ScryptedDevice events will be handled specially and repropagated by the remote. + if (eventDetails.eventInterface === ScryptedInterface.ScryptedDevice) { + if (eventDetails.property === ScryptedInterfaceProperty.id) { + // a change on the id property means device was deleted + remote.updateDeviceState(eventData, undefined); } - }, connectRPCObjectOptions.dedicatedTransport.receiveTimeout); - } : undefined; - - const resetSendTimeout = connectRPCObjectOptions?.dedicatedTransport?.sendTimeout ? () => { - if (sendTimeout) { - clearTimeout(sendTimeout); - } - sendTimeout = setTimeout(() => { - if (clusterPeer) { - clusterPeer.kill('send timeout'); + else { + // a change on anything else is a descriptor update + remote.updateDeviceState(id, systemManager.getSystemState()[id]); } - }, connectRPCObjectOptions.dedicatedTransport.sendTimeout); - } : undefined; - - clusterPeerSocket.on('close', () => { - clusterPeer?.kill('socket closed'); - // Only remove from clusterPeers if it's not a dedicated transport - if (!connectRPCObjectOptions?.dedicatedTransport) { - clusterPeers.delete(clusterObject.port); + return; } - if (!peerReady) { - throw new Error("peer disconnected before setup completed"); + + if (eventDetails.property && !eventDetails.mixinId) { + remote.notify(id, eventDetails, systemManager.getSystemState()[id]?.[eventDetails.property]).catch(() => { }); + } + else { + remote.notify(id, eventDetails, eventData).catch(() => { }); } }); - try { - await once(clusterPeerSocket, 'open'); - - const serializer = createRpcDuplexSerializer({ - write: data => { - resetSendTimeout?.(); - clusterPeerSocket.send(data); - }, - }); - - clusterPeerSocket.on('message', data => { - resetReceiveTimeout?.(); - serializer.onData(Buffer.from(data)); - }); - - clusterPeer = new RpcPeer(clientName || 'engine.io-client', "cluster-proxy", (message, reject, serializationContext) => { - try { - resetSendTimeout?.(); - serializer.sendMessage(message, reject, serializationContext); - } - catch (e) { - reject?.(e as Error); - } - }); - clusterPeer.killedSafe.finally(() => { - clearTimers(); - clusterPeerSocket.close(); - }); - serializer.setupRpcPeer(clusterPeer); - clusterPeer.tags.localPort = sourcePeerId; - peerReady = true; - - // Initialize timeouts if configured - resetReceiveTimeout?.(); - resetSendTimeout?.(); - - return clusterPeer; - } - catch (e) { - clearTimers(); - console.error('failure ipc connect', e); - clusterPeerSocket.close(); - throw e; - } + const fork = await threadPeer.getParam('fork'); + return fork; })(); - // Only store in clusterPeers if it's not a dedicated transport - if (!connectRPCObjectOptions?.dedicatedTransport) { - clusterPeers.set(clusterObject.port, clusterPeerPromise); - } + result.catch(() => { + threadPeer.kill('fork setup failed'); + worker.terminate(); + }); - return clusterPeerPromise; + return { + [Symbol.dispose]() { + worker.terminate(); + threadPeer.kill('disposed'); + }, + result, + worker: { + terminate() { + worker.terminate(); + }, + nativeWorker: worker, + } as any as ForkWorker, + }; }; - const resolveObject = async (proxyId: string, sourcePeerPort: number) => { - const sourcePeer = await clusterPeers.get(sourcePeerPort); - if (sourcePeer?.remoteWeakProxies) { - return Object.values(sourcePeer.remoteWeakProxies).find( - v => v.deref()?.__cluster?.proxyId == proxyId - )?.deref(); - } - return null; - } - - const connectRPCObject = async (value: any, options?: ConnectRPCObjectOptions) => { - const clusterObject: ClusterObject = value?.__cluster; - if (!clusterObject) { - return value; - } - - const { port, proxyId } = clusterObject; - - // check if object is already connected - const resolved = await resolveObject(proxyId, port); - if (resolved) { - return resolved; - } - - try { - const clusterPeerPromise = ensureClusterPeer(clusterObject, options); - const clusterPeer = await clusterPeerPromise; - const connectRPCObject: ConnectRPCObject = await clusterPeer.getParam('connectRPCObject'); - try { - const newValue = await connectRPCObject(clusterObject); - if (!newValue) - throw new Error('ipc object not found?'); - - // If dedicatedTransport is true, register the object for cleanup - if (options?.dedicatedTransport) { - finalizationRegistry.register(newValue, clusterPeer); - } - - return newValue; - } - catch (e) { - // If we have a clusterPeer and this is a dedicated transport, kill the connection - // to prevent resource leaks when connectRPCObject fails - if (options?.dedicatedTransport) { - clusterPeer.kill('connectRPCObject failed'); - } - throw e; - } - } - catch (e) { - console.error('failure ipc', e); - return value; - } - } - const ret: ScryptedClientStatic = { userId: userDevice?.id, serverVersion, username, - pluginRemoteAPI: undefined, + pluginRemoteAPI, address, connectionType, admin, @@ -818,23 +757,11 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro disconnect() { rpcPeer.kill('disconnect requested'); }, - pluginHostAPI: undefined, + pluginHostAPI, rpcPeer, - loginResult: { - username, - token, - directAddress, - localAddresses, - externalAddresses, - scryptedCloud, - queryToken, - authorization, - cloudAddress, - hostname, - serverId, - }, + loginResult, connectRPCObject, - fork: undefined, + fork, connect: undefined, } @@ -854,3 +781,308 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro throw e; } } + +function clusterSetup(address: string, connectionType: ScryptedClientConnectionType, queryToken: any, extraHeaders: { [header: string]: string }, transports: string[] | undefined, sourcePeerId: string, clientName?: string) { + const clusterPeers = new Map>(); + const finalizationRegistry = new FinalizationRegistry((clusterPeer: RpcPeer) => { + clusterPeer.kill('object finalized'); + }); + const ensureClusterPeer = (clusterObject: ClusterObject, connectRPCObjectOptions?: ConnectRPCObjectOptions) => { + // If dedicatedTransport is true, don't reuse existing cluster peers + if (!connectRPCObjectOptions?.dedicatedTransport) { + let clusterPeerPromise = clusterPeers.get(clusterObject.port); + if (clusterPeerPromise) + return clusterPeerPromise; + } + + const clusterPeerPromise = (async () => { + const eioPath = 'engine.io/connectRPCObject'; + const eioEndpoint = new URL(eioPath, address).pathname; + const eioQueryToken = connectionType === 'http' ? undefined : queryToken; + const clusterPeerOptions: eio.SocketOptions = { + path: eioEndpoint, + query: { + cacheBust: Math.random().toString(36).substring(3, 10), + clusterObject: JSON.stringify(clusterObject), + ...eioQueryToken, + }, + withCredentials: true, + extraHeaders, + rejectUnauthorized: false, + transports, + }; + + const clusterPeerSocket = new eio.Socket(address, clusterPeerOptions); + let peerReady = false; + + // Timeout handling for dedicated transports + let receiveTimeout: NodeJS.Timeout | undefined; + let sendTimeout: NodeJS.Timeout | undefined; + let clusterPeer: RpcPeer | undefined; + + const clearTimers = () => { + if (receiveTimeout) { + clearTimeout(receiveTimeout); + receiveTimeout = undefined; + } + if (sendTimeout) { + clearTimeout(sendTimeout); + sendTimeout = undefined; + } + }; + + const resetReceiveTimeout = connectRPCObjectOptions?.dedicatedTransport?.receiveTimeout ? () => { + if (receiveTimeout) { + clearTimeout(receiveTimeout); + } + receiveTimeout = setTimeout(() => { + if (clusterPeer) { + clusterPeer.kill('receive timeout'); + } + }, connectRPCObjectOptions.dedicatedTransport.receiveTimeout); + } : undefined; + + const resetSendTimeout = connectRPCObjectOptions?.dedicatedTransport?.sendTimeout ? () => { + if (sendTimeout) { + clearTimeout(sendTimeout); + } + sendTimeout = setTimeout(() => { + if (clusterPeer) { + clusterPeer.kill('send timeout'); + } + }, connectRPCObjectOptions.dedicatedTransport.sendTimeout); + } : undefined; + + clusterPeerSocket.on('close', () => { + clusterPeer?.kill('socket closed'); + // Only remove from clusterPeers if it's not a dedicated transport + if (!connectRPCObjectOptions?.dedicatedTransport) { + clusterPeers.delete(clusterObject.port); + } + if (!peerReady) { + throw new Error("peer disconnected before setup completed"); + } + }); + + try { + await once(clusterPeerSocket, 'open'); + + const serializer = createRpcDuplexSerializer({ + write: data => { + resetSendTimeout?.(); + clusterPeerSocket.send(data); + }, + }); + + clusterPeerSocket.on('message', data => { + resetReceiveTimeout?.(); + serializer.onData(Buffer.from(data)); + }); + + clusterPeer = new RpcPeer(clientName || 'engine.io-client', "cluster-proxy", (message, reject, serializationContext) => { + try { + resetSendTimeout?.(); + serializer.sendMessage(message, reject, serializationContext); + } + catch (e) { + reject?.(e as Error); + } + }); + clusterPeer.killedSafe.finally(() => { + clearTimers(); + clusterPeerSocket.close(); + }); + serializer.setupRpcPeer(clusterPeer); + clusterPeer.tags.localPort = sourcePeerId; + peerReady = true; + + // Initialize timeouts if configured + resetReceiveTimeout?.(); + resetSendTimeout?.(); + + return clusterPeer; + } + catch (e) { + clearTimers(); + console.error('failure ipc connect', e); + clusterPeerSocket.close(); + throw e; + } + })(); + + // Only store in clusterPeers if it's not a dedicated transport + if (!connectRPCObjectOptions?.dedicatedTransport) { + clusterPeers.set(clusterObject.port, clusterPeerPromise); + } + + return clusterPeerPromise; + }; + + const resolveObject = async (proxyId: string, sourcePeerPort: number) => { + const sourcePeer = await clusterPeers.get(sourcePeerPort); + if (sourcePeer?.remoteWeakProxies) { + return Object.values(sourcePeer.remoteWeakProxies).find( + v => v.deref()?.__cluster?.proxyId == proxyId + )?.deref(); + } + return null; + } + + const connectRPCObject = async (value: any, options?: ConnectRPCObjectOptions) => { + const clusterObject: ClusterObject = value?.__cluster; + if (!clusterObject) { + return value; + } + + const { port, proxyId } = clusterObject; + + // check if object is already connected + const resolved = await resolveObject(proxyId, port); + if (resolved) { + return resolved; + } + + try { + const clusterPeerPromise = ensureClusterPeer(clusterObject, options); + const clusterPeer = await clusterPeerPromise; + const connectRPCObject: ConnectRPCObject = await clusterPeer.getParam('connectRPCObject'); + try { + const newValue = await connectRPCObject(clusterObject); + if (!newValue) + throw new Error('ipc object not found?'); + + // If dedicatedTransport is true, register the object for cleanup + if (options?.dedicatedTransport) { + finalizationRegistry.register(newValue, clusterPeer); + } + + return newValue; + } + catch (e) { + // If we have a clusterPeer and this is a dedicated transport, kill the connection + // to prevent resource leaks when connectRPCObject fails + if (options?.dedicatedTransport) { + clusterPeer.kill('connectRPCObject failed'); + } + throw e; + } + } + catch (e) { + console.error('failure ipc', e); + return value; + } + } + + return connectRPCObject; +} + +export async function connectScryptedClientFork(forkMain: (client: ScryptedClientStatic) => Promise) { + const start = Date.now(); + + try { + + const serializer = createRpcSerializer({ + sendMessageBuffer: buffer => self.postMessage(buffer), + sendMessageFinish: message => self.postMessage(JSON.stringify(message)), + }); + + const rpcPeer = new RpcPeer('thread', "main-client", (message, reject, serializationContext) => { + try { + serializer.sendMessage(message, reject, serializationContext); + } + catch (e) { + reject?.(e as Error); + } + }); + + self.addEventListener('message', event => { + if (event.data instanceof Uint8Array) { + serializer.onMessageBuffer(Buffer.from(event.data)); + } + else { + serializer.onMessageFinish(JSON.parse(event.data)); + } + }); + + serializer.setupRpcPeer(rpcPeer); + + + const scrypted = await attachPluginRemote(rpcPeer, undefined); + const { + serverVersion, + systemManager, + deviceManager, + endpointManager, + mediaManager, + clusterManager, + pluginHostAPI, + pluginRemoteAPI, + } = scrypted; + console.log('api attached', Date.now() - start); + + mediaManager.createMediaObject = async(data: any, mimeType: string, options: T) => { + return new MediaObject(mimeType, data, options) as any; + } + console.log('api initialized', Date.now() - start); + + const { + loginResult, + username, + address, + connectionType, + extraHeaders, + transports, + clientName, + admin, + } = await rpcPeer.getParam('client') as InternalFork; + + const { queryToken } = loginResult; + + const userDevice = Object.keys(systemManager.getSystemState()) + .map(id => systemManager.getDeviceById(id)) + .find(device => device.pluginId === '@scrypted/core' && device.nativeId === `user:${username}`); + + const connectRPCObject = clusterSetup(address, connectionType, queryToken, extraHeaders, transports, sourcePeerId, clientName); + + type ForkType = ScryptedClientStatic['fork']; + const fork: ForkType = (forkOptions) => { + throw new Error('not implemented'); + }; + + const ret: ScryptedClientStatic = { + userId: userDevice?.id, + serverVersion, + username, + pluginRemoteAPI, + address, + connectionType, + admin, + systemManager, + clusterManager, + deviceManager, + endpointManager, + mediaManager, + disconnect() { + rpcPeer.kill('disconnect requested'); + }, + pluginHostAPI, + rpcPeer, + loginResult, + connectRPCObject, + fork, + connect: undefined, + } + rpcPeer.killed.finally(() => { + self.close(); + ret.onClose?.(); + }); + + const forked = await forkMain(ret); + + rpcPeer.params['fork'] = forked; + } + catch (e) { + self.close(); + throw e; + } +}