From 0fc120ee837ca3a3809a9d56069df696e7571929 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Sat, 5 Feb 2022 23:21:52 -0800 Subject: [PATCH] server: refactor runtimes --- server/src/plugin/plugin-host.ts | 159 +++--------------- server/src/plugin/plugin-remote-worker.ts | 39 +---- .../plugin/runtime/child-process-worker.ts | 43 +++++ server/src/plugin/runtime/node-fork-worker.ts | 46 +++++ server/src/plugin/runtime/python-worker.ts | 60 +++++++ server/src/plugin/runtime/runtime-worker.ts | 29 ++++ 6 files changed, 211 insertions(+), 165 deletions(-) create mode 100644 server/src/plugin/runtime/child-process-worker.ts create mode 100644 server/src/plugin/runtime/node-fork-worker.ts create mode 100644 server/src/plugin/runtime/python-worker.ts create mode 100644 server/src/plugin/runtime/runtime-worker.ts diff --git a/server/src/plugin/plugin-host.ts b/server/src/plugin/plugin-host.ts index ce8ea79f0..37c317140 100644 --- a/server/src/plugin/plugin-host.ts +++ b/server/src/plugin/plugin-host.ts @@ -24,12 +24,12 @@ import crypto from 'crypto'; import fs from 'fs'; import mkdirp from 'mkdirp'; import rimraf from 'rimraf'; +import { RuntimeWorker } from './runtime/runtime-worker'; +import { PythonRuntimeWorker } from './runtime/python-worker'; +import { NodeForkWorker } from './runtime/node-fork-worker'; export class PluginHost { - static sharedWorker: child_process.ChildProcess; - static sharedWorkerImmediateRestart = false; - - worker: child_process.ChildProcess; + worker: RuntimeWorker; peer: RpcPeer; pluginId: string; module: Promise; @@ -53,10 +53,7 @@ export class PluginHost { kill() { this.killed = true; this.api.removeListeners(); - // things might get a bit race prone, so clear out the shared worker before killing. - if (this.worker === PluginHost.sharedWorker) - PluginHost.sharedWorker = undefined; - this.worker.kill('SIGKILL'); + this.worker.kill(); this.io.close(); for (const s of Object.values(this.ws)) { s.close(); @@ -86,7 +83,7 @@ export class PluginHost { return pi._id; } - constructor(scrypted: ScryptedRuntime, plugin: Plugin, public pluginDebug?: PluginDebug) { + constructor(scrypted: ScryptedRuntime, plugin: Plugin, pluginDebug?: PluginDebug) { this.scrypted = scrypted; this.pluginId = plugin._id; this.pluginName = plugin.packageJson?.name; @@ -103,7 +100,7 @@ export class PluginHost { this.startPluginHost(logger, { NODE_PATH: path.join(getPluginNodePath(this.pluginId), 'node_modules'), SCRYPTED_PLUGIN_VOLUME: pluginVolume, - }, this.packageJson.scrypted.runtime); + }, this.packageJson.scrypted.runtime, pluginDebug); this.io.on('connection', async (socket) => { try { @@ -235,137 +232,33 @@ export class PluginHost { }); } - startPluginHost(logger: Logger, env?: any, runtime?: string) { + startPluginHost(logger: Logger, env?: any, runtime?: string, pluginDebug?: PluginDebug) { let connected = true; if (runtime === 'python') { - const args: string[] = [ - '-u', - ]; - if (this.pluginDebug) { - args.push( - '-m', - 'debugpy', - '--listen', - `0.0.0.0:${this.pluginDebug.inspectPort}`, - '--wait-for-client', - ) - } - args.push( - path.join(__dirname, '../../python', 'plugin-remote.py'), - ) - - this.worker = child_process.spawn('python3', args, { - // stdin, stdout, stderr, peer in, peer out - stdio: ['pipe', 'pipe', 'pipe', 'pipe', 'pipe'], - env: Object.assign({ - PYTHONPATH: path.join(process.cwd(), 'node_modules/@scrypted/types'), - }, process.env, env), - }); - - const peerin = this.worker.stdio[3] as Writable; - const peerout = this.worker.stdio[4] as Readable; - - peerin.on('error', e => connected = false); - peerout.on('error', e => connected = false); - - this.peer = new RpcPeer('host', this.pluginId, (message, reject) => { - if (connected) { - peerin.write(JSON.stringify(message) + '\n', e => e && reject?.(e)); - } - else if (reject) { - reject(new Error('peer disconnected')); - } - }); - - const readInterface = readline.createInterface({ - input: peerout, - terminal: false, - }); - readInterface.on('line', line => { - this.peer.handleMessage(JSON.parse(line)); + this.worker = new PythonRuntimeWorker(this.pluginId, { + env, + pluginDebug, }); } else { - const execArgv: string[] = process.execArgv.slice(); - if (this.pluginDebug) { - execArgv.push(`--inspect=0.0.0.0:${this.pluginDebug.inspectPort}`); - } - - const useSharedWorker = process.env.SCRYPTED_SHARED_WORKER && - this.packageJson.scrypted.sharedWorker !== false && - this.packageJson.scrypted.realfs !== true && - Object.keys(this.packageJson.optionalDependencies || {}).length === 0; - if (useSharedWorker) { - if (!PluginHost.sharedWorker) { - const worker = child_process.fork(require.main.filename, ['child', '@scrypted/shared'], { - stdio: ['pipe', 'pipe', 'pipe', 'ipc'], - env: Object.assign({}, process.env, env), - serialization: 'advanced', - execArgv, - }); - PluginHost.sharedWorker = worker; - PluginHost.sharedWorker.setMaxListeners(100); - const clearSharedWorker = () => { - if (worker === PluginHost.sharedWorker) - PluginHost.sharedWorker = undefined; - }; - PluginHost.sharedWorker.on('close', () => clearSharedWorker); - PluginHost.sharedWorker.on('error', () => clearSharedWorker); - PluginHost.sharedWorker.on('exit', () => clearSharedWorker); - PluginHost.sharedWorker.on('disconnect', () => clearSharedWorker); - } - PluginHost.sharedWorker.send({ - type: 'start', - pluginId: this.pluginId, - }); - this.worker = PluginHost.sharedWorker; - this.worker.on('message', (message: any) => { - if (message.pluginId === this.pluginId) - this.peer.handleMessage(message.message) - }); - - this.peer = new RpcPeer('host', this.pluginId, (message, reject) => { - if (connected) { - this.worker.send({ - type: 'message', - pluginId: this.pluginId, - message: message, - }, undefined, e => { - if (e && reject) - reject(e); - }); - } - else if (reject) { - reject(new Error('peer disconnected')); - } - }); - } - else { - this.worker = child_process.fork(require.main.filename, ['child', this.pluginId], { - stdio: ['pipe', 'pipe', 'pipe', 'ipc'], - env: Object.assign({}, process.env, env), - serialization: 'advanced', - execArgv, - }); - this.worker.on('message', message => this.peer.handleMessage(message as any)); - - this.peer = new RpcPeer('host', this.pluginId, (message, reject) => { - if (connected) { - this.worker.send(message, undefined, e => { - if (e && reject) - reject(e); - }); - } - else if (reject) { - reject(new Error('peer disconnected')); - } - }); - } - - this.peer.transportSafeArgumentTypes.add(Buffer.name); + this.worker = new NodeForkWorker(this.pluginId, { + env: Object.assign({}, process.env, env), + pluginDebug, + }); } + this.peer = new RpcPeer('host', this.pluginId, (message, reject) => { + if (connected) { + this.worker.send(message, reject); + } + else if (reject) { + reject(new Error('peer disconnected')); + } + }); + + this.worker.setupRpcPeer(this.peer); + this.worker.stdout.on('data', data => console.log(data.toString())); this.worker.stderr.on('data', data => console.error(data.toString())); this.consoleServer = createConsoleServer(this.worker.stdout, this.worker.stderr); diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index 2d4bc94a0..c2d754a76 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -10,42 +10,17 @@ import net from 'net' import { installOptionalDependencies } from './plugin-npm-dependencies'; import { createREPLServer } from './plugin-repl'; -export function startSharedPluginRemote() { - process.setMaxListeners(100); - process.on('message', (message: any) => { - if (message.type === 'start') - startPluginRemote(message.pluginId) - }); -} - export function startPluginRemote(pluginId: string) { let peerSend: (message: RpcMessage, reject?: (e: Error) => void) => void; let peerListener: NodeJS.MessageListener; - if (process.argv[3] === '@scrypted/shared') { - peerSend = (message, reject) => process.send({ - pluginId, - message, - }, undefined, { - swallowErrors: !reject, - }, e => { - if (e) - reject?.(e); - }); - peerListener = (message: any) => { - if (message.type === 'message' && message.pluginId === pluginId) - peer.handleMessage(message.message); - } - } - else { - peerSend = (message, reject) => process.send(message, undefined, { - swallowErrors: !reject, - }, e => { - if (e) - reject?.(e); - }); - peerListener = message => peer.handleMessage(message as RpcMessage); - } + peerSend = (message, reject) => process.send(message, undefined, { + swallowErrors: !reject, + }, e => { + if (e) + reject?.(e); + }); + peerListener = message => peer.handleMessage(message as RpcMessage); const peer = new RpcPeer('unknown', 'host', peerSend); peer.transportSafeArgumentTypes.add(Buffer.name); diff --git a/server/src/plugin/runtime/child-process-worker.ts b/server/src/plugin/runtime/child-process-worker.ts new file mode 100644 index 000000000..f48ae687b --- /dev/null +++ b/server/src/plugin/runtime/child-process-worker.ts @@ -0,0 +1,43 @@ +import { EventEmitter } from "ws"; +import { RuntimeWorker, RuntimeWorkerOptions } from "./runtime-worker"; +import child_process from 'child_process'; +import { RpcMessage, RpcPeer } from "../../rpc"; + +export abstract class ChildProcessWorker extends EventEmitter implements RuntimeWorker { + worker: child_process.ChildProcess; + + constructor(public pluginId: string, options: RuntimeWorkerOptions) { + super(); + } + + setupWorker() { + this.worker.on('close', () => this.emit('close')); + this.worker.on('disconnect', () => this.emit('disconnect')); + this.worker.on('exit', (code, signal) => this.emit('exit', code, signal)); + this.worker.on('close', () => this.emit('close')); + this.worker.on('error', e => this.emit('error', e)); + } + + get pid() { + return this.worker.pid; + } + + get stdout() { + return this.worker.stdout; + } + + get stderr() { + return this.worker.stderr; + } + + get killed() { + return this.worker.killed; + } + + kill(): void { + this.worker.kill('SIGKILL'); + } + + abstract send(message: RpcMessage, reject?: (e: Error) => void): void; + abstract setupRpcPeer(peer: RpcPeer): void; +} diff --git a/server/src/plugin/runtime/node-fork-worker.ts b/server/src/plugin/runtime/node-fork-worker.ts new file mode 100644 index 000000000..a47516fca --- /dev/null +++ b/server/src/plugin/runtime/node-fork-worker.ts @@ -0,0 +1,46 @@ +import { RuntimeWorker, RuntimeWorkerOptions as RuntimeWorkerOptions } from "./runtime-worker"; +import child_process from 'child_process'; +import path from 'path'; +import { EventEmitter } from "ws"; +import { RpcMessage, RpcPeer } from "../../rpc"; +import { ChildProcessWorker } from "./child-process-worker"; + +export class NodeForkWorker extends ChildProcessWorker { + + constructor(pluginId: string, options: RuntimeWorkerOptions) { + super(pluginId, options); + + const {env, pluginDebug} = options; + + const execArgv: string[] = process.execArgv.slice(); + if (pluginDebug) { + execArgv.push(`--inspect=0.0.0.0:${pluginDebug.inspectPort}`); + } + + this.worker = child_process.fork(require.main.filename, ['child', this.pluginId], { + stdio: ['pipe', 'pipe', 'pipe', 'ipc'], + env: Object.assign({}, process.env, env), + serialization: 'advanced', + execArgv, + }); + this.worker.on('message', message => this.emit('message', message)); + + this.setupWorker(); + } + + setupRpcPeer(peer: RpcPeer): void { + this.worker.on('message', message => peer.handleMessage(message as any)); + peer.transportSafeArgumentTypes.add(Buffer.name); + } + + send(message: RpcMessage, reject?: (e: Error) => void): void { + this.worker.send(message, undefined, e => { + if (e && reject) + reject(e); + }); + } + + get pid() { + return this.worker.pid; + } +} diff --git a/server/src/plugin/runtime/python-worker.ts b/server/src/plugin/runtime/python-worker.ts new file mode 100644 index 000000000..87cd10070 --- /dev/null +++ b/server/src/plugin/runtime/python-worker.ts @@ -0,0 +1,60 @@ +import { RuntimeWorker, RuntimeWorkerOptions as RuntimeWorkerOptions } from "./runtime-worker"; +import child_process from 'child_process'; +import path from 'path'; +import { EventEmitter } from "ws"; +import { Writable, Readable } from 'stream'; +import { RpcMessage, RpcPeer } from "../../rpc"; +import readline from 'readline'; +import { ChildProcessWorker } from "./child-process-worker"; + +export class PythonRuntimeWorker extends ChildProcessWorker { + + constructor(pluginId: string, options: RuntimeWorkerOptions) { + super(pluginId, options); + + const { env, pluginDebug } = options; + const args: string[] = [ + '-u', + ]; + if (pluginDebug) { + args.push( + '-m', + 'debugpy', + '--listen', + `0.0.0.0:${pluginDebug.inspectPort}`, + '--wait-for-client', + ) + } + args.push( + path.join(__dirname, '../../../python', 'plugin-remote.py'), + ) + + this.worker = child_process.spawn('python3', args, { + // stdin, stdout, stderr, peer in, peer out + stdio: ['pipe', 'pipe', 'pipe', 'pipe', 'pipe'], + env: Object.assign({ + PYTHONPATH: path.join(process.cwd(), 'node_modules/@scrypted/types'), + }, process.env, env), + }); + + this.setupWorker(); + } + + setupRpcPeer(peer: RpcPeer): void { + const peerin = this.worker.stdio[3] as Writable; + const peerout = this.worker.stdio[4] as Readable; + + peerin.on('error', e => this.emit('error', e)); + peerout.on('error', e => this.emit('error', e)); + + const readInterface = readline.createInterface({ + input: peerout, + terminal: false, + }); + readInterface.on('line', line => peer.handleMessage(JSON.parse(line))); + } + + send(message: RpcMessage, reject?: (e: Error) => void): void { + (this.worker.stdio[3] as Writable).write(JSON.stringify(message) + '\n', e => e && reject?.(e)); + } +} diff --git a/server/src/plugin/runtime/runtime-worker.ts b/server/src/plugin/runtime/runtime-worker.ts new file mode 100644 index 000000000..fdbc5c389 --- /dev/null +++ b/server/src/plugin/runtime/runtime-worker.ts @@ -0,0 +1,29 @@ +import { RpcMessage, RpcPeer } from "../../rpc"; +import { PluginDebug } from "../plugin-debug"; +import {Readable} from "stream"; + +export interface RuntimeWorkerOptions { + pluginDebug: PluginDebug; + env: any; +} + +export interface RuntimeWorker { + pid: number; + stdout: Readable; + stderr: Readable; + killed: boolean; + + kill(): void; + + on(event: 'error', listener: (err: Error) => void): this; + on(event: 'error', listener: (err: Error) => void): this; + on(event: 'close', listener: (code: number | null, signal: NodeJS.Signals | null) => void): this; + on(event: 'disconnect', listener: () => void): this; + on(event: 'exit', listener: (code: number | null, signal: NodeJS.Signals | null) => void): this; + once(event: 'exit', listener: (code: number | null, signal: NodeJS.Signals | null) => void): this; + + send(message: RpcMessage, reject?: (e: Error) => void): void; + + setupRpcPeer(peer: RpcPeer): void; +} +