mirror of
https://github.com/koush/scrypted.git
synced 2026-02-09 08:42:19 +00:00
1032 lines
39 KiB
TypeScript
1032 lines
39 KiB
TypeScript
import { Device, DeviceInformation, DeviceProvider, EngineIOHandler, HttpRequest, HttpRequestHandler, ScryptedDevice, ScryptedInterface, ScryptedInterfaceMethod, ScryptedInterfaceProperty, ScryptedNativeId, ScryptedUser as SU } from '@scrypted/types';
|
|
import AdmZip from 'adm-zip';
|
|
import crypto from 'crypto';
|
|
import * as io from 'engine.io';
|
|
import { once } from 'events';
|
|
import express, { Request, Response } from 'express';
|
|
import { ParamsDictionary } from 'express-serve-static-core';
|
|
import fs from 'fs';
|
|
import http, { ServerResponse } from 'http';
|
|
import https from 'https';
|
|
import net from 'net';
|
|
import path from 'path';
|
|
import { ParsedQs } from 'qs';
|
|
import semver from 'semver';
|
|
import { Parser as TarParser } from 'tar';
|
|
import { URL } from "url";
|
|
import WebSocket, { Server as WebSocketServer } from "ws";
|
|
import { computeClusterObjectHash } from './cluster/cluster-hash';
|
|
import { ClusterObject } from './cluster/connect-rpc-object';
|
|
import { Plugin, PluginDevice, ScryptedAlert, ScryptedUser } from './db-types';
|
|
import { httpFetch } from './fetch/http-fetch';
|
|
import { createResponseInterface } from './http-interfaces';
|
|
import { getDisplayName, getDisplayRoom, getDisplayType, getProvidedNameOrDefault, getProvidedRoomOrDefault, getProvidedTypeOrDefault } from './infer-defaults';
|
|
import { IOServer } from './io';
|
|
import Level from './level';
|
|
import { LogEntry, Logger, makeAlertId } from './logger';
|
|
import { getMixins, hasMixinCycle } from './mixin/mixin-cycle';
|
|
import { AccessControls } from './plugin/acl';
|
|
import { PluginDebug } from './plugin/plugin-debug';
|
|
import { PluginDeviceProxyHandler } from './plugin/plugin-device';
|
|
import { PluginHost, UnsupportedRuntimeError } from './plugin/plugin-host';
|
|
import { isConnectionUpgrade, PluginHttp } from './plugin/plugin-http';
|
|
import { WebSocketConnection } from './plugin/plugin-remote-websocket';
|
|
import { getPluginVolume } from './plugin/plugin-volume';
|
|
import { getBuiltinRuntimeHosts } from './plugin/runtime/runtime-host';
|
|
import { getIpAddress, SCRYPTED_INSECURE_PORT, SCRYPTED_SECURE_PORT } from './server-settings';
|
|
import { AddressSettings } from './services/addresses';
|
|
import { Alerts } from './services/alerts';
|
|
import { Backup } from './services/backup';
|
|
import { ClusterForkService } from './services/cluster-fork';
|
|
import { CORSControl } from './services/cors';
|
|
import { Info } from './services/info';
|
|
import { getNpmPackageInfo, PluginComponent } from './services/plugin';
|
|
import { ServiceControl } from './services/service-control';
|
|
import { UsersService } from './services/users';
|
|
import { getState, ScryptedStateManager, setState } from './state';
|
|
import { isClusterAddress } from './cluster/cluster-setup';
|
|
import { RunningClusterWorker } from './scrypted-cluster-main';
|
|
import { EnvControl } from './services/env';
|
|
|
|
interface DeviceProxyPair {
|
|
handler: PluginDeviceProxyHandler;
|
|
proxy: ScryptedDevice;
|
|
}
|
|
|
|
const MIN_SCRYPTED_CORE_VERSION = 'v0.2.6';
|
|
const PLUGIN_DEVICE_STATE_VERSION = 2;
|
|
|
|
interface HttpPluginData {
|
|
pluginHost: PluginHost;
|
|
pluginDevice: PluginDevice
|
|
}
|
|
|
|
export class ScryptedRuntime extends PluginHttp<HttpPluginData> {
|
|
clusterId = crypto.randomBytes(3).toString('hex');
|
|
clusterSecret = process.env.SCRYPTED_CLUSTER_SECRET || crypto.randomBytes(16).toString('hex');
|
|
clusterWorkers = new Map<string, RunningClusterWorker>();
|
|
serverClusterWorkerId: string;
|
|
plugins: { [id: string]: PluginHost } = {};
|
|
pluginDevices: { [id: string]: PluginDevice } = {};
|
|
devices: { [id: string]: DeviceProxyPair } = {};
|
|
stateManager = new ScryptedStateManager(this);
|
|
logger = new Logger(this, '', 'Scrypted');
|
|
devicesLogger = this.logger.getLogger('device', 'Devices');
|
|
wss = new WebSocketServer({ noServer: true });
|
|
wsAtomic = 0;
|
|
connectRPCObjectIO: IOServer = new io.Server({
|
|
pingTimeout: 120000,
|
|
perMessageDeflate: true,
|
|
cors: (req, callback) => {
|
|
const header = this.getAccessControlAllowOrigin(req.headers);
|
|
callback(undefined, {
|
|
origin: header,
|
|
credentials: true,
|
|
})
|
|
},
|
|
});
|
|
pluginComponent = new PluginComponent(this);
|
|
serviceControl = new ServiceControl();
|
|
alerts = new Alerts(this);
|
|
corsControl = new CORSControl(this);
|
|
addressSettings = new AddressSettings(this);
|
|
usersService = new UsersService(this);
|
|
clusterFork = new ClusterForkService(this);
|
|
envControl = new EnvControl();
|
|
info = new Info();
|
|
backup = new Backup(this);
|
|
pluginHosts = getBuiltinRuntimeHosts();
|
|
|
|
constructor(public mainFilename: string, public datastore: Level, insecure: http.Server, secure: https.Server, app: express.Application) {
|
|
super(app);
|
|
// ensure that all the users are loaded from the db.
|
|
this.usersService.getAllUsers();
|
|
|
|
app.disable('x-powered-by');
|
|
|
|
this.addMiddleware();
|
|
|
|
app.all('/engine.io/connectRPCObject', (req, res) => this.connectRPCObjectHandler(req, res));
|
|
|
|
/*
|
|
* Handle incoming connections that will be
|
|
* proxied to a connectRPCObject socket.
|
|
*
|
|
* Note that the clusterObject hash must be
|
|
* verified before connecting to the target port.
|
|
*/
|
|
this.connectRPCObjectIO.on('connection', connection => {
|
|
try {
|
|
const clusterObject: ClusterObject = JSON.parse((connection.request as Request).query.clusterObject as string);
|
|
const sha256 = computeClusterObjectHash(clusterObject, this.clusterSecret);
|
|
if (sha256 != clusterObject.sha256) {
|
|
connection.send({
|
|
error: 'invalid signature'
|
|
});
|
|
connection.close();
|
|
return;
|
|
}
|
|
|
|
let address = clusterObject.address;
|
|
if (isClusterAddress(address))
|
|
address = '127.0.0.1';
|
|
const socket = net.connect({
|
|
port: clusterObject.port,
|
|
host: address,
|
|
});
|
|
socket.on('error', () => connection.close());
|
|
socket.on('close', () => connection.close());
|
|
socket.on('data', data => connection.send(data));
|
|
connection.on('close', () => socket.destroy());
|
|
connection.on('message', message => {
|
|
if (typeof message !== 'string') {
|
|
socket.write(message);
|
|
}
|
|
else {
|
|
console.warn('unexpected string data on engine.io rpc connection. terminating.')
|
|
connection.close();
|
|
}
|
|
});
|
|
} catch {
|
|
connection.close();
|
|
}
|
|
});
|
|
|
|
insecure.on('upgrade', (req, socket, upgradeHead) => {
|
|
(req as any).upgradeHead = upgradeHead;
|
|
(app as any).handle(req, {
|
|
socket,
|
|
upgradeHead
|
|
})
|
|
});
|
|
|
|
secure.on('upgrade', (req, socket, upgradeHead) => {
|
|
(req as any).upgradeHead = upgradeHead;
|
|
(app as any).handle(req, {
|
|
socket,
|
|
upgradeHead
|
|
})
|
|
})
|
|
|
|
this.logger.on('log', (logEntry: LogEntry) => {
|
|
if (logEntry.level !== 'a')
|
|
return;
|
|
|
|
console.log('alert', logEntry);
|
|
const alert = new ScryptedAlert();
|
|
alert._id = makeAlertId(logEntry.path, logEntry.message);
|
|
alert.message = logEntry.message;
|
|
alert.timestamp = logEntry.timestamp;
|
|
alert.path = logEntry.path;
|
|
alert.title = logEntry.title;
|
|
|
|
datastore.upsert(alert);
|
|
|
|
this.stateManager.notifyInterfaceEvent(null, 'Logger' as any, logEntry);
|
|
});
|
|
|
|
// purge logs older than 2 hours every hour
|
|
setInterval(() => {
|
|
this.logger.purge(Date.now() - 48 * 60 * 60 * 1000);
|
|
}, 60 * 60 * 1000);
|
|
}
|
|
|
|
checkUpgrade(req: express.Request<ParamsDictionary, any, any, ParsedQs, Record<string, any>>, res: express.Response<any, Record<string, any>>, pluginData: HttpPluginData): void {
|
|
// pluginData.pluginHost.io.
|
|
const { sid } = req.query;
|
|
const client = (pluginData.pluginHost.io as any).clients[sid as string];
|
|
if (client) {
|
|
res.locals.username = 'existing-io-session';
|
|
}
|
|
}
|
|
|
|
addAccessControlHeaders(req: http.IncomingMessage, res: http.ServerResponse) {
|
|
res.setHeader('Vary', 'Origin,Referer');
|
|
const header = this.getAccessControlAllowOrigin(req.headers);
|
|
if (header) {
|
|
res.setHeader('Access-Control-Allow-Origin', header);
|
|
}
|
|
res.setHeader("Access-Control-Allow-Credentials", "true");
|
|
res.setHeader('Access-Control-Allow-Private-Network', 'true');
|
|
res.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS');
|
|
res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization, Content-Length, X-Requested-With, Access-Control-Request-Method');
|
|
}
|
|
|
|
getAccessControlAllowOrigin(headers: http.IncomingHttpHeaders) {
|
|
let { origin, referer } = headers;
|
|
if (!origin && referer) {
|
|
try {
|
|
const u = new URL(headers.referer)
|
|
origin = u.origin;
|
|
}
|
|
catch (e) {
|
|
return;
|
|
}
|
|
}
|
|
if (!origin)
|
|
return;
|
|
const servers: string[] = process.env.SCRYPTED_ACCESS_CONTROL_ALLOW_ORIGINS?.split(',') || [];
|
|
servers.push(...Object.values(this.corsControl.origins).flat());
|
|
if (!servers.includes(origin))
|
|
return;
|
|
|
|
return origin;
|
|
}
|
|
|
|
getDeviceLogger(device: PluginDevice): Logger {
|
|
if (!device)
|
|
return;
|
|
return this.devicesLogger.getLogger(device._id, getState(device, ScryptedInterfaceProperty.name));
|
|
}
|
|
|
|
async getPluginForEndpoint(endpoint: string): Promise<HttpPluginData> {
|
|
let pluginHost = this.plugins[endpoint] ?? this.getPluginHostForDeviceId(endpoint);
|
|
if (endpoint === '@scrypted/core') {
|
|
// enforce a minimum version on @scrypted/core
|
|
if (!pluginHost || semver.lt(pluginHost.packageJson.version, MIN_SCRYPTED_CORE_VERSION)) {
|
|
try {
|
|
pluginHost = await this.installNpm('@scrypted/core');
|
|
}
|
|
catch (e) {
|
|
console.error('@scrypted/core auto install failed', e);
|
|
}
|
|
}
|
|
}
|
|
|
|
const pluginDevice = this.findPluginDevice(endpoint) ?? this.findPluginDeviceById(endpoint);
|
|
|
|
return {
|
|
pluginHost,
|
|
pluginDevice,
|
|
};
|
|
}
|
|
|
|
async connectRPCObjectHandler(req: Request, res: Response) {
|
|
const isUpgrade = isConnectionUpgrade(req.headers);
|
|
|
|
const end = (code: number, message: string) => {
|
|
if (isUpgrade) {
|
|
const socket = res.socket;
|
|
socket.write(`HTTP/1.1 ${code} ${message}\r\n` +
|
|
'\r\n');
|
|
socket.destroy();
|
|
}
|
|
else {
|
|
res.status(code);
|
|
res.send(message);
|
|
}
|
|
};
|
|
|
|
if (!res.locals.username) {
|
|
end(401, 'Not Authorized');
|
|
return;
|
|
}
|
|
|
|
const reqany = req as any;
|
|
if ((req as any).upgradeHead)
|
|
this.connectRPCObjectIO.handleUpgrade(reqany, res.socket, reqany.upgradeHead)
|
|
else
|
|
this.connectRPCObjectIO.handleRequest(reqany, res);
|
|
}
|
|
|
|
async getEndpointPluginData(req: Request, endpoint: string, isUpgrade: boolean, isEngineIOEndpoint: boolean): Promise<HttpPluginData> {
|
|
const ret = await this.getPluginForEndpoint(endpoint);
|
|
if (req.url.indexOf('/engine.io/api') !== -1)
|
|
return ret;
|
|
|
|
const { pluginDevice } = ret;
|
|
|
|
// check if upgrade requests can be handled. must be websocket.
|
|
if (isUpgrade) {
|
|
if (!pluginDevice?.state.interfaces.value.includes(ScryptedInterface.EngineIOHandler)) {
|
|
return;
|
|
}
|
|
}
|
|
else {
|
|
if (!isEngineIOEndpoint && !pluginDevice?.state.interfaces.value.includes(ScryptedInterface.HttpRequestHandler)) {
|
|
return;
|
|
}
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
async handleWebSocket(endpoint: string, httpRequest: HttpRequest, ws: WebSocket, pluginData: HttpPluginData): Promise<void> {
|
|
const { pluginDevice } = pluginData;
|
|
|
|
const handler = this.getDevice<EngineIOHandler>(pluginDevice._id);
|
|
const id = 'ws-' + this.wsAtomic++;
|
|
const pluginHost = this.plugins[endpoint] ?? this.getPluginHostForDeviceId(endpoint);
|
|
if (!pluginHost) {
|
|
ws.close();
|
|
return;
|
|
}
|
|
pluginHost.ws[id] = ws;
|
|
|
|
ws.on('message', async (message) => {
|
|
try {
|
|
pluginHost.remote.ioEvent(id, 'message', message)
|
|
}
|
|
catch (e) {
|
|
ws.close();
|
|
}
|
|
});
|
|
ws.on('close', async (reason) => {
|
|
try {
|
|
pluginHost.remote.ioEvent(id, 'close');
|
|
}
|
|
catch (e) {
|
|
}
|
|
delete pluginHost.ws[id];
|
|
});
|
|
|
|
// @ts-expect-error
|
|
await handler.onConnection(httpRequest, new WebSocketConnection(`ws://${id}`, {
|
|
send(message) {
|
|
ws.send(message);
|
|
},
|
|
close(message) {
|
|
ws.close();
|
|
},
|
|
}));
|
|
}
|
|
|
|
async getComponent(componentId: string): Promise<any> {
|
|
switch (componentId) {
|
|
case 'SCRYPTED_IP_ADDRESS':
|
|
return getIpAddress();
|
|
case 'SCRYPTED_INSECURE_PORT':
|
|
return SCRYPTED_INSECURE_PORT;
|
|
case 'SCRYPTED_SECURE_PORT':
|
|
return SCRYPTED_SECURE_PORT;
|
|
case 'info':
|
|
return this.info;
|
|
case 'plugins':
|
|
return this.pluginComponent;
|
|
case 'service-control':
|
|
return this.serviceControl;
|
|
case 'logger':
|
|
return this.logger;
|
|
case 'alerts':
|
|
return this.alerts;
|
|
case 'cors':
|
|
return this.corsControl;
|
|
case 'addresses':
|
|
return this.addressSettings;
|
|
case "users":
|
|
return this.usersService;
|
|
case 'backup':
|
|
return this.backup;
|
|
case 'cluster-fork':
|
|
return this.clusterFork;
|
|
case 'env-control':
|
|
return this.envControl;
|
|
}
|
|
}
|
|
|
|
async getPackageJson(pluginId: string) {
|
|
let packageJson;
|
|
if (this.plugins[pluginId]) {
|
|
packageJson = this.plugins[pluginId].packageJson;
|
|
}
|
|
else {
|
|
const plugin = await this.datastore.tryGet(Plugin, pluginId);
|
|
packageJson = plugin.packageJson;
|
|
}
|
|
return packageJson;
|
|
}
|
|
|
|
async getAccessControls(username: string) {
|
|
if (!username)
|
|
return;
|
|
|
|
const user = await this.datastore.tryGet(ScryptedUser, username);
|
|
if (user?.aclId) {
|
|
const accessControl = this.getDevice<SU>(user.aclId);
|
|
const acls = await accessControl.getScryptedUserAccessControl();
|
|
if (!acls)
|
|
return;
|
|
return new AccessControls(acls);
|
|
}
|
|
}
|
|
|
|
async handleEngineIOEndpoint(req: Request, res: ServerResponse & { locals: any }, endpointRequest: HttpRequest, pluginData: HttpPluginData) {
|
|
const { pluginHost, pluginDevice } = pluginData;
|
|
|
|
const { username } = res.locals;
|
|
let accessControls: AccessControls;
|
|
|
|
try {
|
|
accessControls = await this.getAccessControls(username);
|
|
if (accessControls?.shouldRejectMethod(pluginDevice._id, ScryptedInterfaceMethod.onConnection))
|
|
accessControls.deny();
|
|
}
|
|
catch (e) {
|
|
res.writeHead(401);
|
|
res.end();
|
|
return;
|
|
}
|
|
|
|
if (!pluginHost || !pluginDevice) {
|
|
console.error('plugin does not exist or is still starting up.');
|
|
res.writeHead(500);
|
|
res.end();
|
|
return;
|
|
}
|
|
|
|
const reqany = req as any;
|
|
|
|
reqany.scrypted = {
|
|
endpointRequest,
|
|
pluginDevice,
|
|
accessControls,
|
|
};
|
|
|
|
if ((req as any).upgradeHead)
|
|
pluginHost.io.handleUpgrade(reqany, res.socket, reqany.upgradeHead)
|
|
else
|
|
pluginHost.io.handleRequest(reqany, res);
|
|
}
|
|
|
|
handleRequestEndpoint(req: Request, res: Response, endpointRequest: HttpRequest, pluginData: HttpPluginData) {
|
|
const { pluginHost, pluginDevice } = pluginData;
|
|
const handler = this.getDevice<HttpRequestHandler>(pluginDevice._id);
|
|
if (handler.interfaces.includes(ScryptedInterface.EngineIOHandler) && isConnectionUpgrade(req.headers) && req.headers.upgrade?.toLowerCase() === 'websocket') {
|
|
this.wss.handleUpgrade(req, req.socket, null, ws => {
|
|
console.log(ws);
|
|
});
|
|
}
|
|
|
|
const { pluginId } = pluginHost;
|
|
const filesPath = path.join(getPluginVolume(pluginId), 'files');
|
|
const ri = createResponseInterface(this, res, pluginHost.unzippedPath, filesPath);
|
|
handler.onRequest(endpointRequest, ri)
|
|
.catch(() => { })
|
|
.finally(() => {
|
|
if (!ri.sent) {
|
|
console.warn(pluginId, 'did not send a response before onRequest returned.');
|
|
ri.send(`Internal Plugin Error: ${pluginId}`, {
|
|
code: 500,
|
|
})
|
|
}
|
|
});
|
|
}
|
|
|
|
killPlugin(pluginId: string) {
|
|
const existing = this.plugins[pluginId];
|
|
if (existing) {
|
|
delete this.plugins[pluginId];
|
|
existing.kill();
|
|
}
|
|
this.invalidatePluginMixins(pluginId);
|
|
}
|
|
|
|
// should this be async?
|
|
invalidatePluginDevice(id: string) {
|
|
const proxyPair = this.devices[id];
|
|
if (!proxyPair)
|
|
return;
|
|
proxyPair.handler.invalidate();
|
|
return proxyPair;
|
|
}
|
|
|
|
// should this be async?
|
|
rebuildPluginDeviceMixinTable(id: string) {
|
|
const proxyPair = this.devices[id];
|
|
if (!proxyPair)
|
|
return;
|
|
proxyPair.handler.rebuildMixinTable();
|
|
return proxyPair;
|
|
}
|
|
|
|
invalidatePluginMixins(pluginId: string) {
|
|
const deviceIds = new Set<string>(Object.values(this.pluginDevices).filter(d => d.pluginId === pluginId).map(d => d._id));
|
|
this.invalidateMixins(deviceIds);
|
|
}
|
|
|
|
invalidateMixins(ids: Set<string>) {
|
|
const ret = new Set<string>();
|
|
const remaining = [...ids];
|
|
|
|
// first pass:
|
|
// for every id, find anything it is acting on as a mixin, and clear out the entry.
|
|
while (remaining.length) {
|
|
const id = remaining.pop();
|
|
|
|
for (const device of Object.values(this.devices)) {
|
|
const foundIndex = device.handler?.mixinTable?.findIndex(mt => mt.mixinProviderId === id);
|
|
if (foundIndex === -1 || foundIndex === undefined)
|
|
continue;
|
|
|
|
const did = device.handler.id;
|
|
if (!ret.has(did)) {
|
|
// add this to the list of mixin providers that need to be rebuilt
|
|
ret.add(did);
|
|
remaining.push(did);
|
|
}
|
|
|
|
// if it is the last entry, that means it is the device itself.
|
|
// can this happen? i don't think it is possible. mixin provider id would be undefined.
|
|
if (foundIndex === device.handler.mixinTable.length - 1) {
|
|
console.warn('attempt to invalidate mixin on actual device?');
|
|
continue;
|
|
}
|
|
|
|
const removed = device.handler.mixinTable.splice(0, foundIndex + 1);
|
|
for (const entry of removed) {
|
|
console.log('invalidating mixin', device.handler.id, entry.mixinProviderId);
|
|
device.handler.invalidateEntry(entry);
|
|
}
|
|
}
|
|
}
|
|
|
|
// second pass:
|
|
// rebuild the mixin tables.
|
|
for (const id of ret) {
|
|
const device = this.devices[id];
|
|
device.handler.rebuildMixinTable();
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
async installNpm(pkg: string, version?: string, installedSet?: Set<string>): Promise<PluginHost> {
|
|
if (!installedSet)
|
|
installedSet = new Set();
|
|
if (installedSet.has(pkg))
|
|
return;
|
|
installedSet.add(pkg);
|
|
|
|
const registry = await getNpmPackageInfo(pkg);
|
|
if (!version) {
|
|
version = registry['dist-tags'].latest;
|
|
}
|
|
console.log('installing package', pkg, version);
|
|
|
|
const { body: tarball } = await httpFetch({
|
|
url: `${registry.versions[version].dist.tarball}`,
|
|
// force ipv4 in case of busted ipv6.
|
|
family: 4,
|
|
});
|
|
console.log('downloaded tarball', tarball?.length);
|
|
try {
|
|
const pp = new TarParser();
|
|
}
|
|
catch (e) {
|
|
throw new Error(e);
|
|
}
|
|
const parse = new TarParser();
|
|
const files: { [name: string]: Buffer } = {};
|
|
|
|
parse.on('entry', async (entry: any) => {
|
|
console.log('parsing entry', entry.path)
|
|
const chunks: Buffer[] = [];
|
|
entry.on('data', (data: Buffer) => chunks.push(data));
|
|
|
|
entry.on('end', () => {
|
|
const buffer = Buffer.concat(chunks);
|
|
files[entry.path] = buffer;
|
|
})
|
|
});
|
|
|
|
const ret = (async () => {
|
|
await once(parse, 'end');
|
|
console.log('npm package files:', Object.keys(files).join(', '));
|
|
const packageJsonEntry = files['package/package.json'];
|
|
if (!packageJsonEntry)
|
|
throw new Error('package.json not found. are you behind a firewall?');
|
|
const packageJson = JSON.parse(packageJsonEntry.toString());
|
|
|
|
const pluginDependencies: string[] = packageJson.scrypted.pluginDependencies || [];
|
|
pluginDependencies.forEach(async (dep) => {
|
|
try {
|
|
const depId = this.findPluginDevice(dep);
|
|
if (depId)
|
|
throw new Error('Plugin already installed.');
|
|
await this.installNpm(dep);
|
|
}
|
|
catch (e) {
|
|
console.log('Skipping', dep, ':', e.message);
|
|
}
|
|
});
|
|
|
|
const npmPackage = packageJson.name;
|
|
const plugin = await this.datastore.tryGet(Plugin, npmPackage) || new Plugin();
|
|
|
|
plugin._id = npmPackage;
|
|
plugin.packageJson = packageJson;
|
|
plugin.zip = files['package/dist/plugin.zip'].toString('base64');
|
|
await this.datastore.upsert(plugin);
|
|
|
|
return this.installPlugin(plugin);
|
|
})();
|
|
|
|
parse.write(tarball);
|
|
parse.end();
|
|
return ret;
|
|
}
|
|
|
|
async installPlugin(plugin: Plugin, pluginDebug?: PluginDebug): Promise<PluginHost> {
|
|
const device: Device = Object.assign({}, plugin.packageJson.scrypted, {
|
|
info: {
|
|
manufacturer: plugin.packageJson.name,
|
|
version: plugin.packageJson.version,
|
|
}
|
|
} as Device);
|
|
try {
|
|
if (!device.interfaces.includes(ScryptedInterface.Readme)) {
|
|
const zipData = Buffer.from(plugin.zip, 'base64');
|
|
const adm = new AdmZip(zipData);
|
|
const entry = adm.getEntry('README.md');
|
|
if (entry) {
|
|
device.interfaces = device.interfaces.slice();
|
|
device.interfaces.push(ScryptedInterface.Readme);
|
|
}
|
|
}
|
|
}
|
|
catch (e) {
|
|
}
|
|
this.upsertDevice(plugin._id, device);
|
|
return this.runPlugin(plugin, pluginDebug);
|
|
}
|
|
|
|
setupPluginHostAutoRestart(pluginId: string, pluginHost?: PluginHost) {
|
|
const logger = this.getDeviceLogger(this.findPluginDevice(pluginId));
|
|
|
|
let timeout: NodeJS.Timeout;
|
|
|
|
const restart = () => {
|
|
if (timeout)
|
|
return;
|
|
|
|
const t = 60000;
|
|
pluginHost?.kill();
|
|
logger.log('e', `plugin ${pluginId} unexpectedly exited, restarting in ${t}ms`);
|
|
|
|
timeout = setTimeout(async () => {
|
|
timeout = undefined;
|
|
const plugin = await this.datastore.tryGet(Plugin, pluginId);
|
|
if (!plugin) {
|
|
logger.log('w', `scheduled plugin restart cancelled, plugin no longer exists ${pluginId}`);
|
|
return;
|
|
}
|
|
|
|
const existing = this.plugins[pluginId];
|
|
if (existing && pluginHost && existing !== pluginHost && !existing.killed) {
|
|
logger.log('w', `scheduled plugin restart cancelled, plugin was restarted by user ${pluginId}`);
|
|
return;
|
|
}
|
|
|
|
try {
|
|
this.runPlugin(plugin);
|
|
}
|
|
catch (e) {
|
|
logger.log('e', `error restarting plugin ${pluginId}`);
|
|
logger.log('e', e.toString());
|
|
restart();
|
|
}
|
|
}, t);
|
|
};
|
|
1
|
|
if (pluginHost) {
|
|
pluginHost.worker.once('error', restart);
|
|
pluginHost.worker.once('exit', restart);
|
|
pluginHost.worker.once('close', restart);
|
|
}
|
|
else {
|
|
restart();
|
|
}
|
|
}
|
|
|
|
loadPlugin(plugin: Plugin, pluginDebug?: PluginDebug) {
|
|
const pluginId = plugin._id;
|
|
try {
|
|
this.killPlugin(pluginId);
|
|
|
|
const pluginDevices = this.findPluginDevices(pluginId);
|
|
for (const pluginDevice of pluginDevices) {
|
|
this.invalidatePluginDevice(pluginDevice._id);
|
|
}
|
|
|
|
const pluginHost = new PluginHost(this, plugin, pluginDebug);
|
|
this.plugins[pluginId] = pluginHost;
|
|
this.setupPluginHostAutoRestart(pluginId, pluginHost);
|
|
|
|
return pluginHost;
|
|
}
|
|
catch (e) {
|
|
const logger = this.getDeviceLogger(this.findPluginDevice(pluginId));
|
|
if (e instanceof UnsupportedRuntimeError) {
|
|
logger.log('e', 'error loading plugin (not retrying)');
|
|
logger.log('e', e.toString());
|
|
throw e;
|
|
}
|
|
|
|
logger.log('e', 'error loading plugin (retrying...)');
|
|
logger.log('e', e.toString());
|
|
this.setupPluginHostAutoRestart(pluginId);
|
|
throw e;
|
|
}
|
|
}
|
|
|
|
probePluginDevices(plugin: Plugin) {
|
|
const pluginId = plugin._id;
|
|
const pluginDevices = this.findPluginDevices(pluginId);
|
|
|
|
const pluginDeviceSet = new Set<string>();
|
|
for (const pluginDevice of pluginDevices) {
|
|
if (pluginDeviceSet.has(pluginDevice._id))
|
|
continue;
|
|
pluginDeviceSet.add(pluginDevice._id);
|
|
this.getDevice(pluginDevice._id)?.probe().catch(() => { });
|
|
}
|
|
|
|
for (const pluginDevice of Object.values(this.pluginDevices)) {
|
|
const { _id } = pluginDevice;
|
|
if (pluginDeviceSet.has(_id))
|
|
continue;
|
|
for (const mixinId of getMixins(this, _id)) {
|
|
if (pluginDeviceSet.has(mixinId)) {
|
|
this.getDevice(_id)?.probe().catch(() => { });
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
runPlugin(plugin: Plugin, pluginDebug?: PluginDebug) {
|
|
const pluginHost = this.loadPlugin(plugin, pluginDebug);
|
|
this.probePluginDevices(plugin);
|
|
return pluginHost;
|
|
}
|
|
|
|
findPluginDevice(pluginId: string, nativeId?: ScryptedNativeId): PluginDevice {
|
|
// JSON stringify over rpc turns undefined into null.
|
|
if (nativeId === null)
|
|
nativeId = undefined;
|
|
return Object.values(this.pluginDevices).find(device => device.pluginId === pluginId && device.nativeId == nativeId);
|
|
}
|
|
|
|
findPluginDeviceById(id: string): PluginDevice {
|
|
return this.pluginDevices[id];
|
|
}
|
|
|
|
findPluginDevices(pluginId: string): PluginDevice[] {
|
|
return Object.values(this.pluginDevices).filter(e => e.state && e.pluginId === pluginId)
|
|
}
|
|
|
|
getPluginHostForDeviceId(id: string): PluginHost {
|
|
const device = this.pluginDevices[id];
|
|
if (!device)
|
|
return;
|
|
return this.plugins[device.pluginId];
|
|
}
|
|
|
|
getDevice<T>(id: string): T & ScryptedDevice {
|
|
const device = this.devices[id];
|
|
if (device)
|
|
return device.proxy as any;
|
|
|
|
if (!this.pluginDevices[id]) {
|
|
console.warn('device not found', id);
|
|
return;
|
|
}
|
|
|
|
const handler = new PluginDeviceProxyHandler(this, id);
|
|
const proxy = new Proxy(handler, handler);
|
|
|
|
this.devices[id] = {
|
|
proxy,
|
|
handler,
|
|
};
|
|
return proxy;
|
|
}
|
|
|
|
async removeDevice(device: PluginDevice) {
|
|
// delete any devices provided by this device
|
|
const providedDevices = Object.values(this.pluginDevices).filter(pluginDevice => getState(pluginDevice, ScryptedInterfaceProperty.providerId) === device._id);
|
|
for (const provided of providedDevices) {
|
|
if (provided === device)
|
|
continue;
|
|
await this.removeDevice(provided);
|
|
}
|
|
const providerId = device.state?.providerId?.value;
|
|
device.state = undefined;
|
|
|
|
this.invalidatePluginDevice(device._id);
|
|
delete this.pluginDevices[device._id];
|
|
delete this.devices[device._id];
|
|
await this.datastore.remove(device);
|
|
this.stateManager.removeDevice(device._id);
|
|
|
|
// if this device is acting as a mixin on anything, can now remove invalidate it.
|
|
// when the mixin table is rebuilt, it will be automatically ignore and remove the dangling mixin.
|
|
this.invalidateMixins(new Set([device._id]));
|
|
|
|
// if the device is a plugin, kill and remove the plugin as well.
|
|
if (!device.nativeId) {
|
|
this.killPlugin(device.pluginId);
|
|
await this.datastore.removeId(Plugin, device.pluginId);
|
|
await fs.promises.rm(getPluginVolume(device.pluginId), {
|
|
recursive: true,
|
|
force: true,
|
|
});
|
|
}
|
|
else {
|
|
try {
|
|
// notify the plugin that a device was removed.
|
|
const plugin = this.plugins[device.pluginId];
|
|
await plugin.remote.setNativeId(device.nativeId, undefined, undefined);
|
|
const provider = this.getDevice<DeviceProvider>(providerId);
|
|
await provider?.releaseDevice(device._id, device.nativeId);
|
|
}
|
|
catch (e) {
|
|
// may throw if the plugin is killed, etc.
|
|
console.warn('error while reporting device removal to plugin remote', e);
|
|
}
|
|
}
|
|
}
|
|
|
|
upsertDevice(pluginId: string, device: Device) {
|
|
// JSON stringify over rpc turns undefined into null.
|
|
if (device.nativeId === null)
|
|
device.nativeId = undefined;
|
|
let pluginDevice = this.findPluginDevice(pluginId, device.nativeId);
|
|
if (!pluginDevice) {
|
|
pluginDevice = new PluginDevice(this.datastore.nextId().toString());
|
|
pluginDevice.stateVersion = PLUGIN_DEVICE_STATE_VERSION;
|
|
}
|
|
this.pluginDevices[pluginDevice._id] = pluginDevice;
|
|
pluginDevice.pluginId = pluginId;
|
|
pluginDevice.nativeId = device.nativeId;
|
|
pluginDevice.state = pluginDevice.state || {};
|
|
|
|
if (pluginDevice.state[ScryptedInterfaceProperty.nativeId]?.value !== pluginDevice.nativeId) {
|
|
setState(pluginDevice, ScryptedInterfaceProperty.nativeId, pluginDevice.nativeId);
|
|
}
|
|
|
|
const providedType = device.type;
|
|
const isUsingDefaultType = getDisplayType(pluginDevice) === getProvidedTypeOrDefault(pluginDevice);
|
|
const providedName = device.name;
|
|
const isUsingDefaultName = getDisplayName(pluginDevice) === getProvidedNameOrDefault(pluginDevice);
|
|
const providedRoom = device.room;
|
|
const isUsingDefaultRoom = getDisplayRoom(pluginDevice) === getProvidedRoomOrDefault(pluginDevice);
|
|
|
|
let providedInterfaces = device.interfaces.slice();
|
|
if (!device.nativeId)
|
|
providedInterfaces.push(ScryptedInterface.ScryptedPlugin);
|
|
else
|
|
providedInterfaces = providedInterfaces.filter(iface => iface !== ScryptedInterface.ScryptedPlugin);
|
|
providedInterfaces = PluginDeviceProxyHandler.sortInterfaces(providedInterfaces);
|
|
// assure final mixin resolved interface list has at least all the
|
|
// interfaces from the provided. the actual list will resolve lazily.
|
|
let mixinInterfaces: string[] = [];
|
|
const mixins: string[] = getState(pluginDevice, ScryptedInterfaceProperty.mixins) || [];
|
|
if (mixins.length)
|
|
mixinInterfaces.push(...getState(pluginDevice, ScryptedInterfaceProperty.interfaces) || []);
|
|
mixinInterfaces.push(...providedInterfaces.slice());
|
|
mixinInterfaces = PluginDeviceProxyHandler.sortInterfaces(mixinInterfaces);
|
|
|
|
this.stateManager.setPluginDeviceState(pluginDevice, ScryptedInterfaceProperty.pluginId, pluginId);
|
|
let interfacesChanged = this.stateManager.setPluginDeviceState(pluginDevice, ScryptedInterfaceProperty.providedInterfaces, providedInterfaces);
|
|
interfacesChanged = this.stateManager.setPluginDeviceState(pluginDevice, ScryptedInterfaceProperty.interfaces, mixinInterfaces)
|
|
|| interfacesChanged;
|
|
if (device.info !== undefined)
|
|
this.stateManager.setPluginDeviceState(pluginDevice, ScryptedInterfaceProperty.info, device.info);
|
|
const provider = this.findPluginDevice(pluginId, device.providerNativeId);
|
|
this.stateManager.setPluginDeviceState(pluginDevice, ScryptedInterfaceProperty.providerId, provider?._id);
|
|
this.stateManager.setPluginDeviceState(pluginDevice, ScryptedInterfaceProperty.providedName, providedName);
|
|
this.stateManager.setPluginDeviceState(pluginDevice, ScryptedInterfaceProperty.providedType, providedType);
|
|
if (isUsingDefaultType)
|
|
this.stateManager.setPluginDeviceState(pluginDevice, ScryptedInterfaceProperty.type, getProvidedTypeOrDefault(pluginDevice));
|
|
if (isUsingDefaultName)
|
|
this.stateManager.setPluginDeviceState(pluginDevice, ScryptedInterfaceProperty.name, getProvidedNameOrDefault(pluginDevice));
|
|
this.stateManager.setPluginDeviceState(pluginDevice, ScryptedInterfaceProperty.providedRoom, providedRoom);
|
|
if (isUsingDefaultRoom)
|
|
this.stateManager.setPluginDeviceState(pluginDevice, ScryptedInterfaceProperty.room, getProvidedRoomOrDefault(pluginDevice));
|
|
|
|
const ret = this.notifyPluginDeviceDescriptorChanged(pluginDevice);
|
|
|
|
return {
|
|
pluginDevicePromise: ret,
|
|
interfacesChanged,
|
|
};
|
|
}
|
|
|
|
notifyPluginDeviceDescriptorChanged(pluginDevice: PluginDevice) {
|
|
const ret = this.datastore.upsert(pluginDevice);
|
|
|
|
// the descriptor events should happen after everything is set, as it's an atomic operation.
|
|
this.stateManager.updateDescriptor(pluginDevice);
|
|
this.stateManager.notifyInterfaceEvent(pluginDevice, ScryptedInterface.ScryptedDevice, undefined);
|
|
|
|
return ret;
|
|
}
|
|
|
|
kill() {
|
|
for (const host of Object.values(this.plugins)) {
|
|
host?.kill();
|
|
}
|
|
}
|
|
|
|
exit() {
|
|
this.kill();
|
|
process.exit();
|
|
}
|
|
|
|
async start() {
|
|
// catch ctrl-c
|
|
process.on('SIGINT', () => this.exit());
|
|
// catch kill
|
|
process.on('SIGTERM', () => this.exit());
|
|
|
|
for await (const pluginDevice of this.datastore.getAll(PluginDevice)) {
|
|
// this may happen due to race condition around deletion/update. investigate.
|
|
if (!pluginDevice.state) {
|
|
this.datastore.remove(pluginDevice);
|
|
continue;
|
|
}
|
|
|
|
this.pluginDevices[pluginDevice._id] = pluginDevice;
|
|
let mixins: string[] = getState(pluginDevice, ScryptedInterfaceProperty.mixins) || [];
|
|
|
|
let dirty = false;
|
|
if (mixins.includes(null) || mixins.includes(undefined)) {
|
|
dirty = true;
|
|
setState(pluginDevice, ScryptedInterfaceProperty.mixins, mixins.filter(e => !!e));
|
|
}
|
|
|
|
const interfaces: string[] = getState(pluginDevice, ScryptedInterfaceProperty.providedInterfaces);
|
|
if (!pluginDevice.nativeId && !interfaces.includes(ScryptedInterface.ScryptedPlugin)) {
|
|
dirty = true;
|
|
interfaces.push(ScryptedInterface.ScryptedPlugin);
|
|
setState(pluginDevice, ScryptedInterfaceProperty.providedInterfaces, PluginDeviceProxyHandler.sortInterfaces(interfaces));
|
|
}
|
|
|
|
const pluginId: string = getState(pluginDevice, ScryptedInterfaceProperty.pluginId);
|
|
if (!pluginId) {
|
|
dirty = true;
|
|
setState(pluginDevice, ScryptedInterfaceProperty.pluginId, pluginDevice.pluginId);
|
|
}
|
|
|
|
if (pluginDevice.state[ScryptedInterfaceProperty.nativeId]?.value !== pluginDevice.nativeId) {
|
|
dirty = true;
|
|
setState(pluginDevice, ScryptedInterfaceProperty.nativeId, pluginDevice.nativeId);
|
|
}
|
|
|
|
if (dirty) {
|
|
this.datastore.upsert(pluginDevice)
|
|
.catch(e => {
|
|
console.error('There was an error saving the device? Ignoring...', e);
|
|
// return this.datastore.remove(pluginDevice);
|
|
});
|
|
}
|
|
}
|
|
|
|
for (const id of Object.keys(this.stateManager.getSystemState())) {
|
|
if (hasMixinCycle(this, id)) {
|
|
console.warn(`initialize: ${id} has a mixin cycle. Clearing mixins.`);
|
|
const pluginDevice = this.findPluginDeviceById(id);
|
|
setState(pluginDevice, ScryptedInterfaceProperty.mixins, []);
|
|
}
|
|
}
|
|
|
|
const plugins: Plugin[] = [];
|
|
for await (const plugin of this.datastore.getAll(Plugin)) {
|
|
plugins.push(plugin);
|
|
}
|
|
|
|
for (const plugin of plugins) {
|
|
try {
|
|
const pluginDevice = this.findPluginDevice(plugin._id);
|
|
setState(pluginDevice, ScryptedInterfaceProperty.info, {
|
|
manufacturer: plugin.packageJson.name,
|
|
version: plugin.packageJson.version,
|
|
} as DeviceInformation);
|
|
this.loadPlugin(plugin);
|
|
}
|
|
catch (e) {
|
|
console.error('error loading plugin', plugin._id, e);
|
|
}
|
|
}
|
|
|
|
for (const plugin of plugins) {
|
|
try {
|
|
this.probePluginDevices(plugin);
|
|
}
|
|
catch (e) {
|
|
console.error('error probing plugin devices', plugin._id, e);
|
|
}
|
|
}
|
|
|
|
if (process.env.SCRYPTED_INSTALL_PLUGIN && !plugins.find(plugin => plugin._id === process.env.SCRYPTED_INSTALL_PLUGIN)) {
|
|
try {
|
|
await this.installNpm(process.env.SCRYPTED_INSTALL_PLUGIN);
|
|
}
|
|
catch (e) {
|
|
console.error('failed to auto install plugin', process.env.SCRYPTED_INSTALL_PLUGIN);
|
|
}
|
|
}
|
|
}
|
|
}
|