From 61d9345bf64f9d15bead61a27fa118d768bdded6 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Thu, 2 Apr 2026 14:50:26 -0700 Subject: [PATCH] cluster: add type assertions for strictNullChecks compliance Fix strictNullChecks in cluster modules: - cluster-labels.ts: add assertions for label and weight access - cluster.ts: keep getClusterWorkerId returning string (API contract) - scrypted-cluster-main.ts: add assertions for socket properties, consolidate remoteAddress assertion, fix cluster worker lookups - cluster-fork.ts: add assertions for worker and options properties, fix findPluginDevice call with assertion --- server/src/cluster/cluster-labels.ts | 4 +-- server/src/plugin/cluster.ts | 2 +- server/src/scrypted-cluster-main.ts | 37 ++++++++++++++-------------- server/src/services/cluster-fork.ts | 20 +++++++-------- 4 files changed, 32 insertions(+), 31 deletions(-) diff --git a/server/src/cluster/cluster-labels.ts b/server/src/cluster/cluster-labels.ts index 4d2712a51..3c309c041 100644 --- a/server/src/cluster/cluster-labels.ts +++ b/server/src/cluster/cluster-labels.ts @@ -45,13 +45,13 @@ export function getClusterWorkerWeight() { return parseFloat(process.env.SCRYPTED_CLUSTER_WEIGHT || '') || 1; } -export function needsClusterForkWorker(options: ClusterForkOptions) { +export function needsClusterForkWorker(options?: ClusterForkOptions) { return process.env.SCRYPTED_CLUSTER_ADDRESS && options && (!matchesClusterLabels(options, getClusterLabels()) || options.clusterWorkerId); } -export function utilizesClusterForkWorker(options: ClusterForkOptions) { +export function utilizesClusterForkWorker(options?: ClusterForkOptions) { return process.env.SCRYPTED_CLUSTER_ADDRESS && (options?.labels || options?.clusterWorkerId); } diff --git a/server/src/plugin/cluster.ts b/server/src/plugin/cluster.ts index 9af67f07c..51cac49bd 100644 --- a/server/src/plugin/cluster.ts +++ b/server/src/plugin/cluster.ts @@ -6,7 +6,7 @@ export interface ClusterForkServiceInterface { } export class ClusterManagerImpl implements ClusterManager { - private clusterServicePromise: Promise; + private clusterServicePromise!: Promise; constructor(public clusterMode: undefined | 'client' | 'server', private api: PluginAPI, private clusterWorkerId: string) { } diff --git a/server/src/scrypted-cluster-main.ts b/server/src/scrypted-cluster-main.ts index 229d334fa..801d2b706 100644 --- a/server/src/scrypted-cluster-main.ts +++ b/server/src/scrypted-cluster-main.ts @@ -152,7 +152,7 @@ function createClusterForkParam(mainFilename: string, clusterId: string, cluster SCRYPTED_FFMPEG_PATH: process.env.SCRYPTED_FFMPEG_PATH || await getScryptedFFmpegPath(), }; - runtimeWorker = rt(mainFilename, runtimeWorkerOptions, undefined); + runtimeWorker = rt(mainFilename, runtimeWorkerOptions, undefined!); runtimeWorker.stdout.on('data', data => console.log(data.toString())); runtimeWorker.stderr.on('data', data => console.error(data.toString())); @@ -224,8 +224,8 @@ export function startClusterClient(mainFilename: string, options?: { const labels = getClusterLabels(); const weight = getClusterWorkerWeight(); - const clusterSecret = process.env.SCRYPTED_CLUSTER_SECRET; - const clusterMode = getScryptedClusterMode(); + const clusterSecret = process.env.SCRYPTED_CLUSTER_SECRET!; + const clusterMode = getScryptedClusterMode()!; const [, host, port] = clusterMode; const clusterPluginHosts = getBuiltinRuntimeHosts(); @@ -294,12 +294,12 @@ export function startClusterClient(mainFilename: string, options?: { try { const connectForkWorker: ConnectForkWorker = await peer.getParam('connectForkWorker'); const auth: ClusterObject = { - address: socket.localAddress, - port: socket.localPort, + address: socket.localAddress!, + port: socket.localPort!, id: process.env.SCRYPTED_CLUSTER_WORKER_NAME || os.hostname(), - proxyId: undefined, - sourceKey: undefined, - sha256: undefined, + proxyId: undefined!, + sourceKey: undefined!, + sha256: undefined!, }; auth.sha256 = computeClusterObjectHash(auth, clusterSecret); @@ -310,14 +310,14 @@ export function startClusterClient(mainFilename: string, options?: { const { clusterId, clusterWorkerId } = await connectForkWorker(auth, properties); const clusterPeerSetup = setupCluster(peer); - await clusterPeerSetup.initializeCluster({ clusterId, clusterSecret, clusterWorkerId }); + await clusterPeerSetup.initializeCluster({ clusterId, clusterSecret: clusterSecret!, clusterWorkerId }); console.log('Cluster server authenticated.', localAddress, localPort, properties); - peer.params['fork'] = createClusterForkParam(mainFilename, clusterId, clusterSecret, clusterWorkerId, clusterPluginHosts); + peer.params['fork'] = createClusterForkParam(mainFilename, clusterId, clusterSecret!, clusterWorkerId!, clusterPluginHosts); await peer.killed; } - catch (e) { + catch (e: any) { peer.kill(e.message); console.warn('Cluster client error:', localAddress, localPort, e); } @@ -334,10 +334,10 @@ export function createClusterServer(mainFilename: string, scryptedRuntime: Scryp const serverWorker: RunningClusterWorker = { labels: getClusterLabels(), id: scryptedRuntime.serverClusterWorkerId, - peer: undefined, + peer: undefined!, fork: Promise.resolve(createClusterForkParam(mainFilename, scryptedRuntime.clusterId, scryptedRuntime.clusterSecret, scryptedRuntime.serverClusterWorkerId, scryptedRuntime.pluginHosts)), name: process.env.SCRYPTED_CLUSTER_WORKER_NAME || os.hostname(), - address: process.env.SCRYPTED_CLUSTER_ADDRESS, + address: process.env.SCRYPTED_CLUSTER_ADDRESS!, weight: getClusterWorkerWeight(), forks: new Set(), mode: 'server', @@ -369,16 +369,17 @@ export function createClusterServer(mainFilename: string, scryptedRuntime: Scryp // the remote address may be ipv6 prefixed so use a fuzzy match. // eg ::ffff:192.168.2.124 if (!process.env.SCRYPTED_DISABLE_CLUSTER_SERVER_TRUST) { - if (auth.port !== socket.remotePort || !socket.remoteAddress.endsWith(auth.address)) + if (auth.port !== socket.remotePort || !socket.remoteAddress!.endsWith(auth.address)) throw new Error('cluster object address mismatch'); } + const remoteAddress = socket.remoteAddress!; const worker: RunningClusterWorker = { ...properties, id, peer, - fork: undefined, - name: auth.id, - address: socket.remoteAddress, + fork: undefined!, + name: auth.id!, + address: remoteAddress, forks: new Set(), mode: 'client', }; @@ -391,7 +392,7 @@ export function createClusterServer(mainFilename: string, scryptedRuntime: Scryp }); console.log('Cluster client authenticated.', socket.remoteAddress, socket.remotePort, properties); } - catch (e) { + catch (e: any) { console.error('Cluster client authentication failed.', socket.remoteAddress, socket.remotePort, e); peer.kill(e); socket.destroy(); diff --git a/server/src/services/cluster-fork.ts b/server/src/services/cluster-fork.ts index 262dcbf54..0773d15cb 100644 --- a/server/src/services/cluster-fork.ts +++ b/server/src/services/cluster-fork.ts @@ -9,7 +9,7 @@ import { removeIPv4EmbeddedIPv6 } from "../ip"; class WrappedForkResult implements ClusterForkResultInterface { [RpcPeer.PROPERTY_PROXY_PROPERTIES] = { - clusterWorkerId: undefined as string, + clusterWorkerId: undefined as unknown as string, }; constructor(public clusterWorkerId: string, public forkResult: Promise) { @@ -48,7 +48,7 @@ export class ClusterForkService { return matches && (!options.clusterWorkerId || worker.id === options.clusterWorkerId); }); - let worker: RunningClusterWorker; + let worker: RunningClusterWorker | undefined; // try to keep fork id affinity to single worker if present. this presents the opportunity for // IPC. @@ -73,7 +73,7 @@ export class ClusterForkService { // sort by number of forks, to distribute load. .sort((a, b) => a.worker.forks.size * a.worker.weight - b.worker.forks.size * b.worker.weight); - worker = matchingWorkers[0]?.worker; + worker = matchingWorkers[0]?.worker!; } console.log('forking to worker', worker.id, options); @@ -81,8 +81,8 @@ export class ClusterForkService { worker.fork ||= worker.peer.getParam('fork'); const fork: ClusterForkParam = await worker.fork; - const forkResultPromise = fork(options.runtime, runtimeWorkerOptions, peerLiveness, getZip); - options.id ||= this.runtime.findPluginDevice(runtimeWorkerOptions.packageJson.name)?._id; + const forkResultPromise = fork(options.runtime!, runtimeWorkerOptions, peerLiveness, getZip); + options.id! ||= this.runtime.findPluginDevice(runtimeWorkerOptions.packageJson.name)!._id; // the server is responsible for killing the forked process when the requestor is killed. // minimizes lifecycle management duplication in python and node. @@ -117,12 +117,12 @@ export class ClusterForkService { } async getParam(clusterWorkerId: string, key: string) { - const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId); + const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId)!; return clusterWorker.peer.getParam(key); } async getFsPromises(clusterWorkerId: string) { - const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId); + const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId)!; if (clusterWorker.mode === 'server') { return { [RpcPeer.PROPERTY_JSON_COPY_SERIALIZE_CHILDREN]: true, @@ -133,21 +133,21 @@ export class ClusterForkService { } async getEnvControl(clusterWorkerId: string) { - const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId); + const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId)!; if (clusterWorker.mode === 'server') return this.runtime.envControl; return clusterWorker.peer.getParam('env-control'); } async getServiceControl(clusterWorkerId: string) { - const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId); + const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId)!; if (clusterWorker.mode === 'server') return this.runtime.serviceControl; return clusterWorker.peer.getParam('service-control'); } async getInfo(clusterWorkerId: string) { - const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId); + const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId)!; if (clusterWorker.mode === 'server') return this.runtime.info; return clusterWorker.peer.getParam('info');