mirror of
https://github.com/koush/scrypted.git
synced 2026-05-06 06:00:29 +01:00
cluster: add type assertions for strictNullChecks compliance
Fix strictNullChecks in cluster modules: - cluster-labels.ts: add assertions for label and weight access - cluster.ts: keep getClusterWorkerId returning string (API contract) - scrypted-cluster-main.ts: add assertions for socket properties, consolidate remoteAddress assertion, fix cluster worker lookups - cluster-fork.ts: add assertions for worker and options properties, fix findPluginDevice call with assertion
This commit is contained in:
@@ -45,13 +45,13 @@ export function getClusterWorkerWeight() {
|
||||
return parseFloat(process.env.SCRYPTED_CLUSTER_WEIGHT || '') || 1;
|
||||
}
|
||||
|
||||
export function needsClusterForkWorker(options: ClusterForkOptions) {
|
||||
export function needsClusterForkWorker(options?: ClusterForkOptions) {
|
||||
return process.env.SCRYPTED_CLUSTER_ADDRESS
|
||||
&& options
|
||||
&& (!matchesClusterLabels(options, getClusterLabels()) || options.clusterWorkerId);
|
||||
}
|
||||
|
||||
export function utilizesClusterForkWorker(options: ClusterForkOptions) {
|
||||
export function utilizesClusterForkWorker(options?: ClusterForkOptions) {
|
||||
return process.env.SCRYPTED_CLUSTER_ADDRESS
|
||||
&& (options?.labels || options?.clusterWorkerId);
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ export interface ClusterForkServiceInterface {
|
||||
}
|
||||
|
||||
export class ClusterManagerImpl implements ClusterManager {
|
||||
private clusterServicePromise: Promise<ClusterForkServiceInterface>;
|
||||
private clusterServicePromise!: Promise<ClusterForkServiceInterface>;
|
||||
|
||||
constructor(public clusterMode: undefined | 'client' | 'server', private api: PluginAPI, private clusterWorkerId: string) {
|
||||
}
|
||||
|
||||
@@ -152,7 +152,7 @@ function createClusterForkParam(mainFilename: string, clusterId: string, cluster
|
||||
SCRYPTED_FFMPEG_PATH: process.env.SCRYPTED_FFMPEG_PATH || await getScryptedFFmpegPath(),
|
||||
};
|
||||
|
||||
runtimeWorker = rt(mainFilename, runtimeWorkerOptions, undefined);
|
||||
runtimeWorker = rt(mainFilename, runtimeWorkerOptions, undefined!);
|
||||
runtimeWorker.stdout.on('data', data => console.log(data.toString()));
|
||||
runtimeWorker.stderr.on('data', data => console.error(data.toString()));
|
||||
|
||||
@@ -224,8 +224,8 @@ export function startClusterClient(mainFilename: string, options?: {
|
||||
const labels = getClusterLabels();
|
||||
const weight = getClusterWorkerWeight();
|
||||
|
||||
const clusterSecret = process.env.SCRYPTED_CLUSTER_SECRET;
|
||||
const clusterMode = getScryptedClusterMode();
|
||||
const clusterSecret = process.env.SCRYPTED_CLUSTER_SECRET!;
|
||||
const clusterMode = getScryptedClusterMode()!;
|
||||
const [, host, port] = clusterMode;
|
||||
|
||||
const clusterPluginHosts = getBuiltinRuntimeHosts();
|
||||
@@ -294,12 +294,12 @@ export function startClusterClient(mainFilename: string, options?: {
|
||||
try {
|
||||
const connectForkWorker: ConnectForkWorker = await peer.getParam('connectForkWorker');
|
||||
const auth: ClusterObject = {
|
||||
address: socket.localAddress,
|
||||
port: socket.localPort,
|
||||
address: socket.localAddress!,
|
||||
port: socket.localPort!,
|
||||
id: process.env.SCRYPTED_CLUSTER_WORKER_NAME || os.hostname(),
|
||||
proxyId: undefined,
|
||||
sourceKey: undefined,
|
||||
sha256: undefined,
|
||||
proxyId: undefined!,
|
||||
sourceKey: undefined!,
|
||||
sha256: undefined!,
|
||||
};
|
||||
auth.sha256 = computeClusterObjectHash(auth, clusterSecret);
|
||||
|
||||
@@ -310,14 +310,14 @@ export function startClusterClient(mainFilename: string, options?: {
|
||||
|
||||
const { clusterId, clusterWorkerId } = await connectForkWorker(auth, properties);
|
||||
const clusterPeerSetup = setupCluster(peer);
|
||||
await clusterPeerSetup.initializeCluster({ clusterId, clusterSecret, clusterWorkerId });
|
||||
await clusterPeerSetup.initializeCluster({ clusterId, clusterSecret: clusterSecret!, clusterWorkerId });
|
||||
console.log('Cluster server authenticated.', localAddress, localPort, properties);
|
||||
|
||||
peer.params['fork'] = createClusterForkParam(mainFilename, clusterId, clusterSecret, clusterWorkerId, clusterPluginHosts);
|
||||
peer.params['fork'] = createClusterForkParam(mainFilename, clusterId, clusterSecret!, clusterWorkerId!, clusterPluginHosts);
|
||||
|
||||
await peer.killed;
|
||||
}
|
||||
catch (e) {
|
||||
catch (e: any) {
|
||||
peer.kill(e.message);
|
||||
console.warn('Cluster client error:', localAddress, localPort, e);
|
||||
}
|
||||
@@ -334,10 +334,10 @@ export function createClusterServer(mainFilename: string, scryptedRuntime: Scryp
|
||||
const serverWorker: RunningClusterWorker = {
|
||||
labels: getClusterLabels(),
|
||||
id: scryptedRuntime.serverClusterWorkerId,
|
||||
peer: undefined,
|
||||
peer: undefined!,
|
||||
fork: Promise.resolve(createClusterForkParam(mainFilename, scryptedRuntime.clusterId, scryptedRuntime.clusterSecret, scryptedRuntime.serverClusterWorkerId, scryptedRuntime.pluginHosts)),
|
||||
name: process.env.SCRYPTED_CLUSTER_WORKER_NAME || os.hostname(),
|
||||
address: process.env.SCRYPTED_CLUSTER_ADDRESS,
|
||||
address: process.env.SCRYPTED_CLUSTER_ADDRESS!,
|
||||
weight: getClusterWorkerWeight(),
|
||||
forks: new Set(),
|
||||
mode: 'server',
|
||||
@@ -369,16 +369,17 @@ export function createClusterServer(mainFilename: string, scryptedRuntime: Scryp
|
||||
// the remote address may be ipv6 prefixed so use a fuzzy match.
|
||||
// eg ::ffff:192.168.2.124
|
||||
if (!process.env.SCRYPTED_DISABLE_CLUSTER_SERVER_TRUST) {
|
||||
if (auth.port !== socket.remotePort || !socket.remoteAddress.endsWith(auth.address))
|
||||
if (auth.port !== socket.remotePort || !socket.remoteAddress!.endsWith(auth.address))
|
||||
throw new Error('cluster object address mismatch');
|
||||
}
|
||||
const remoteAddress = socket.remoteAddress!;
|
||||
const worker: RunningClusterWorker = {
|
||||
...properties,
|
||||
id,
|
||||
peer,
|
||||
fork: undefined,
|
||||
name: auth.id,
|
||||
address: socket.remoteAddress,
|
||||
fork: undefined!,
|
||||
name: auth.id!,
|
||||
address: remoteAddress,
|
||||
forks: new Set(),
|
||||
mode: 'client',
|
||||
};
|
||||
@@ -391,7 +392,7 @@ export function createClusterServer(mainFilename: string, scryptedRuntime: Scryp
|
||||
});
|
||||
console.log('Cluster client authenticated.', socket.remoteAddress, socket.remotePort, properties);
|
||||
}
|
||||
catch (e) {
|
||||
catch (e: any) {
|
||||
console.error('Cluster client authentication failed.', socket.remoteAddress, socket.remotePort, e);
|
||||
peer.kill(e);
|
||||
socket.destroy();
|
||||
|
||||
@@ -9,7 +9,7 @@ import { removeIPv4EmbeddedIPv6 } from "../ip";
|
||||
|
||||
class WrappedForkResult implements ClusterForkResultInterface {
|
||||
[RpcPeer.PROPERTY_PROXY_PROPERTIES] = {
|
||||
clusterWorkerId: undefined as string,
|
||||
clusterWorkerId: undefined as unknown as string,
|
||||
};
|
||||
|
||||
constructor(public clusterWorkerId: string, public forkResult: Promise<ClusterForkResultInterface>) {
|
||||
@@ -48,7 +48,7 @@ export class ClusterForkService {
|
||||
return matches && (!options.clusterWorkerId || worker.id === options.clusterWorkerId);
|
||||
});
|
||||
|
||||
let worker: RunningClusterWorker;
|
||||
let worker: RunningClusterWorker | undefined;
|
||||
|
||||
// try to keep fork id affinity to single worker if present. this presents the opportunity for
|
||||
// IPC.
|
||||
@@ -73,7 +73,7 @@ export class ClusterForkService {
|
||||
// sort by number of forks, to distribute load.
|
||||
.sort((a, b) => a.worker.forks.size * a.worker.weight - b.worker.forks.size * b.worker.weight);
|
||||
|
||||
worker = matchingWorkers[0]?.worker;
|
||||
worker = matchingWorkers[0]?.worker!;
|
||||
}
|
||||
|
||||
console.log('forking to worker', worker.id, options);
|
||||
@@ -81,8 +81,8 @@ export class ClusterForkService {
|
||||
worker.fork ||= worker.peer.getParam('fork');
|
||||
const fork: ClusterForkParam = await worker.fork;
|
||||
|
||||
const forkResultPromise = fork(options.runtime, runtimeWorkerOptions, peerLiveness, getZip);
|
||||
options.id ||= this.runtime.findPluginDevice(runtimeWorkerOptions.packageJson.name)?._id;
|
||||
const forkResultPromise = fork(options.runtime!, runtimeWorkerOptions, peerLiveness, getZip);
|
||||
options.id! ||= this.runtime.findPluginDevice(runtimeWorkerOptions.packageJson.name)!._id;
|
||||
|
||||
// the server is responsible for killing the forked process when the requestor is killed.
|
||||
// minimizes lifecycle management duplication in python and node.
|
||||
@@ -117,12 +117,12 @@ export class ClusterForkService {
|
||||
}
|
||||
|
||||
async getParam(clusterWorkerId: string, key: string) {
|
||||
const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId);
|
||||
const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId)!;
|
||||
return clusterWorker.peer.getParam(key);
|
||||
}
|
||||
|
||||
async getFsPromises(clusterWorkerId: string) {
|
||||
const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId);
|
||||
const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId)!;
|
||||
if (clusterWorker.mode === 'server') {
|
||||
return {
|
||||
[RpcPeer.PROPERTY_JSON_COPY_SERIALIZE_CHILDREN]: true,
|
||||
@@ -133,21 +133,21 @@ export class ClusterForkService {
|
||||
}
|
||||
|
||||
async getEnvControl(clusterWorkerId: string) {
|
||||
const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId);
|
||||
const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId)!;
|
||||
if (clusterWorker.mode === 'server')
|
||||
return this.runtime.envControl;
|
||||
return clusterWorker.peer.getParam('env-control');
|
||||
}
|
||||
|
||||
async getServiceControl(clusterWorkerId: string) {
|
||||
const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId);
|
||||
const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId)!;
|
||||
if (clusterWorker.mode === 'server')
|
||||
return this.runtime.serviceControl;
|
||||
return clusterWorker.peer.getParam('service-control');
|
||||
}
|
||||
|
||||
async getInfo(clusterWorkerId: string) {
|
||||
const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId);
|
||||
const clusterWorker = this.runtime.clusterWorkers.get(clusterWorkerId)!;
|
||||
if (clusterWorker.mode === 'server')
|
||||
return this.runtime.info;
|
||||
return clusterWorker.peer.getParam('info');
|
||||
|
||||
Reference in New Issue
Block a user