client: add worker/fork support to web client

This commit is contained in:
Koushik Dutta
2025-12-16 12:22:03 -08:00
parent 3b0042c922
commit ebe6bcc58f

View File

@@ -1,19 +1,17 @@
import { ConnectRPCObjectOptions, MediaObjectCreateOptions, ScryptedStatic } from "@scrypted/types";
import { ConnectRPCObjectOptions, ForkOptions, ForkWorker, MediaObjectCreateOptions, PluginFork, ScryptedInterface, ScryptedInterfaceProperty, ScryptedStatic } from "@scrypted/types";
import * as eio from 'engine.io-client';
import { SocketOptions } from 'engine.io-client';
import { timeoutPromise } from "../../../common/src/promise-utils";
import type { ClusterObject, ConnectRPCObject } from '../../../server/src/cluster/connect-rpc-object';
import { domFetch } from "../../../server/src/fetch";
import { httpFetch } from '../../../server/src/fetch/http-fetch';
import type { IOSocket } from '../../../server/src/io';
import { MediaObject } from '../../../server/src/plugin/mediaobject';
import { PluginAPIProxy, PluginRemote } from "../../../server/src/plugin/plugin-api";
import { attachPluginRemote } from '../../../server/src/plugin/plugin-remote';
import { RpcPeer } from '../../../server/src/rpc';
import { createRpcDuplexSerializer, createRpcSerializer } from '../../../server/src/rpc-serializer';
import packageJson from '../package.json';
import { isIPAddress } from "./ip";
import { domFetch } from "../../../server/src/fetch";
import { httpFetch } from '../../../server/src/fetch/http-fetch';
export * as rpc from '../../../server/src/rpc';
export * as rpc_serializer from '../../../server/src/rpc-serializer';
@@ -33,6 +31,15 @@ const sourcePeerId = RpcPeer.generateId();
type IOClientSocket = eio.Socket & IOSocket;
interface InternalFork extends Pick<ScryptedClientStatic, 'loginResult' | 'username' | 'address' | 'connectionType'> {
extraHeaders: {
[header: string]: string,
};
transports?: string[] | undefined;
clientName?: string;
admin: boolean;
};
function once(socket: IOClientSocket, event: 'open' | 'message') {
return new Promise<any[]>((resolve, reject) => {
const err = (e: any) => {
@@ -70,6 +77,7 @@ export interface ScryptedClientStatic extends ScryptedStatic {
connectionType: ScryptedClientConnectionType;
rpcPeer: RpcPeer;
loginResult: ScryptedClientLoginResult;
fork<T>(options: ForkOptions & { worker: Worker }): PluginFork<T>;
}
export interface ScryptedConnectionOptions {
@@ -428,11 +436,10 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
const eioPath = `endpoint/${pluginId}/engine.io/api`;
const eioEndpoint = baseUrl ? new URL(eioPath, baseUrl).pathname : '/' + eioPath;
// https://github.com/socketio/engine.io/issues/690
const cacheBust = Math.random().toString(36).substring(3, 10);
const eioOptions: Partial<SocketOptions> = {
const eioOptions: eio.SocketOptions = {
path: eioEndpoint,
query: {
cacheBust,
cacheBust: Math.random().toString(36).substring(3, 10),
},
withCredentials: true,
extraHeaders,
@@ -470,7 +477,7 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
tryLocalAddressess: tryAddresses,
});
const localEioOptions: Partial<SocketOptions> = {
const localEioOptions: eio.SocketOptions = {
...eioOptions,
extraHeaders: {
...eioOptions.extraHeaders,
@@ -586,6 +593,8 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
endpointManager,
mediaManager,
clusterManager,
pluginHostAPI,
pluginRemoteAPI,
} = scrypted;
console.log('api attached', Date.now() - start);
@@ -612,201 +621,131 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
.map(id => systemManager.getDeviceById(id))
.find(device => device.pluginId === '@scrypted/core' && device.nativeId === `user:${username}`);
const clusterPeers = new Map<number, Promise<RpcPeer>>();
const finalizationRegistry = new FinalizationRegistry((clusterPeer: RpcPeer) => {
clusterPeer.kill('object finalized');
});
const ensureClusterPeer = (clusterObject: ClusterObject, connectRPCObjectOptions?: ConnectRPCObjectOptions) => {
// If dedicatedTransport is true, don't reuse existing cluster peers
if (!connectRPCObjectOptions?.dedicatedTransport) {
let clusterPeerPromise = clusterPeers.get(clusterObject.port);
if (clusterPeerPromise)
return clusterPeerPromise;
}
const connectRPCObject = clusterSetup(address, connectionType, queryToken, extraHeaders, options?.transports, sourcePeerId, clientName);
const clusterPeerPromise = (async () => {
const eioPath = 'engine.io/connectRPCObject';
const eioEndpoint = new URL(eioPath, address).pathname;
const eioQueryToken = connectionType === 'http' ? undefined : queryToken;
const clusterPeerOptions = {
path: eioEndpoint,
query: {
cacheBust,
clusterObject: JSON.stringify(clusterObject),
...eioQueryToken,
},
withCredentials: true,
extraHeaders,
rejectUnauthorized: false,
transports: options?.transports,
};
const loginResult: ScryptedClientLoginResult = {
username,
token,
directAddress,
localAddresses,
externalAddresses,
scryptedCloud,
queryToken,
authorization,
cloudAddress,
hostname,
serverId,
};
const clusterPeerSocket = new eio.Socket(address, clusterPeerOptions);
let peerReady = false;
type ForkType = ScryptedClientStatic['fork'];
const fork: ForkType = (forkOptions) => {
const { worker } = forkOptions;
// Timeout handling for dedicated transports
let receiveTimeout: NodeJS.Timeout | undefined;
let sendTimeout: NodeJS.Timeout | undefined;
let clusterPeer: RpcPeer | undefined;
const serializer = createRpcSerializer({
sendMessageBuffer: buffer => worker.postMessage(buffer),
sendMessageFinish: message => worker.postMessage(JSON.stringify(message)),
});
const clearTimers = () => {
if (receiveTimeout) {
clearTimeout(receiveTimeout);
receiveTimeout = undefined;
}
if (sendTimeout) {
clearTimeout(sendTimeout);
sendTimeout = undefined;
}
};
const threadPeer = new RpcPeer("main-client", 'thread', (message, reject, serializationContext) => {
try {
serializer.sendMessage(message, reject, serializationContext);
}
catch (e) {
reject?.(e as Error);
}
});
const resetReceiveTimeout = connectRPCObjectOptions?.dedicatedTransport?.receiveTimeout ? () => {
if (receiveTimeout) {
clearTimeout(receiveTimeout);
}
receiveTimeout = setTimeout(() => {
if (clusterPeer) {
clusterPeer.kill('receive timeout');
rpcPeer.killed.finally(() => threadPeer.kill('main rpc peer killed'));
worker.addEventListener('message', async event => {
if (event.data instanceof Uint8Array) {
serializer.onMessageBuffer(Buffer.from(event.data));
}
else {
serializer.onMessageFinish(JSON.parse(event.data));
}
});
serializer.setupRpcPeer(threadPeer);
// there is no worker close event?
const forkApi = new PluginAPIProxy(pluginHostAPI, mediaManager);
threadPeer.killed.finally(() => {
forkApi.removeListeners();
worker.terminate();
});
const internalFork: InternalFork = {
loginResult,
username,
address,
connectionType,
extraHeaders,
transports: options?.transports,
clientName,
admin,
};
threadPeer.params['client'] = internalFork;
const result = (async () => {
const getRemote = await threadPeer.getParam('getRemote');
const remote = await getRemote(forkApi, pluginId, {
serverVersion
}) as PluginRemote;
await remote.setSystemState(systemManager.getSystemState());
forkApi.listen((id, eventDetails, eventData) => {
// ScryptedDevice events will be handled specially and repropagated by the remote.
if (eventDetails.eventInterface === ScryptedInterface.ScryptedDevice) {
if (eventDetails.property === ScryptedInterfaceProperty.id) {
// a change on the id property means device was deleted
remote.updateDeviceState(eventData, undefined);
}
}, connectRPCObjectOptions.dedicatedTransport.receiveTimeout);
} : undefined;
const resetSendTimeout = connectRPCObjectOptions?.dedicatedTransport?.sendTimeout ? () => {
if (sendTimeout) {
clearTimeout(sendTimeout);
}
sendTimeout = setTimeout(() => {
if (clusterPeer) {
clusterPeer.kill('send timeout');
else {
// a change on anything else is a descriptor update
remote.updateDeviceState(id, systemManager.getSystemState()[id]);
}
}, connectRPCObjectOptions.dedicatedTransport.sendTimeout);
} : undefined;
clusterPeerSocket.on('close', () => {
clusterPeer?.kill('socket closed');
// Only remove from clusterPeers if it's not a dedicated transport
if (!connectRPCObjectOptions?.dedicatedTransport) {
clusterPeers.delete(clusterObject.port);
return;
}
if (!peerReady) {
throw new Error("peer disconnected before setup completed");
if (eventDetails.property && !eventDetails.mixinId) {
remote.notify(id, eventDetails, systemManager.getSystemState()[id]?.[eventDetails.property]).catch(() => { });
}
else {
remote.notify(id, eventDetails, eventData).catch(() => { });
}
});
try {
await once(clusterPeerSocket, 'open');
const serializer = createRpcDuplexSerializer({
write: data => {
resetSendTimeout?.();
clusterPeerSocket.send(data);
},
});
clusterPeerSocket.on('message', data => {
resetReceiveTimeout?.();
serializer.onData(Buffer.from(data));
});
clusterPeer = new RpcPeer(clientName || 'engine.io-client', "cluster-proxy", (message, reject, serializationContext) => {
try {
resetSendTimeout?.();
serializer.sendMessage(message, reject, serializationContext);
}
catch (e) {
reject?.(e as Error);
}
});
clusterPeer.killedSafe.finally(() => {
clearTimers();
clusterPeerSocket.close();
});
serializer.setupRpcPeer(clusterPeer);
clusterPeer.tags.localPort = sourcePeerId;
peerReady = true;
// Initialize timeouts if configured
resetReceiveTimeout?.();
resetSendTimeout?.();
return clusterPeer;
}
catch (e) {
clearTimers();
console.error('failure ipc connect', e);
clusterPeerSocket.close();
throw e;
}
const fork = await threadPeer.getParam('fork');
return fork;
})();
// Only store in clusterPeers if it's not a dedicated transport
if (!connectRPCObjectOptions?.dedicatedTransport) {
clusterPeers.set(clusterObject.port, clusterPeerPromise);
}
result.catch(() => {
threadPeer.kill('fork setup failed');
worker.terminate();
});
return clusterPeerPromise;
return {
[Symbol.dispose]() {
worker.terminate();
threadPeer.kill('disposed');
},
result,
worker: {
terminate() {
worker.terminate();
},
nativeWorker: worker,
} as any as ForkWorker,
};
};
const resolveObject = async (proxyId: string, sourcePeerPort: number) => {
const sourcePeer = await clusterPeers.get(sourcePeerPort);
if (sourcePeer?.remoteWeakProxies) {
return Object.values(sourcePeer.remoteWeakProxies).find(
v => v.deref()?.__cluster?.proxyId == proxyId
)?.deref();
}
return null;
}
const connectRPCObject = async (value: any, options?: ConnectRPCObjectOptions) => {
const clusterObject: ClusterObject = value?.__cluster;
if (!clusterObject) {
return value;
}
const { port, proxyId } = clusterObject;
// check if object is already connected
const resolved = await resolveObject(proxyId, port);
if (resolved) {
return resolved;
}
try {
const clusterPeerPromise = ensureClusterPeer(clusterObject, options);
const clusterPeer = await clusterPeerPromise;
const connectRPCObject: ConnectRPCObject = await clusterPeer.getParam('connectRPCObject');
try {
const newValue = await connectRPCObject(clusterObject);
if (!newValue)
throw new Error('ipc object not found?');
// If dedicatedTransport is true, register the object for cleanup
if (options?.dedicatedTransport) {
finalizationRegistry.register(newValue, clusterPeer);
}
return newValue;
}
catch (e) {
// If we have a clusterPeer and this is a dedicated transport, kill the connection
// to prevent resource leaks when connectRPCObject fails
if (options?.dedicatedTransport) {
clusterPeer.kill('connectRPCObject failed');
}
throw e;
}
}
catch (e) {
console.error('failure ipc', e);
return value;
}
}
const ret: ScryptedClientStatic = {
userId: userDevice?.id,
serverVersion,
username,
pluginRemoteAPI: undefined,
pluginRemoteAPI,
address,
connectionType,
admin,
@@ -818,23 +757,11 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
disconnect() {
rpcPeer.kill('disconnect requested');
},
pluginHostAPI: undefined,
pluginHostAPI,
rpcPeer,
loginResult: {
username,
token,
directAddress,
localAddresses,
externalAddresses,
scryptedCloud,
queryToken,
authorization,
cloudAddress,
hostname,
serverId,
},
loginResult,
connectRPCObject,
fork: undefined,
fork,
connect: undefined,
}
@@ -854,3 +781,308 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
throw e;
}
}
function clusterSetup(address: string, connectionType: ScryptedClientConnectionType, queryToken: any, extraHeaders: { [header: string]: string }, transports: string[] | undefined, sourcePeerId: string, clientName?: string) {
const clusterPeers = new Map<number, Promise<RpcPeer>>();
const finalizationRegistry = new FinalizationRegistry((clusterPeer: RpcPeer) => {
clusterPeer.kill('object finalized');
});
const ensureClusterPeer = (clusterObject: ClusterObject, connectRPCObjectOptions?: ConnectRPCObjectOptions) => {
// If dedicatedTransport is true, don't reuse existing cluster peers
if (!connectRPCObjectOptions?.dedicatedTransport) {
let clusterPeerPromise = clusterPeers.get(clusterObject.port);
if (clusterPeerPromise)
return clusterPeerPromise;
}
const clusterPeerPromise = (async () => {
const eioPath = 'engine.io/connectRPCObject';
const eioEndpoint = new URL(eioPath, address).pathname;
const eioQueryToken = connectionType === 'http' ? undefined : queryToken;
const clusterPeerOptions: eio.SocketOptions = {
path: eioEndpoint,
query: {
cacheBust: Math.random().toString(36).substring(3, 10),
clusterObject: JSON.stringify(clusterObject),
...eioQueryToken,
},
withCredentials: true,
extraHeaders,
rejectUnauthorized: false,
transports,
};
const clusterPeerSocket = new eio.Socket(address, clusterPeerOptions);
let peerReady = false;
// Timeout handling for dedicated transports
let receiveTimeout: NodeJS.Timeout | undefined;
let sendTimeout: NodeJS.Timeout | undefined;
let clusterPeer: RpcPeer | undefined;
const clearTimers = () => {
if (receiveTimeout) {
clearTimeout(receiveTimeout);
receiveTimeout = undefined;
}
if (sendTimeout) {
clearTimeout(sendTimeout);
sendTimeout = undefined;
}
};
const resetReceiveTimeout = connectRPCObjectOptions?.dedicatedTransport?.receiveTimeout ? () => {
if (receiveTimeout) {
clearTimeout(receiveTimeout);
}
receiveTimeout = setTimeout(() => {
if (clusterPeer) {
clusterPeer.kill('receive timeout');
}
}, connectRPCObjectOptions.dedicatedTransport.receiveTimeout);
} : undefined;
const resetSendTimeout = connectRPCObjectOptions?.dedicatedTransport?.sendTimeout ? () => {
if (sendTimeout) {
clearTimeout(sendTimeout);
}
sendTimeout = setTimeout(() => {
if (clusterPeer) {
clusterPeer.kill('send timeout');
}
}, connectRPCObjectOptions.dedicatedTransport.sendTimeout);
} : undefined;
clusterPeerSocket.on('close', () => {
clusterPeer?.kill('socket closed');
// Only remove from clusterPeers if it's not a dedicated transport
if (!connectRPCObjectOptions?.dedicatedTransport) {
clusterPeers.delete(clusterObject.port);
}
if (!peerReady) {
throw new Error("peer disconnected before setup completed");
}
});
try {
await once(clusterPeerSocket, 'open');
const serializer = createRpcDuplexSerializer({
write: data => {
resetSendTimeout?.();
clusterPeerSocket.send(data);
},
});
clusterPeerSocket.on('message', data => {
resetReceiveTimeout?.();
serializer.onData(Buffer.from(data));
});
clusterPeer = new RpcPeer(clientName || 'engine.io-client', "cluster-proxy", (message, reject, serializationContext) => {
try {
resetSendTimeout?.();
serializer.sendMessage(message, reject, serializationContext);
}
catch (e) {
reject?.(e as Error);
}
});
clusterPeer.killedSafe.finally(() => {
clearTimers();
clusterPeerSocket.close();
});
serializer.setupRpcPeer(clusterPeer);
clusterPeer.tags.localPort = sourcePeerId;
peerReady = true;
// Initialize timeouts if configured
resetReceiveTimeout?.();
resetSendTimeout?.();
return clusterPeer;
}
catch (e) {
clearTimers();
console.error('failure ipc connect', e);
clusterPeerSocket.close();
throw e;
}
})();
// Only store in clusterPeers if it's not a dedicated transport
if (!connectRPCObjectOptions?.dedicatedTransport) {
clusterPeers.set(clusterObject.port, clusterPeerPromise);
}
return clusterPeerPromise;
};
const resolveObject = async (proxyId: string, sourcePeerPort: number) => {
const sourcePeer = await clusterPeers.get(sourcePeerPort);
if (sourcePeer?.remoteWeakProxies) {
return Object.values(sourcePeer.remoteWeakProxies).find(
v => v.deref()?.__cluster?.proxyId == proxyId
)?.deref();
}
return null;
}
const connectRPCObject = async (value: any, options?: ConnectRPCObjectOptions) => {
const clusterObject: ClusterObject = value?.__cluster;
if (!clusterObject) {
return value;
}
const { port, proxyId } = clusterObject;
// check if object is already connected
const resolved = await resolveObject(proxyId, port);
if (resolved) {
return resolved;
}
try {
const clusterPeerPromise = ensureClusterPeer(clusterObject, options);
const clusterPeer = await clusterPeerPromise;
const connectRPCObject: ConnectRPCObject = await clusterPeer.getParam('connectRPCObject');
try {
const newValue = await connectRPCObject(clusterObject);
if (!newValue)
throw new Error('ipc object not found?');
// If dedicatedTransport is true, register the object for cleanup
if (options?.dedicatedTransport) {
finalizationRegistry.register(newValue, clusterPeer);
}
return newValue;
}
catch (e) {
// If we have a clusterPeer and this is a dedicated transport, kill the connection
// to prevent resource leaks when connectRPCObject fails
if (options?.dedicatedTransport) {
clusterPeer.kill('connectRPCObject failed');
}
throw e;
}
}
catch (e) {
console.error('failure ipc', e);
return value;
}
}
return connectRPCObject;
}
export async function connectScryptedClientFork(forkMain: (client: ScryptedClientStatic) => Promise<any>) {
const start = Date.now();
try {
const serializer = createRpcSerializer({
sendMessageBuffer: buffer => self.postMessage(buffer),
sendMessageFinish: message => self.postMessage(JSON.stringify(message)),
});
const rpcPeer = new RpcPeer('thread', "main-client", (message, reject, serializationContext) => {
try {
serializer.sendMessage(message, reject, serializationContext);
}
catch (e) {
reject?.(e as Error);
}
});
self.addEventListener('message', event => {
if (event.data instanceof Uint8Array) {
serializer.onMessageBuffer(Buffer.from(event.data));
}
else {
serializer.onMessageFinish(JSON.parse(event.data));
}
});
serializer.setupRpcPeer(rpcPeer);
const scrypted = await attachPluginRemote(rpcPeer, undefined);
const {
serverVersion,
systemManager,
deviceManager,
endpointManager,
mediaManager,
clusterManager,
pluginHostAPI,
pluginRemoteAPI,
} = scrypted;
console.log('api attached', Date.now() - start);
mediaManager.createMediaObject = async<T extends MediaObjectCreateOptions>(data: any, mimeType: string, options: T) => {
return new MediaObject(mimeType, data, options) as any;
}
console.log('api initialized', Date.now() - start);
const {
loginResult,
username,
address,
connectionType,
extraHeaders,
transports,
clientName,
admin,
} = await rpcPeer.getParam('client') as InternalFork;
const { queryToken } = loginResult;
const userDevice = Object.keys(systemManager.getSystemState())
.map(id => systemManager.getDeviceById(id))
.find(device => device.pluginId === '@scrypted/core' && device.nativeId === `user:${username}`);
const connectRPCObject = clusterSetup(address, connectionType, queryToken, extraHeaders, transports, sourcePeerId, clientName);
type ForkType = ScryptedClientStatic['fork'];
const fork: ForkType = (forkOptions) => {
throw new Error('not implemented');
};
const ret: ScryptedClientStatic = {
userId: userDevice?.id,
serverVersion,
username,
pluginRemoteAPI,
address,
connectionType,
admin,
systemManager,
clusterManager,
deviceManager,
endpointManager,
mediaManager,
disconnect() {
rpcPeer.kill('disconnect requested');
},
pluginHostAPI,
rpcPeer,
loginResult,
connectRPCObject,
fork,
connect: undefined,
}
rpcPeer.killed.finally(() => {
self.close();
ret.onClose?.();
});
const forked = await forkMain(ret);
rpcPeer.params['fork'] = forked;
}
catch (e) {
self.close();
throw e;
}
}