server: cluster cpu usage monitoring

This commit is contained in:
Koushik Dutta
2024-12-02 15:08:56 -08:00
parent d91ec68e6c
commit a75b263141
21 changed files with 106 additions and 62 deletions

View File

@@ -5,7 +5,7 @@ import { StorageSettings } from '@scrypted/sdk/storage-settings';
import crypto from 'crypto';
import { AutoenableMixinProvider } from "../../../common/src/autoenable-mixin-provider";
import { SettingsMixinDeviceBase } from "../../../common/src/settings-mixin";
import { CpuTimer } from './cpu-timer';
import { CpuTimer } from '../../../server/src/cluster/cpu-timer';
import { FFmpegVideoFrameGenerator } from './ffmpeg-videoframes';
import { insidePolygon, normalizeBox, polygonOverlap } from './polygon';
import { SMART_MOTIONSENSOR_PREFIX, SmartMotionSensor, createObjectDetectorStorageSetting } from './smart-motionsensor';

4
sdk/package-lock.json generated
View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/sdk",
"version": "0.3.92",
"version": "0.3.95",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@scrypted/sdk",
"version": "0.3.92",
"version": "0.3.95",
"license": "ISC",
"dependencies": {
"@babel/preset-typescript": "^7.26.0",

View File

@@ -1,6 +1,6 @@
{
"name": "@scrypted/sdk",
"version": "0.3.92",
"version": "0.3.95",
"description": "",
"main": "dist/src/index.js",
"exports": {

View File

@@ -1,12 +1,12 @@
{
"name": "@scrypted/types",
"version": "0.3.85",
"version": "0.3.87",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "@scrypted/types",
"version": "0.3.85",
"version": "0.3.87",
"license": "ISC"
}
}

View File

@@ -1,6 +1,6 @@
{
"name": "@scrypted/types",
"version": "0.3.85",
"version": "0.3.87",
"description": "",
"main": "dist/index.js",
"author": "",

View File

@@ -468,6 +468,7 @@ class ClusterForkInterfaceOptions(TypedDict):
class ClusterWorker(TypedDict):
cpuUsage: float
forks: list[ClusterFork]
id: str
labels: list[str]
@@ -950,7 +951,7 @@ class TamperState(TypedDict):
pass
TYPES_VERSION = "0.3.85"
TYPES_VERSION = "0.3.87"
class AirPurifier:
@@ -1741,9 +1742,6 @@ class SystemManager:
def getDeviceByName(self, name: str) -> ScryptedDevice:
pass
def getDeviceState(self, id: str) -> Any:
pass
def getSystemState(self) -> Any:
pass
@@ -1844,9 +1842,6 @@ class ClusterManager:
async def getClusterWorkers(self) -> Mapping[str, ClusterWorker]:
pass
async def getRPCObjectClusterWorkerId(self, object: Any) -> str:
pass
class ScryptedInterfaceProperty(str, Enum):
id = "id"

View File

@@ -2071,12 +2071,6 @@ export interface SystemManager {
*/
getDeviceByName<T>(name: string): ScryptedDevice & T;
/**
* Get the current state of a device.
* @deprecated
*/
getDeviceState(id: string): { [property: string]: SystemDeviceState };
/**
* Get the current state of every device.
*/
@@ -2684,6 +2678,7 @@ export interface ClusterWorker {
id: string;
labels: string[];
forks: ClusterFork[];
cpuUsage: number;
}
export interface ClusterManager {
@@ -2694,7 +2689,6 @@ export interface ClusterManager {
getClusterWorkerId(): string;
getClusterMode(): 'server' | 'client' | undefined;
getClusterWorkers(): Promise<Record<string, ClusterWorker>>;
getRPCObjectClusterWorkerId(object: any): Promise<string>;
}
export interface ScryptedStatic {
@@ -2709,7 +2703,7 @@ export interface ScryptedStatic {
systemManager: SystemManager,
clusterManager: ClusterManager;
serverVersion?: string;
serverVersion: string;
pluginHostAPI: any;
pluginRemoteAPI: any;

View File

@@ -99,7 +99,7 @@
"SCRYPTED_CLUSTER_WORKER_NAME": "Macaroni 2",
"SCRYPTED_CLUSTER_LABELS": "@scrypted/coreml,compute",
"SCRYPTED_CLUSTER_MODE": "client",
"SCRYPTED_CLUSTER_SERVER": "192.168.2.130",
"SCRYPTED_CLUSTER_SERVER": "192.168.2.124",
"SCRYPTED_CLUSTER_SECRET": "swordfish",
"SCRYPTED_CAN_RESTART": "true",
"SCRYPTED_VOLUME": "/Users/koush/.scrypted-cluster/volume-client",

View File

@@ -1,18 +1,18 @@
{
"name": "@scrypted/server",
"version": "0.123.47",
"version": "0.123.48",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "@scrypted/server",
"version": "0.123.47",
"version": "0.123.48",
"hasInstallScript": true,
"license": "ISC",
"dependencies": {
"@scrypted/ffmpeg-static": "^6.1.0-build3",
"@scrypted/node-pty": "^1.0.22",
"@scrypted/types": "^0.3.82",
"@scrypted/types": "^0.3.87",
"adm-zip": "^0.5.16",
"body-parser": "^1.20.3",
"cookie-parser": "^1.4.7",
@@ -557,9 +557,9 @@
}
},
"node_modules/@scrypted/types": {
"version": "0.3.82",
"resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.3.82.tgz",
"integrity": "sha512-P9pD/4DRKZqy0ItaGTL4PNZoHncxslH9oX69GTufnpFAYKZZbcpXy1oI39gtKu1VzwPdp6b64OAm9UfzCkDEAA==",
"version": "0.3.87",
"resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.3.87.tgz",
"integrity": "sha512-67tY0JhRBmHQ1Qvu1CVr3byy13bir7OUhcVMCMf3QDDMfe7bwxXQ7wlsiwEpGn/BY49nioaYMT0CycDKz6gWYQ==",
"license": "ISC"
},
"node_modules/@types/adm-zip": {

View File

@@ -5,7 +5,7 @@
"dependencies": {
"@scrypted/ffmpeg-static": "^6.1.0-build3",
"@scrypted/node-pty": "^1.0.22",
"@scrypted/types": "^0.3.82",
"@scrypted/types": "^0.3.87",
"adm-zip": "^0.5.16",
"body-parser": "^1.20.3",
"cookie-parser": "^1.4.7",

View File

@@ -21,7 +21,6 @@ class ClusterObject(TypedDict):
sourceKey: str
sha256: str
def isClusterAddress(address: str):
return not address or address == os.environ.get("SCRYPTED_CLUSTER_ADDRESS", None)
@@ -99,6 +98,7 @@ class ClusterSetup:
return
self.clusterId = options["clusterId"]
self.clusterSecret = options["clusterSecret"]
self.clusterWorkerId = options["clusterWorkerId"]
self.SCRYPTED_CLUSTER_ADDRESS = os.environ.get("SCRYPTED_CLUSTER_ADDRESS", None)
async def handleClusterClient(
@@ -144,7 +144,7 @@ class ClusterSetup:
m = hashlib.sha256()
m.update(
bytes(
f"{o['id']}{o.get('address') or ''}{o['port']}{o.get('sourceKey', None) or ''}{o['proxyId']}{self.clusterSecret}",
f"{o['id']}{o.get('address', '')}{o['port']}{o.get('sourceKey', '')}{o['proxyId']}{self.clusterSecret}",
"utf8",
)
)

View File

@@ -229,21 +229,21 @@ class EventRegistry(object):
class ClusterManager(scrypted_python.scrypted_sdk.types.ClusterManager):
def __init__(self, api: Any):
self.api = api
def __init__(self, remote: PluginRemote):
self.remote = remote
self.clusterService = None
def getClusterMode(self) -> Any | Any:
return os.getenv("SCRYPTED_CLUSTER_MODE", None)
def getClusterWorkerId(self) -> str:
return os.getenv("SCRYPTED_CLUSTER_WORKER_ID", None)
return self.remote.clusterSetup.clusterWorkerId
async def getClusterWorkers(
self,
) -> Mapping[str, scrypted_python.scrypted_sdk.types.ClusterWorker]:
self.clusterService = self.clusterService or asyncio.ensure_future(
self.api.getComponent("cluster-fork")
self.remote.api.getComponent("cluster-fork")
)
cs = await self.clusterService
return await cs.getClusterWorkers()
@@ -839,7 +839,7 @@ class PluginRemote:
self.systemManager = SystemManager(self.api, self.systemState)
self.deviceManager = DeviceManager(self.nativeIds, self.systemManager)
self.mediaManager = MediaManager(await self.api.getMediaManager())
self.clusterManager = ClusterManager(self.api)
self.clusterManager = ClusterManager(self)
try:
sdk.systemManager = self.systemManager

View File

@@ -17,6 +17,13 @@ export function isClusterAddress(address: string) {
return !address || address === process.env.SCRYPTED_CLUSTER_ADDRESS;
}
/**
 * Extract the cluster descriptor attached to a value, if it belongs to the given cluster.
 *
 * @param clusterId Id of the cluster the caller is a member of.
 * @param value Any value; its optional `__cluster` property is inspected. Nullish values are tolerated.
 * @returns The `__cluster` descriptor when its `id` equals `clusterId`, otherwise `undefined`.
 */
export function getClusterObject(clusterId: string, value: any): ClusterObject | undefined {
    const candidate: ClusterObject = value?.__cluster;
    // A missing descriptor or one from a different cluster is treated the same way: no match.
    return candidate?.id === clusterId ? candidate : undefined;
}
async function peerConnectRPCObject(peer: RpcPeer, o: ClusterObject) {
let peerConnectRPCObject: Promise<ConnectRPCObject> = peer.tags['connectRPCObject'];
if (!peerConnectRPCObject) {
@@ -32,6 +39,7 @@ export function setupCluster(peer: RpcPeer) {
let clusterId: string;
let clusterSecret: string;
let clusterPort: number;
let clusterWorkerId: string;
// all cluster clients, incoming and outgoing, connect with random ports which can be used as peer ids
// on the cluster server that is listening on the actual port.
@@ -236,8 +244,8 @@ export function setupCluster(peer: RpcPeer) {
}
const connectRPCObject = async (value: any) => {
const clusterObject: ClusterObject = value?.__cluster;
if (clusterObject?.id !== clusterId)
const clusterObject = getClusterObject(clusterId, value);
if (!clusterObject)
return value;
const { address, port, proxyId } = clusterObject;
// handle the case when trying to connect to an object is on this cluster node,
@@ -315,12 +323,12 @@ export function setupCluster(peer: RpcPeer) {
const initializeCluster: InitializeCluster = async (options: {
clusterId: string;
clusterSecret: string;
clusterWorkerId: string;
}) => {
if (clusterPort)
return;
({ clusterId, clusterSecret } = options);
({ clusterId, clusterSecret, clusterWorkerId, } = options);
const clients = new Set<net.Socket>();
@@ -379,7 +387,7 @@ export function setupCluster(peer: RpcPeer) {
}
}
export type InitializeCluster = (cluster: { clusterId: string, clusterSecret: string }) => Promise<void>;
export type InitializeCluster = (cluster: { clusterId: string, clusterSecret: string, clusterWorkerId: string, }) => Promise<void>;
export function getScryptedClusterMode(): ['server' | 'client', string, number] {
const mode = process.env.SCRYPTED_CLUSTER_MODE as 'server' | 'client';

View File

@@ -1,9 +1,27 @@
/**
 * Descriptor attached to a value (as its `__cluster` property) that identifies
 * where in the cluster the underlying object lives.
 */
export interface ClusterObject {
/**
 * Id of the cluster. A descriptor is only honored when this equals the
 * local cluster id.
 */
id: string;
/**
 * Address of the process that created this object.
 * NOTE(review): an empty address appears to be treated as "this node" by
 * isClusterAddress — confirm.
 */
address: string;
/**
 * Port of the process that created this object.
 */
port: number;
/**
 * Id of the object within the source peer.
 */
proxyId: string;
/**
 * Id of the source peer.
 */
sourceKey: string;
/**
 * Hash of the object.
 * NOTE(review): presumably a SHA-256 digest over the other fields combined
 * with the cluster secret (see computeClusterObjectHash) — confirm.
 */
sha256: string;
}

View File

@@ -164,6 +164,7 @@ export interface PluginRemoteLoadZipOptions {
main?: string;
clusterId: string;
clusterWorkerId: string;
clusterSecret: string;
}

View File

@@ -248,6 +248,7 @@ export class PluginHost {
const loadZipOptions: PluginRemoteLoadZipOptions = {
clusterId: this.scrypted.clusterId,
clusterSecret: this.scrypted.clusterSecret,
clusterWorkerId: await this.clusterWorkerId,
// debug flag can be used to affect path resolution for sourcemaps etc.
debug: !!pluginDebug,
zipHash: this.zipHash,
@@ -390,6 +391,7 @@ export class PluginHost {
await clusterSetup.initializeCluster({
clusterId: this.scrypted.clusterId,
clusterSecret: this.scrypted.clusterSecret,
clusterWorkerId: this.scrypted.serverClusterWorkerId,
});
return this.scrypted.clusterFork;
})(),

View File

@@ -115,7 +115,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
await initializeCluster(zipOptions);
scrypted.connectRPCObject = connectRPCObject;
scrypted.clusterManager = new ClusterManagerImpl(api);
scrypted.clusterManager = new ClusterManagerImpl(api, zipOptions.clusterWorkerId);
if (worker_threads.isMainThread) {
const fsDir = path.join(unzippedPath, 'fs')
@@ -332,6 +332,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
}
const forkOptions = Object.assign({}, zipOptions);
forkOptions.clusterWorkerId = await clusterWorkerId || forkOptions.clusterWorkerId;
forkOptions.fork = true;
forkOptions.main = options?.filename;
const forkZipAPI = new PluginZipAPI(() => zipAPI.getZip());

View File

@@ -65,6 +65,7 @@ export class ScryptedRuntime extends PluginHttp<HttpPluginData> {
clusterId = crypto.randomBytes(3).toString('hex');
clusterSecret = process.env.SCRYPTED_CLUSTER_SECRET || crypto.randomBytes(16).toString('hex');
clusterWorkers = new Map<string, RunningClusterWorker>();
serverClusterWorkerId: string;
plugins: { [id: string]: PluginHost } = {};
pluginDevices: { [id: string]: PluginDevice } = {};
devices: { [id: string]: DeviceProxyPair } = {};

View File

@@ -1,4 +1,4 @@
import type { ClusterManager, ClusterWorker, ForkOptions } from '@scrypted/types';
import type { ClusterManager, ForkOptions } from '@scrypted/types';
import crypto from 'crypto';
import { once } from 'events';
import net from 'net';
@@ -11,6 +11,7 @@ import { computeClusterObjectHash } from './cluster/cluster-hash';
import { getClusterLabels, getClusterWorkerWeight } from './cluster/cluster-labels';
import { getScryptedClusterMode, InitializeCluster, setupCluster } from './cluster/cluster-setup';
import type { ClusterObject } from './cluster/connect-rpc-object';
import { CpuTimer } from './cluster/cpu-timer';
import type { PluginAPI } from './plugin/plugin-api';
import { getPluginVolume, getScryptedVolume } from './plugin/plugin-volume';
import { prepareZip } from './plugin/runtime/node-worker-common';
@@ -20,10 +21,10 @@ import { RpcPeer } from './rpc';
import { createRpcDuplexSerializer } from './rpc-serializer';
import type { ScryptedRuntime } from './runtime';
import type { ClusterForkService } from './services/cluster-fork';
import { sleep } from './sleep';
import type { ServiceControl } from './services/service-control';
import { EnvControl } from './services/env';
import { Info } from './services/info';
import type { ServiceControl } from './services/service-control';
import { sleep } from './sleep';
installSourceMapSupport({
environment: 'node',
@@ -84,6 +85,7 @@ export interface RunningClusterWorker extends ClusterWorkerProperties {
forks: Set<ClusterForkOptions>;
address: string;
weight: number;
cpuUsage: number;
}
export class PeerLiveness {
@@ -120,7 +122,7 @@ export interface ClusterForkResultInterface {
export type ClusterForkParam = (runtime: string, options: RuntimeWorkerOptions, peerLiveness: PeerLiveness, getZip: () => Promise<Buffer>) => Promise<ClusterForkResultInterface>;
function createClusterForkParam(mainFilename: string, clusterId: string, clusterSecret: string) {
function createClusterForkParam(mainFilename: string, clusterId: string, clusterSecret: string, clusterWorkerId: string) {
const clusterForkParam: ClusterForkParam = async (runtime, runtimeWorkerOptions, peerLiveness, getZip) => {
let runtimeWorker: RuntimeWorker;
@@ -166,7 +168,7 @@ function createClusterForkParam(mainFilename: string, clusterId: string, cluster
let ping: any;
try {
const initializeCluster: InitializeCluster = await threadPeer.getParam('initializeCluster');
await initializeCluster({ clusterId, clusterSecret });
await initializeCluster({ clusterId, clusterSecret, clusterWorkerId });
getRemote = await threadPeer.getParam('getRemote');
ping = await threadPeer.getParam('ping');
}
@@ -206,6 +208,7 @@ export function startClusterClient(mainFilename: string, serviceControl?: Servic
console.log('Cluster client starting.');
const envControl = new EnvControl();
const cpuTimer = new CpuTimer();
const originalClusterAddress = process.env.SCRYPTED_CLUSTER_ADDRESS;
const labels = getClusterLabels();
@@ -259,6 +262,7 @@ export function startClusterClient(mainFilename: string, serviceControl?: Servic
peer.params['service-control'] = serviceControl;
peer.params['env-control'] = envControl;
peer.params['info'] = new Info();
peer.params['cpu'] = async () => cpuTimer.sample();
const { localAddress, localPort } = socket;
console.log('Cluster server connected.', localAddress, localPort);
@@ -284,11 +288,10 @@ export function startClusterClient(mainFilename: string, serviceControl?: Servic
};
const { clusterId, clusterWorkerId } = await connectForkWorker(auth, properties);
process.env.SCRYPTED_CLUSTER_WORKER_ID = clusterWorkerId;
const clusterPeerSetup = setupCluster(peer);
await clusterPeerSetup.initializeCluster({ clusterId, clusterSecret });
await clusterPeerSetup.initializeCluster({ clusterId, clusterSecret, clusterWorkerId });
peer.params['fork'] = createClusterForkParam(mainFilename, clusterId, clusterSecret);
peer.params['fork'] = createClusterForkParam(mainFilename, clusterId, clusterSecret, clusterWorkerId);
await peer.killed;
}
@@ -305,19 +308,26 @@ export function startClusterClient(mainFilename: string, serviceControl?: Servic
}
export function createClusterServer(mainFilename: string, scryptedRuntime: ScryptedRuntime, certificate: ReturnType<typeof createSelfSignedCertificate>) {
const serverClusterWorkerId = crypto.randomUUID();
process.env.SCRYPTED_CLUSTER_WORKER_ID = serverClusterWorkerId;
scryptedRuntime.serverClusterWorkerId = crypto.randomUUID();
const serverWorker: RunningClusterWorker = {
labels: getClusterLabels(),
id: serverClusterWorkerId,
id: scryptedRuntime.serverClusterWorkerId,
peer: undefined,
fork: Promise.resolve(createClusterForkParam(mainFilename, scryptedRuntime.clusterId, scryptedRuntime.clusterSecret)),
fork: Promise.resolve(createClusterForkParam(mainFilename, scryptedRuntime.clusterId, scryptedRuntime.clusterSecret, scryptedRuntime.serverClusterWorkerId)),
name: process.env.SCRYPTED_CLUSTER_WORKER_NAME || os.hostname(),
address: process.env.SCRYPTED_CLUSTER_ADDRESS,
weight: getClusterWorkerWeight(),
forks: new Set(),
cpuUsage: 0,
};
scryptedRuntime.clusterWorkers.set(serverClusterWorkerId, serverWorker);
scryptedRuntime.clusterWorkers.set(scryptedRuntime.serverClusterWorkerId, serverWorker);
{
    // Refresh the server worker's cpuUsage once per second from a dedicated CpuTimer.
    const serverCpuTimer = new CpuTimer();
    const refreshServerCpuUsage = () => {
        serverWorker.cpuUsage = serverCpuTimer.sample();
    };
    setInterval(refreshServerCpuUsage, 1000);
}
const server = tls.createServer({
key: certificate.serviceKey,
@@ -353,6 +363,7 @@ export function createClusterServer(mainFilename: string, scryptedRuntime: Scryp
name: auth.id,
address: socket.remoteAddress,
forks: new Set(),
cpuUsage: 0,
};
scryptedRuntime.clusterWorkers.set(id, worker);
peer.killedSafe.finally(() => {
@@ -362,6 +373,16 @@ export function createClusterServer(mainFilename: string, scryptedRuntime: Scryp
scryptedRuntime.clusterWorkers.delete(id);
});
console.log('Cluster client authenticated.', socket.remoteAddress, socket.remotePort, properties);
// Poll the connected worker for its CPU usage once per second and mirror it
// onto the worker record. The remote 'cpu' param is looked up lazily and the
// lookup promise is cached so it is only resolved once while it keeps working.
let cpu: Promise<() => Promise<number>> | undefined;
const cpuTimer = setInterval(async () => {
    try {
        cpu ||= peer.getParam('cpu');
        const usage = await (await cpu)();
        worker.cpuUsage = usage;
    }
    catch (e) {
        // Best effort: keep the last known usage. Without this catch a failed
        // lookup or sample would surface as an unhandled promise rejection on
        // every tick, because the rejected lookup promise stayed cached.
        // Dropping the cache lets the next tick retry the lookup.
        cpu = undefined;
    }
}, 1000);
// Stop polling as soon as the peer goes away.
peer.killedSafe.finally(() => {
    clearInterval(cpuTimer);
});
}
catch (e) {
peer.kill(e);
@@ -383,18 +404,18 @@ export class ClusterManagerImpl implements ClusterManager {
private clusterServicePromise: Promise<ClusterForkService>;
private clusterMode = getScryptedClusterMode()?.[0];
constructor(private api: PluginAPI) {
constructor(private api: PluginAPI, private clusterWorkerId: string) {
}
getClusterWorkerId(): string {
return process.env.SCRYPTED_CLUSTER_WORKER_ID;
return this.clusterWorkerId;
}
getClusterMode(): 'server' | 'client' | undefined {
return this.clusterMode;
}
async getClusterWorkers(): Promise<Record<string, ClusterWorker>> {
async getClusterWorkers() {
const clusterFork = await this.getClusterService();
return clusterFork.getClusterWorkers();
}

View File

@@ -1,3 +1,4 @@
import { ClusterFork, ClusterWorker } from "@scrypted/types";
import { matchesClusterLabels } from "../cluster/cluster-labels";
import type { RuntimeWorkerOptions } from "../plugin/runtime/runtime-worker";
import { RpcPeer } from "../rpc";
@@ -98,13 +99,15 @@ export class ClusterForkService {
return ret;
};
async getClusterWorkers() {
const ret: any = {};
async getClusterWorkers(): Promise<Record<string, ClusterWorker>> {
const ret: Record<string, ClusterWorker> = {};
for (const worker of this.runtime.clusterWorkers.values()) {
ret[worker.id] = {
id: worker.id,
name: worker.name,
labels: worker.labels,
forks: [...worker.forks],
forks: [...worker.forks] as ClusterFork[],
cpuUsage: worker.cpuUsage,
};
}
return ret;