diff --git a/server/src/plugin/plugin-host.ts b/server/src/plugin/plugin-host.ts index 0978de301..d6472e6b5 100644 --- a/server/src/plugin/plugin-host.ts +++ b/server/src/plugin/plugin-host.ts @@ -1,29 +1,24 @@ -import { RpcMessage, RpcPeer } from '../rpc'; +import { RpcPeer } from '../rpc'; import AdmZip from 'adm-zip'; -import { SystemManager, DeviceManager, ScryptedNativeId, Device, EventListenerRegister, EngineIOHandler, ScryptedInterface, ScryptedInterfaceProperty } from '@scrypted/sdk/types' +import { Device, EngineIOHandler } from '@scrypted/sdk/types' import { ScryptedRuntime } from '../runtime'; import { Plugin } from '../db-types'; import io, { Socket } from 'engine.io'; -import { attachPluginRemote, PluginReader, setupPluginRemote } from './plugin-remote'; -import { PluginAPI, PluginAPIProxy, PluginRemote, PluginRemoteLoadZipOptions } from './plugin-api'; +import { setupPluginRemote } from './plugin-remote'; +import { PluginAPIProxy, PluginRemote, PluginRemoteLoadZipOptions } from './plugin-api'; import { Logger } from '../logger'; -import { MediaManagerHostImpl, MediaManagerImpl } from './media'; +import { MediaManagerHostImpl } from './media'; import WebSocket from 'ws'; -import { PassThrough } from 'stream'; -import { Console } from 'console' import { sleep } from '../sleep'; import { PluginHostAPI } from './plugin-host-api'; import path from 'path'; -import { install as installSourceMapSupport } from 'source-map-support'; -import net from 'net' import child_process from 'child_process'; import { PluginDebug } from './plugin-debug'; import readline from 'readline'; import { Readable, Writable } from 'stream'; import { ensurePluginVolume, getScryptedVolume } from './plugin-volume'; -import { getPluginNodePath, installOptionalDependencies } from './plugin-npm-dependencies'; +import { getPluginNodePath } from './plugin-npm-dependencies'; import { ConsoleServer, createConsoleServer } from './plugin-console'; -import { createREPLServer } from './plugin-repl'; import { LazyRemote } from './plugin-lazy-remote'; import crypto from 'crypto'; import fs from 'fs'; @@ -364,235 +359,3 @@ export class PluginHost { return setupPluginRemote(rpcPeer, api, null, () => this.scrypted.stateManager.getSystemState()); } } - -export function startPluginRemote() { - const peer = new RpcPeer('unknown', 'host', (message, reject) => process.send(message, undefined, { - swallowErrors: !reject, - }, e => { - if (e) - reject?.(e); - })); - peer.transportSafeArgumentTypes.add(Buffer.name); - process.on('message', message => peer.handleMessage(message as RpcMessage)); - process.on('disconnect', () => { - console.error('peer host disconnected, exiting.'); - process.exit(1); - }); - - let systemManager: SystemManager; - let deviceManager: DeviceManager; - let api: PluginAPI; - let pluginId: string; - - const getConsole = (hook: (stdout: PassThrough, stderr: PassThrough) => Promise, - also?: Console, alsoPrefix?: string) => { - - const stdout = new PassThrough(); - const stderr = new PassThrough(); - - hook(stdout, stderr); - - const ret = new Console(stdout, stderr); - - const methods = [ - 'log', 'warn', - 'dir', 'time', - 'timeEnd', 'timeLog', - 'trace', 'assert', - 'clear', 'count', - 'countReset', 'group', - 'groupEnd', 'table', - 'debug', 'info', - 'dirxml', 'error', - 'groupCollapsed', - ]; - - const printers = ['log', 'info', 'debug', 'trace', 'warn', 'error']; - for (const m of methods) { - const old = (ret as any)[m].bind(ret); - (ret as any)[m] = (...args: any[]) => { - // prefer the mixin version for local/remote console dump. - if (also && alsoPrefix && printers.includes(m)) { - (also as any)[m](alsoPrefix, ...args); - } - else { - (console as any)[m](...args); - } - // call through to old method to ensure it gets written - // to log buffer. - old(...args); - } - } - - return ret; - } - - let pluginsPromise: Promise; - function getPlugins() { - if (!pluginsPromise) - pluginsPromise = api.getComponent('plugins'); - return pluginsPromise; - } - - const getDeviceConsole = (nativeId?: ScryptedNativeId) => { - // the the plugin console is simply the default console - // and gets read from stderr/stdout. - if (!nativeId) - return console; - - return getConsole(async (stdout, stderr) => { - const connect = async () => { - const plugins = await getPlugins(); - const port = await plugins.getRemoteServicePort(peer.selfName, 'console-writer'); - const socket = net.connect(port); - socket.write(nativeId + '\n'); - const writer = (data: Buffer) => { - socket.write(data); - }; - stdout.on('data', writer); - stderr.on('data', writer); - socket.on('error', () => { - stdout.removeAllListeners(); - stderr.removeAllListeners(); - stdout.pause(); - stderr.pause(); - setTimeout(connect, 10000); - }); - }; - connect(); - }, undefined, undefined); - } - - const getMixinConsole = (mixinId: string, nativeId: ScryptedNativeId) => { - return getConsole(async (stdout, stderr) => { - if (!mixinId) { - return; - } - // todo: fix this. a mixin provider can mixin another device to make it a mixin provider itself. - // so the mixin id in the mixin table will be incorrect. - // there's no easy way to fix this from the remote. - // if (!systemManager.getDeviceById(mixinId).mixins.includes(idForNativeId(nativeId))) { - // return; - // } - const reconnect = () => { - stdout.removeAllListeners(); - stderr.removeAllListeners(); - stdout.pause(); - stderr.pause(); - setTimeout(tryConnect, 10000); - }; - - const connect = async () => { - const ds = deviceManager.getDeviceState(nativeId); - if (!ds) { - // deleted? - return; - } - - const plugins = await getPlugins(); - const { pluginId, nativeId: mixinNativeId } = await plugins.getDeviceInfo(mixinId); - const port = await plugins.getRemoteServicePort(pluginId, 'console-writer'); - const socket = net.connect(port); - socket.write(mixinNativeId + '\n'); - const writer = (data: Buffer) => { - let str = data.toString().trim(); - str = str.replaceAll('\n', `\n[${ds.name}]: `); - str = `[${ds.name}]: ` + str + '\n'; - socket.write(str); - }; - stdout.on('data', writer); - stderr.on('data', writer); - socket.on('close', reconnect); - }; - - const tryConnect = async () => { - try { - await connect(); - } - catch (e) { - reconnect(); - } - } - tryConnect(); - }, getDeviceConsole(nativeId), `[${systemManager.getDeviceById(mixinId)?.name}]`); - } - - let lastCpuUsage: NodeJS.CpuUsage; - setInterval(() => { - const cpuUsage = process.cpuUsage(lastCpuUsage); - lastCpuUsage = cpuUsage; - peer.sendOob({ - type: 'stats', - cpu: cpuUsage, - memoryUsage: process.memoryUsage(), - }); - global?.gc(); - }, 10000); - - let replPort: Promise; - - let _pluginConsole: Console; - const getPluginConsole = () => { - if (!_pluginConsole) - _pluginConsole = getDeviceConsole(undefined); - return _pluginConsole; - } - - attachPluginRemote(peer, { - createMediaManager: async (sm) => { - systemManager = sm; - return new MediaManagerImpl(systemManager, getPluginConsole()); - }, - onGetRemote: async (_api, _pluginId) => { - api = _api; - pluginId = _pluginId; - peer.selfName = pluginId; - }, - onPluginReady: async (scrypted, params, plugin) => { - replPort = createREPLServer(scrypted, params, plugin); - }, - getPluginConsole, - getDeviceConsole, - getMixinConsole, - async getServicePort(name, ...args: any[]) { - if (name === 'repl') { - if (!replPort) - throw new Error('REPL unavailable: Plugin not loaded.') - return replPort; - } - throw new Error(`unknown service ${name}`); - }, - async onLoadZip(pluginReader: PluginReader, packageJson: any) { - const entry = pluginReader('main.nodejs.js.map') - const map = entry?.toString(); - - installSourceMapSupport({ - environment: 'node', - retrieveSourceMap(source) { - if (source === '/plugin/main.nodejs.js' || source === `/${pluginId}/main.nodejs.js`) { - if (!map) - return null; - return { - url: '/plugin/main.nodejs.js', - map, - } - } - return null; - } - }); - await installOptionalDependencies(getPluginConsole(), packageJson); - } - }).then(scrypted => { - systemManager = scrypted.systemManager; - deviceManager = scrypted.deviceManager; - - process.on('uncaughtException', e => { - getPluginConsole().error('uncaughtException', e); - scrypted.log.e('uncaughtException ' + e?.toString()); - }); - process.on('unhandledRejection', e => { - getPluginConsole().error('unhandledRejection', e); - scrypted.log.e('unhandledRejection ' + e?.toString()); - }); - }) -} diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts new file mode 100644 index 000000000..647521f51 --- /dev/null +++ b/server/src/plugin/plugin-remote-worker.ts @@ -0,0 +1,243 @@ +import { RpcMessage, RpcPeer } from '../rpc'; +import { SystemManager, DeviceManager, ScryptedNativeId } from '@scrypted/sdk/types' +import { attachPluginRemote, PluginReader } from './plugin-remote'; +import { PluginAPI } from './plugin-api'; +import { MediaManagerImpl } from './media'; +import { PassThrough } from 'stream'; +import { Console } from 'console' +import { install as installSourceMapSupport } from 'source-map-support'; +import net from 'net' +import { installOptionalDependencies } from './plugin-npm-dependencies'; +import { createREPLServer } from './plugin-repl'; + +export function startPluginRemote() { + const peer = new RpcPeer('unknown', 'host', (message, reject) => process.send(message, undefined, { + swallowErrors: !reject, + }, e => { + if (e) + reject?.(e); + })); + peer.transportSafeArgumentTypes.add(Buffer.name); + process.on('message', message => peer.handleMessage(message as RpcMessage)); + process.on('disconnect', () => { + console.error('peer host disconnected, exiting.'); + process.exit(1); + }); + + let systemManager: SystemManager; + let deviceManager: DeviceManager; + let api: PluginAPI; + let pluginId: string; + + const getConsole = (hook: (stdout: PassThrough, stderr: PassThrough) => Promise, + also?: Console, alsoPrefix?: string) => { + + const stdout = new PassThrough(); + const stderr = new PassThrough(); + + hook(stdout, stderr); + + const ret = new Console(stdout, stderr); + + const methods = [ + 'log', 'warn', + 'dir', 'time', + 'timeEnd', 'timeLog', + 'trace', 'assert', + 'clear', 'count', + 'countReset', 'group', + 'groupEnd', 'table', + 'debug', 'info', + 'dirxml', 'error', + 'groupCollapsed', + ]; + + const printers = ['log', 'info', 'debug', 'trace', 'warn', 'error']; + for (const m of methods) { + const old = (ret as any)[m].bind(ret); + (ret as any)[m] = (...args: any[]) => { + // prefer the mixin version for local/remote console dump. + if (also && alsoPrefix && printers.includes(m)) { + (also as any)[m](alsoPrefix, ...args); + } + else { + (console as any)[m](...args); + } + // call through to old method to ensure it gets written + // to log buffer. + old(...args); + } + } + + return ret; + } + + let pluginsPromise: Promise; + function getPlugins() { + if (!pluginsPromise) + pluginsPromise = api.getComponent('plugins'); + return pluginsPromise; + } + + const getDeviceConsole = (nativeId?: ScryptedNativeId) => { + // the the plugin console is simply the default console + // and gets read from stderr/stdout. + if (!nativeId) + return console; + + return getConsole(async (stdout, stderr) => { + const connect = async () => { + const plugins = await getPlugins(); + const port = await plugins.getRemoteServicePort(peer.selfName, 'console-writer'); + const socket = net.connect(port); + socket.write(nativeId + '\n'); + const writer = (data: Buffer) => { + socket.write(data); + }; + stdout.on('data', writer); + stderr.on('data', writer); + socket.on('error', () => { + stdout.removeAllListeners(); + stderr.removeAllListeners(); + stdout.pause(); + stderr.pause(); + setTimeout(connect, 10000); + }); + }; + connect(); + }, undefined, undefined); + } + + const getMixinConsole = (mixinId: string, nativeId: ScryptedNativeId) => { + return getConsole(async (stdout, stderr) => { + if (!mixinId) { + return; + } + // todo: fix this. a mixin provider can mixin another device to make it a mixin provider itself. + // so the mixin id in the mixin table will be incorrect. + // there's no easy way to fix this from the remote. + // if (!systemManager.getDeviceById(mixinId).mixins.includes(idForNativeId(nativeId))) { + // return; + // } + const reconnect = () => { + stdout.removeAllListeners(); + stderr.removeAllListeners(); + stdout.pause(); + stderr.pause(); + setTimeout(tryConnect, 10000); + }; + + const connect = async () => { + const ds = deviceManager.getDeviceState(nativeId); + if (!ds) { + // deleted? + return; + } + + const plugins = await getPlugins(); + const { pluginId, nativeId: mixinNativeId } = await plugins.getDeviceInfo(mixinId); + const port = await plugins.getRemoteServicePort(pluginId, 'console-writer'); + const socket = net.connect(port); + socket.write(mixinNativeId + '\n'); + const writer = (data: Buffer) => { + let str = data.toString().trim(); + str = str.replaceAll('\n', `\n[${ds.name}]: `); + str = `[${ds.name}]: ` + str + '\n'; + socket.write(str); + }; + stdout.on('data', writer); + stderr.on('data', writer); + socket.on('close', reconnect); + }; + + const tryConnect = async () => { + try { + await connect(); + } + catch (e) { + reconnect(); + } + } + tryConnect(); + }, getDeviceConsole(nativeId), `[${systemManager.getDeviceById(mixinId)?.name}]`); + } + + let lastCpuUsage: NodeJS.CpuUsage; + setInterval(() => { + const cpuUsage = process.cpuUsage(lastCpuUsage); + lastCpuUsage = cpuUsage; + peer.sendOob({ + type: 'stats', + cpu: cpuUsage, + memoryUsage: process.memoryUsage(), + }); + global?.gc(); + }, 10000); + + let replPort: Promise; + + let _pluginConsole: Console; + const getPluginConsole = () => { + if (!_pluginConsole) + _pluginConsole = getDeviceConsole(undefined); + return _pluginConsole; + } + + attachPluginRemote(peer, { + createMediaManager: async (sm) => { + systemManager = sm; + return new MediaManagerImpl(systemManager, getPluginConsole()); + }, + onGetRemote: async (_api, _pluginId) => { + api = _api; + pluginId = _pluginId; + peer.selfName = pluginId; + }, + onPluginReady: async (scrypted, params, plugin) => { + replPort = createREPLServer(scrypted, params, plugin); + }, + getPluginConsole, + getDeviceConsole, + getMixinConsole, + async getServicePort(name, ...args: any[]) { + if (name === 'repl') { + if (!replPort) + throw new Error('REPL unavailable: Plugin not loaded.') + return replPort; + } + throw new Error(`unknown service ${name}`); + }, + async onLoadZip(pluginReader: PluginReader, packageJson: any) { + const entry = pluginReader('main.nodejs.js.map') + const map = entry?.toString(); + + installSourceMapSupport({ + environment: 'node', + retrieveSourceMap(source) { + if (source === '/plugin/main.nodejs.js' || source === `/${pluginId}/main.nodejs.js`) { + if (!map) + return null; + return { + url: '/plugin/main.nodejs.js', + map, + } + } + return null; + } + }); + await installOptionalDependencies(getPluginConsole(), packageJson); + } + }).then(scrypted => { + systemManager = scrypted.systemManager; + deviceManager = scrypted.deviceManager; + + process.on('uncaughtException', e => { + getPluginConsole().error('uncaughtException', e); + scrypted.log.e('uncaughtException ' + e?.toString()); + }); + process.on('unhandledRejection', e => { + getPluginConsole().error('unhandledRejection', e); + scrypted.log.e('unhandledRejection ' + e?.toString()); + }); + }) +} diff --git a/server/src/scrypted-plugin-main.ts b/server/src/scrypted-plugin-main.ts index 620b343c7..4b71efd50 100644 --- a/server/src/scrypted-plugin-main.ts +++ b/server/src/scrypted-plugin-main.ts @@ -1,3 +1,3 @@ -import { startPluginRemote } from './plugin/plugin-host'; +import { startPluginRemote } from "./plugin/plugin-remote-worker"; startPluginRemote();