server: new shared worker mode based on threads

This commit is contained in:
Koushik Dutta
2022-02-06 01:01:31 -08:00
parent 122d1b5830
commit 3027e7d051
12 changed files with 1906 additions and 1236 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -33,7 +33,7 @@
},
"dependencies": {
"@koush/wrtc": "^0.5.2",
"axios": "^0.21.1",
"axios": "^0.25.0",
"bpmux": "^8.1.3",
"debug": "^4.3.1",
"http-proxy": "^1.18.1",

View File

@@ -42,9 +42,9 @@ class ScryptedPush extends ScryptedDeviceBase implements BufferConverter {
}
async convert(data: Buffer | string, fromMimeType: string): Promise<Buffer | string> {
async convert(data: Buffer | string, fromMimeType: string): Promise<Buffer> {
if (this.cloud.storage.getItem('hostname')) {
return `https://${this.cloud.getHostname()}${await this.cloud.getCloudMessagePath()}/${data}`;
return Buffer.from(`https://${this.cloud.getHostname()}${await this.cloud.getCloudMessagePath()}/${data}`);
}
const url = `http://localhost/push/${data}`;
@@ -59,11 +59,11 @@ class ScryptedCloud extends ScryptedDeviceBase implements OauthClient, Settings,
push: ScryptedPush;
cloudMessagePath: Promise<string>;
async whitelist(localUrl: string, ttl: number, baseUrl: string): Promise<Buffer | string> {
async whitelist(localUrl: string, ttl: number, baseUrl: string): Promise<Buffer> {
const local = Url.parse(localUrl);
if (this.storage.getItem('hostname')) {
return `${baseUrl}${local.path}`;
return Buffer.from(`${baseUrl}${local.path}`);
}
const token_info = this.storage.getItem('token_info');
@@ -84,7 +84,7 @@ class ScryptedCloud extends ScryptedDeviceBase implements OauthClient, Settings,
})
const url = `${baseUrl}${local.path}?${tokens}`;
return url;
return Buffer.from(url);
}
@@ -130,7 +130,7 @@ class ScryptedCloud extends ScryptedDeviceBase implements OauthClient, Settings,
return hostname;
}
async convert(data: Buffer | string, fromMimeType: string): Promise<Buffer | string> {
async convert(data: Buffer, fromMimeType: string): Promise<Buffer> {
return this.whitelist(data.toString(), 10 * 365 * 24 * 60 * 60 * 1000, `https://${this.getHostname()}`);
}

View File

@@ -1,4 +1,4 @@
import sdk, { ScryptedDeviceBase, DeviceProvider, Settings, Setting, VideoCamera, MediaObject, MotionSensor, ScryptedInterface, Camera, MediaStreamOptions, Intercom, ScryptedMimeTypes, FFMpegInput, ObjectDetector, PictureOptions, ObjectDetectionTypes, ObjectsDetected, Notifier, SCRYPTED_MEDIA_SCHEME, VideoCameraConfiguration, OnOff } from "@scrypted/sdk";
import sdk, { ScryptedDeviceBase, DeviceProvider, Settings, Setting, VideoCamera, MediaObject, MotionSensor, ScryptedInterface, Camera, MediaStreamOptions, Intercom, ScryptedMimeTypes, FFMpegInput, ObjectDetector, PictureOptions, ObjectDetectionTypes, ObjectsDetected, Notifier, VideoCameraConfiguration, OnOff } from "@scrypted/sdk";
import { ProtectCameraChannelConfig, ProtectCameraConfig, ProtectCameraConfigInterface, ProtectCameraLcdMessagePayload } from "@koush/unifi-protect";
import child_process, { ChildProcess } from 'child_process';
import { ffmpegLogInitialOutput } from '../../../common/src/media-helpers';
@@ -393,7 +393,7 @@ export class UnifiCamera extends ScryptedDeviceBase implements Notifier, Interco
lcdMessage: payload,
})
if (typeof media === 'string' && media.startsWith(SCRYPTED_MEDIA_SCHEME)) {
if (typeof media === 'string') {
media = await mediaManager.createMediaObjectFromUrl(media);
}
if (media) {

File diff suppressed because it is too large Load Diff

View File

@@ -39,6 +39,6 @@
"@types/node": "^16.7.1"
},
"optionalDependencies": {
"zwave-js": "^8.5.1"
"zwave-js": "^8.11.3"
}
}

View File

@@ -27,6 +27,7 @@ import rimraf from 'rimraf';
import { RuntimeWorker } from './runtime/runtime-worker';
import { PythonRuntimeWorker } from './runtime/python-worker';
import { NodeForkWorker } from './runtime/node-fork-worker';
import { NodeThreadWorker } from './runtime/node-thread-worker';
export class PluginHost {
worker: RuntimeWorker;
@@ -98,9 +99,8 @@ export class PluginHost {
const pluginVolume = ensurePluginVolume(this.pluginId);
this.startPluginHost(logger, {
NODE_PATH: path.join(getPluginNodePath(this.pluginId), 'node_modules'),
SCRYPTED_PLUGIN_VOLUME: pluginVolume,
}, this.packageJson.scrypted.runtime, pluginDebug);
}, pluginDebug);
this.io.on('connection', async (socket) => {
try {
@@ -232,20 +232,28 @@ export class PluginHost {
});
}
startPluginHost(logger: Logger, env?: any, runtime?: string, pluginDebug?: PluginDebug) {
startPluginHost(logger: Logger, env: any, pluginDebug: PluginDebug) {
let connected = true;
if (runtime === 'python') {
if (this.packageJson.scrypted.runtime === 'python') {
this.worker = new PythonRuntimeWorker(this.pluginId, {
env,
pluginDebug,
});
}
else {
this.worker = new NodeForkWorker(this.pluginId, {
env: Object.assign({}, process.env, env),
pluginDebug,
});
if (process.env.SCRYPTED_SHARED_WORKER && this.packageJson.optionalDependencies && Object.keys(this.packageJson.optionalDependencies).length) {
this.worker = new NodeForkWorker(this.pluginId, {
env,
pluginDebug,
});
}
else {
this.worker = new NodeThreadWorker(this.pluginId, {
env,
pluginDebug,
});
}
}
this.peer = new RpcPeer('host', this.pluginId, (message, reject) => {

View File

@@ -10,25 +10,8 @@ import net from 'net'
import { installOptionalDependencies } from './plugin-npm-dependencies';
import { createREPLServer } from './plugin-repl';
export function startPluginRemote(pluginId: string) {
let peerSend: (message: RpcMessage, reject?: (e: Error) => void) => void;
let peerListener: NodeJS.MessageListener;
peerSend = (message, reject) => process.send(message, undefined, {
swallowErrors: !reject,
}, e => {
if (e)
reject?.(e);
});
peerListener = message => peer.handleMessage(message as RpcMessage);
export function startPluginRemote(pluginId: string, peerSend: (message: RpcMessage, reject?: (e: Error) => void) => void) {
const peer = new RpcPeer('unknown', 'host', peerSend);
peer.transportSafeArgumentTypes.add(Buffer.name);
process.on('message', peerListener);
process.on('disconnect', () => {
console.error('peer host disconnected, exiting.');
process.exit(1);
});
let systemManager: SystemManager;
let deviceManager: DeviceManager;
@@ -234,6 +217,9 @@ export function startPluginRemote(pluginId: string) {
systemManager = scrypted.systemManager;
deviceManager = scrypted.deviceManager;
process.removeAllListeners('uncaughtException');
process.removeAllListeners('unhandledRejection');
process.on('uncaughtException', e => {
getPluginConsole().error('uncaughtException', e);
scrypted.log.e('uncaughtException ' + e?.toString());
@@ -242,5 +228,7 @@ export function startPluginRemote(pluginId: string) {
getPluginConsole().error('unhandledRejection', e);
scrypted.log.e('unhandledRejection ' + e?.toString());
});
})
});
return peer;
}

View File

@@ -1,9 +1,9 @@
import { RuntimeWorker, RuntimeWorkerOptions as RuntimeWorkerOptions } from "./runtime-worker";
import { RuntimeWorkerOptions as RuntimeWorkerOptions } from "./runtime-worker";
import child_process from 'child_process';
import path from 'path';
import { EventEmitter } from "ws";
import { RpcMessage, RpcPeer } from "../../rpc";
import { ChildProcessWorker } from "./child-process-worker";
import { getPluginNodePath } from "../plugin-npm-dependencies";
export class NodeForkWorker extends ChildProcessWorker {
@@ -19,7 +19,9 @@ export class NodeForkWorker extends ChildProcessWorker {
this.worker = child_process.fork(require.main.filename, ['child', this.pluginId], {
stdio: ['pipe', 'pipe', 'pipe', 'ipc'],
env: Object.assign({}, process.env, env),
env: Object.assign({}, process.env, env, {
NODE_PATH: path.join(getPluginNodePath(this.pluginId), 'node_modules'),
}),
serialization: 'advanced',
execArgv,
});

View File

@@ -3,9 +3,11 @@ import { EventEmitter } from "ws";
import { RpcMessage, RpcPeer } from "../../rpc";
import { RuntimeWorker, RuntimeWorkerOptions } from "./runtime-worker";
import worker_threads from "worker_threads";
import path from 'path';
import { getPluginNodePath } from "../plugin-npm-dependencies";
import v8 from 'v8';
export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
pid: number;
terminated: boolean;
worker: worker_threads.Worker;
@@ -15,15 +17,25 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
this.worker = new worker_threads.Worker(require.main.filename, {
argv: ['child-thread', this.pluginId],
env: Object.assign({}, process.env, env),
env: Object.assign({}, process.env, env, {
NODE_PATH: path.join(getPluginNodePath(this.pluginId), 'node_modules'),
}),
});
this.worker.on('exit', () => {
this.terminated = true;
this.emit('exit');
});
this.worker.on('error', e => this.emit('error', e));
this.worker.on('messageerror', e => this.emit('error', e));
this.worker.on('error', e => {
this.emit('error', e);
});
this.worker.on('messageerror', e => {
this.emit('error', e);
});
}
get pid() {
return this.worker.threadId;
}
get stdout() {
@@ -44,7 +56,7 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
send(message: RpcMessage, reject?: (e: Error) => void): void {
try {
this.worker.postMessage(message)
this.worker.postMessage(v8.serialize(message));
}
catch (e) {
reject?.(e);
@@ -52,6 +64,6 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
}
setupRpcPeer(peer: RpcPeer): void {
this.worker.on('message', message => peer.handleMessage(message));
this.worker.on('message', message => peer.handleMessage(v8.deserialize(message)));
}
}

View File

@@ -9,16 +9,24 @@ if (!semver.gte(process.version, '16.0.0')) {
startPeriodicGarbageCollection();
process.on('unhandledRejection', error => {
if (error?.constructor !== RPCResultError && error?.constructor !== PluginError) {
throw error;
}
console.warn('unhandled rejection of RPC Result', error);
});
if (process.argv[2] === 'child' || process.argv[2] === 'child-thread') {
process.on('uncaughtException', e => {
console.error('uncaughtException', e);
});
process.on('unhandledRejection', e => {
console.error('unhandledRejection', e);
});
if (process.argv[2] === 'child') {
require('./scrypted-plugin-main');
}
else {
process.on('unhandledRejection', error => {
if (error?.constructor !== RPCResultError && error?.constructor !== PluginError) {
console.error('wtf', error);
throw error;
}
console.warn('unhandled rejection of RPC Result', error);
});
require('./scrypted-server-main');
}

View File

@@ -1,8 +1,32 @@
import { startPluginRemote } from "./plugin/plugin-remote-worker";
import { RpcMessage } from "./rpc";
import worker_threads from "worker_threads";
import v8 from 'v8';
if (process.argv[2] === 'child-thread') {
const peer = startPluginRemote(process.argv[3], (message, reject) => {
try {
worker_threads.parentPort.postMessage(v8.serialize(message));
}
catch (e) {
reject?.(e);
}
});
peer.transportSafeArgumentTypes.add(Buffer.name);
worker_threads.parentPort.on('message', message => peer.handleMessage(v8.deserialize(message)));
}
else {
startPluginRemote(process.argv[3]);
}
const peer = startPluginRemote(process.argv[3], (message, reject) => process.send(message, undefined, {
swallowErrors: !reject,
}, e => {
if (e)
reject?.(e);
}));
peer.transportSafeArgumentTypes.add(Buffer.name);
process.on('message', message=> peer.handleMessage(message as RpcMessage));
process.on('disconnect', () => {
console.error('peer host disconnected, exiting.');
process.exit(1);
});
}