sdk/client: add optional dedicated connections and lifetime to connectRPCObject

This commit is contained in:
Koushik Dutta
2025-09-17 08:31:44 -07:00
parent ca855bb9a6
commit eaa6da005b
9 changed files with 220 additions and 176 deletions

View File

@@ -1,4 +1,4 @@
import { MediaObjectCreateOptions, ScryptedStatic } from "@scrypted/types";
import { MediaObjectCreateOptions, ScryptedStatic, ConnectRPCObjectOptions } from "@scrypted/types";
import * as eio from 'engine.io-client';
import { SocketOptions } from 'engine.io-client';
import { timeoutPromise } from "../../../common/src/promise-utils";
@@ -573,62 +573,77 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
.find(device => device.pluginId === '@scrypted/core' && device.nativeId === `user:${username}`);
const clusterPeers = new Map<number, Promise<RpcPeer>>();
const ensureClusterPeer = (clusterObject: ClusterObject) => {
let clusterPeerPromise = clusterPeers.get(clusterObject.port);
if (!clusterPeerPromise) {
clusterPeerPromise = (async () => {
const eioPath = 'engine.io/connectRPCObject';
const eioEndpoint = baseUrl ? new URL(eioPath, baseUrl).pathname : '/' + eioPath;
const clusterPeerOptions = {
path: eioEndpoint,
query: {
cacehBust,
clusterObject: JSON.stringify(clusterObject),
},
withCredentials: true,
extraHeaders,
rejectUnauthorized: false,
transports: options?.transports,
};
const finalizationRegistry = new FinalizationRegistry((clusterPeer: RpcPeer) => {
clusterPeer.kill('object finalized');
});
const ensureClusterPeer = (clusterObject: ClusterObject, connectRPCObjectOptions?: ConnectRPCObjectOptions) => {
// If dedicatedTransport is true, don't reuse existing cluster peers
if (!connectRPCObjectOptions?.dedicatedTransport) {
let clusterPeerPromise = clusterPeers.get(clusterObject.port);
if (clusterPeerPromise)
return clusterPeerPromise;
}
const clusterPeerSocket = new eio.Socket(explicitBaseUrl, clusterPeerOptions);
let peerReady = false;
clusterPeerSocket.on('close', () => {
const clusterPeerPromise = (async () => {
const eioPath = 'engine.io/connectRPCObject';
const eioEndpoint = baseUrl ? new URL(eioPath, baseUrl).pathname : '/' + eioPath;
const clusterPeerOptions = {
path: eioEndpoint,
query: {
cacehBust,
clusterObject: JSON.stringify(clusterObject),
},
withCredentials: true,
extraHeaders,
rejectUnauthorized: false,
transports: options?.transports,
};
const clusterPeerSocket = new eio.Socket(explicitBaseUrl, clusterPeerOptions);
let peerReady = false;
clusterPeerSocket.on('close', () => {
// Only remove from clusterPeers if it's not a dedicated transport
if (!connectRPCObjectOptions?.dedicatedTransport) {
clusterPeers.delete(clusterObject.port);
if (!peerReady) {
throw new Error("peer disconnected before setup completed");
}
if (!peerReady) {
throw new Error("peer disconnected before setup completed");
}
});
try {
await once(clusterPeerSocket, 'open');
const serializer = createRpcDuplexSerializer({
write: data => clusterPeerSocket.send(data),
});
clusterPeerSocket.on('message', data => serializer.onData(Buffer.from(data)));
const clusterPeer = new RpcPeer(clientName || 'engine.io-client', "cluster-proxy", (message, reject, serializationContext) => {
try {
serializer.sendMessage(message, reject, serializationContext);
}
catch (e) {
reject?.(e as Error);
}
});
serializer.setupRpcPeer(clusterPeer);
clusterPeer.tags.localPort = sourcePeerId;
peerReady = true;
return clusterPeer;
}
catch (e) {
console.error('failure ipc connect', e);
clusterPeerSocket.close();
throw e;
}
})();
try {
await once(clusterPeerSocket, 'open');
const serializer = createRpcDuplexSerializer({
write: data => clusterPeerSocket.send(data),
});
clusterPeerSocket.on('message', data => serializer.onData(Buffer.from(data)));
const clusterPeer = new RpcPeer(clientName || 'engine.io-client', "cluster-proxy", (message, reject, serializationContext) => {
try {
serializer.sendMessage(message, reject, serializationContext);
}
catch (e) {
reject?.(e as Error);
}
});
serializer.setupRpcPeer(clusterPeer);
clusterPeer.tags.localPort = sourcePeerId;
peerReady = true;
return clusterPeer;
}
catch (e) {
console.error('failure ipc connect', e);
clusterPeerSocket.close();
throw e;
}
})();
// Only store in clusterPeers if it's not a dedicated transport
if (!connectRPCObjectOptions?.dedicatedTransport) {
clusterPeers.set(clusterObject.port, clusterPeerPromise);
}
return clusterPeerPromise;
};
@@ -642,7 +657,7 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
return null;
}
const connectRPCObject = async (value: any) => {
const connectRPCObject = async (value: any, options?: ConnectRPCObjectOptions) => {
const clusterObject: ClusterObject = value?.__cluster;
if (!clusterObject) {
return value;
@@ -657,13 +672,29 @@ export async function connectScryptedClient(options: ScryptedClientOptions): Pro
}
try {
const clusterPeerPromise = ensureClusterPeer(clusterObject);
const clusterPeerPromise = ensureClusterPeer(clusterObject, options);
const clusterPeer = await clusterPeerPromise;
const connectRPCObject: ConnectRPCObject = await clusterPeer.getParam('connectRPCObject');
const newValue = await connectRPCObject(clusterObject);
if (!newValue)
throw new Error('ipc object not found?');
return newValue;
try {
const newValue = await connectRPCObject(clusterObject);
if (!newValue)
throw new Error('ipc object not found?');
// If dedicatedTransport is true, register the object for cleanup
if (options?.dedicatedTransport) {
finalizationRegistry.register(newValue, clusterPeer);
}
return newValue;
}
catch (e) {
// If we have a clusterPeer and this is a dedicated transport, kill the connection
// to prevent resource leaks when connectRPCObject fails
if (options?.dedicatedTransport) {
clusterPeer.kill('connectRPCObject failed');
}
throw e;
}
}
catch (e) {
console.error('failure ipc', e);