server: rpc/threading

This commit is contained in:
Koushik Dutta
2022-02-12 19:09:42 -08:00
parent a882bb8e80
commit 44b4fa2bd4
18 changed files with 294 additions and 103 deletions

1
common/fs/sdk/index.d.ts vendored Symbolic link
View File

@@ -0,0 +1 @@
../../../sdk/index.d.ts

1
common/fs/sdk/types.d.ts vendored Symbolic link
View File

@@ -0,0 +1 @@
../../../sdk/types/index.d.ts

View File

@@ -1,4 +1,5 @@
import { Readable } from "stream";
import { Readable } from 'stream';
import { once } from 'events';
export async function readLength(readable: Readable, length: number): Promise<Buffer> {
if (!length) {
@@ -55,3 +56,14 @@ export async function readUntil(readable: Readable, charCode: number) {
export async function readLine(readable: Readable) {
return readUntil(readable, CHARCODE_NEWLINE);
}
export async function readString(readable: Readable | Promise<Readable>) {
let data = '';
readable = await readable;
readable.on('data', buffer => {
data += buffer.toString();
});
readable.resume();
await once(readable, 'end')
return data;
}

View File

@@ -101,6 +101,10 @@ export class RtspServer {
return this.handleSetup();
}
async handleTeardown() {
return this.handleSetup();
}
async *handleRecord(): AsyncGenerator<{
type: 'audio' | 'video',
rtcp: boolean,
@@ -243,7 +247,7 @@ export class RtspServer {
}
await this[method](url, requestHeaders);
return method !== 'play' && method !== 'record';
return method !== 'play' && method !== 'record' && method !== 'teardown';
}
respond(code: number, message: string, requestHeaders: Headers, headers: Headers, buffer?: Buffer) {

View File

@@ -1,6 +1,8 @@
import type { TranspileOptions } from "typescript";
import sdk, { ScryptedDeviceBase, ScryptedInterface, ScryptedDeviceType } from "@scrypted/sdk";
import vm from "vm";
import fs from 'fs';
import { newThread } from '../../server/src/threading';
const { systemManager, deviceManager, mediaManager, endpointManager } = sdk;
@@ -20,16 +22,43 @@ function tsCompile(source: string, options: TranspileOptions = null): string {
return ts.transpileModule(source, options).outputText;
}
const scryptedTypesDefs = require('!!raw-loader!@scrypted/sdk/types/index.d.ts').default;
const scryptedIndexDefs = require('!!raw-loader!@scrypted/sdk/index.d.ts').default;
async function tsCompileThread(source: string, options: TranspileOptions = null): Promise<string> {
return newThread({
source, options,
customRequire: '__webpack_require__',
}, ({ source, options }) => {
const ts = global.require("typescript");
const { ScriptTarget } = ts;
// Default options -- you could also perform a merge, or use the project tsconfig.json
if (null === options) {
options = {
compilerOptions: {
target: ScriptTarget.ESNext,
module: ts.ModuleKind.CommonJS
}
};
}
return ts.transpileModule(source, options).outputText;
});
}
function getTypeDefs() {
const scryptedTypesDefs = fs.readFileSync('sdk/types.d.ts').toString();
const scryptedIndexDefs = fs.readFileSync('sdk/index.d.ts').toString();
return {
scryptedIndexDefs,
scryptedTypesDefs,
};
}
export async function scryptedEval(device: ScryptedDeviceBase, script: string, extraLibs: { [lib: string]: string }, params: { [name: string]: any }) {
try {
const libs = Object.assign({
types: scryptedTypesDefs,
types: getTypeDefs().scryptedTypesDefs,
}, extraLibs);
const allScripts = Object.values(libs).join('\n').toString() + script;
const compiled = tsCompile(allScripts);
const compiled = await tsCompileThread(allScripts);
const allParams = Object.assign({}, params, {
systemManager,
@@ -74,10 +103,7 @@ export async function scryptedEval(device: ScryptedDeviceBase, script: string, e
}
export function createMonacoEvalDefaults(extraLibs: { [lib: string]: string }) {
const libs = Object.assign({
types: scryptedTypesDefs,
sdk: scryptedIndexDefs,
}, extraLibs);
const libs = Object.assign(getTypeDefs(), extraLibs);
function monacoEvalDefaultsFunction(monaco, libs) {
monaco.languages.typescript.typescriptDefaults.setDiagnosticsOptions(

View File

@@ -24,6 +24,7 @@
"linkfs": "^2.1.0",
"lodash": "^4.17.21",
"memfs": "^3.2.2",
"mime": "^3.0.0",
"mime-db": "^1.51.0",
"mkdirp": "^1.0.4",
"nan": "^2.15.0",
@@ -1860,14 +1861,14 @@
}
},
"node_modules/mime": {
"version": "1.6.0",
"resolved": "https://registry.npmjs.org/mime/-/mime-1.6.0.tgz",
"integrity": "sha512-x0Vn8spI+wuJ1O6S7gnbaQg8Pxh4NNHb7KSINmEWKiPE4RKOplvijn+NkmYmmRgP68mc70j2EbeTFRsrswaQeg==",
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/mime/-/mime-3.0.0.tgz",
"integrity": "sha512-jSCU7/VB1loIWBZe14aEYHU/+1UMEHoaO7qxCOVJOw9GgH72VAWppxNcjU+x9a2k3GSIBXNKxXQFqRvvZ7vr3A==",
"bin": {
"mime": "cli.js"
},
"engines": {
"node": ">=4"
"node": ">=10.0.0"
}
},
"node_modules/mime-db": {
@@ -2675,6 +2676,17 @@
"resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz",
"integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g="
},
"node_modules/send/node_modules/mime": {
"version": "1.6.0",
"resolved": "https://registry.npmjs.org/mime/-/mime-1.6.0.tgz",
"integrity": "sha512-x0Vn8spI+wuJ1O6S7gnbaQg8Pxh4NNHb7KSINmEWKiPE4RKOplvijn+NkmYmmRgP68mc70j2EbeTFRsrswaQeg==",
"bin": {
"mime": "cli.js"
},
"engines": {
"node": ">=4"
}
},
"node_modules/send/node_modules/ms": {
"version": "2.1.3",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz",
@@ -4692,9 +4704,9 @@
"integrity": "sha1-VSmk1nZUE07cxSZmVoNbD4Ua/O4="
},
"mime": {
"version": "1.6.0",
"resolved": "https://registry.npmjs.org/mime/-/mime-1.6.0.tgz",
"integrity": "sha512-x0Vn8spI+wuJ1O6S7gnbaQg8Pxh4NNHb7KSINmEWKiPE4RKOplvijn+NkmYmmRgP68mc70j2EbeTFRsrswaQeg=="
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/mime/-/mime-3.0.0.tgz",
"integrity": "sha512-jSCU7/VB1loIWBZe14aEYHU/+1UMEHoaO7qxCOVJOw9GgH72VAWppxNcjU+x9a2k3GSIBXNKxXQFqRvvZ7vr3A=="
},
"mime-db": {
"version": "1.51.0",
@@ -5290,6 +5302,11 @@
}
}
},
"mime": {
"version": "1.6.0",
"resolved": "https://registry.npmjs.org/mime/-/mime-1.6.0.tgz",
"integrity": "sha512-x0Vn8spI+wuJ1O6S7gnbaQg8Pxh4NNHb7KSINmEWKiPE4RKOplvijn+NkmYmmRgP68mc70j2EbeTFRsrswaQeg=="
},
"ms": {
"version": "2.1.3",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz",

View File

@@ -18,6 +18,7 @@
"linkfs": "^2.1.0",
"lodash": "^4.17.21",
"memfs": "^3.2.2",
"mime": "^3.0.0",
"mime-db": "^1.51.0",
"mkdirp": "^1.0.4",
"nan": "^2.15.0",

View File

@@ -1,13 +1,13 @@
import { HttpResponse, HttpResponseOptions } from "@scrypted/types";
import { Response } from "express";
import mime from "mime";
import { PROPERTY_PROXY_ONEWAY_METHODS } from "./rpc";
import { RpcPeer } from "./rpc";
import { join as pathJoin } from 'path';
import fs from 'fs';
export function createResponseInterface(res: Response, unzippedDir: string): HttpResponse {
class HttpResponseImpl implements HttpResponse {
[PROPERTY_PROXY_ONEWAY_METHODS] = [
[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS] = [
'send',
'sendFile',
];

View File

@@ -10,9 +10,7 @@ export interface ConsoleServer {
pluginConsole: Console;
readPort: number,
writePort: number,
readServer: net.Server,
writeServer: net.Server,
sockets: Set<net.Socket>;
destroy(): void;
}
export interface StdPassThroughs {
@@ -129,11 +127,22 @@ export async function createConsoleServer(remoteStdout: Readable, remoteStderr:
const writePort = await listenZero(writeServer);
return {
destroy() {
for (const socket of sockets) {
socket.destroy();
}
sockets.clear();
outputs.clear();
try {
readServer.close();
writeServer.close();
}
catch (e) {
}
},
pluginConsole,
readPort,
writePort,
readServer,
writeServer,
sockets,
};
}

View File

@@ -2,7 +2,7 @@ import { DeviceProvider, EventDetails, EventListenerOptions, EventListenerRegist
import { ScryptedRuntime } from "../runtime";
import { PluginDevice } from "../db-types";
import { MixinProvider } from "@scrypted/types";
import { handleFunctionInvocations, PrimitiveProxyHandler } from "../rpc";
import { RpcPeer, PrimitiveProxyHandler } from "../rpc";
import { getState } from "../state";
import { getDisplayType } from "../infer-defaults";
import { allInterfaceProperties, isValidInterfaceMethod, methodInterfaces } from "./descriptor";
@@ -285,7 +285,7 @@ export class PluginDeviceProxyHandler implements PrimitiveProxyHandler<any>, Scr
get(target: any, p: PropertyKey, receiver: any): any {
if (p === 'constructor')
return;
const handled = handleFunctionInvocations(this, target, p, receiver);
const handled = RpcPeer.handleFunctionInvocations(this, target, p, receiver);
if (handled)
return handled;
const pluginDevice = this.scrypted.findPluginDeviceById(this.id);

View File

@@ -6,12 +6,12 @@ import { Logger } from '../logger';
import { getState } from '../state';
import { PluginHost } from './plugin-host';
import debounce from 'lodash/debounce';
import { PROPERTY_PROXY_ONEWAY_METHODS } from '../rpc';
import { RpcPeer } from '../rpc';
export class PluginHostAPI extends PluginAPIManagedListeners implements PluginAPI {
pluginId: string;
[PROPERTY_PROXY_ONEWAY_METHODS] = [
[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS] = [
'onMixinEvent',
'onDeviceEvent',
'setStorage',

View File

@@ -50,6 +50,7 @@ export class PluginHost {
kill() {
this.killed = true;
this.api.removeListeners();
this.peer.kill('plugin killed');
this.worker.kill();
this.io.close();
for (const s of Object.values(this.ws)) {
@@ -60,14 +61,7 @@ export class PluginHost {
const deviceIds = new Set<string>(Object.values(this.scrypted.pluginDevices).filter(d => d.pluginId === this.pluginId).map(d => d._id));
this.scrypted.invalidateMixins(deviceIds);
this.consoleServer?.then(server => {
server.readServer.close();
server.writeServer.close();
for (const s of server.sockets) {
s.destroy();
}
});
setTimeout(() => this.peer.kill('plugin killed'), 500);
this.consoleServer.then(server => server.destroy());
}
toString() {
@@ -272,21 +266,26 @@ export class PluginHost {
pluginConsole.log('starting plugin', this.pluginId, this.packageJson.version);
});
this.worker.on('close', () => {
const disconnect = () => {
connected = false;
this.peer.kill('plugin disconnected');
};
this.worker.on('close', () => {
logger.log('e', `${this.pluginName} close`);
disconnect();
});
this.worker.on('disconnect', () => {
connected = false;
logger.log('e', `${this.pluginName} disconnected`);
disconnect();
});
this.worker.on('exit', async (code, signal) => {
connected = false;
logger.log('e', `${this.pluginName} exited ${code} ${signal}`);
disconnect();
});
this.worker.on('error', e => {
connected = false;
logger.log('e', `${this.pluginName} error ${e}`);
disconnect();
});
this.peer.onOob = (oob: any) => {

View File

@@ -4,7 +4,7 @@ import path from 'path';
import { ScryptedNativeId, DeviceManager, Logger, Device, DeviceManifest, DeviceState, EndpointManager, SystemDeviceState, ScryptedStatic, SystemManager, MediaManager, ScryptedMimeTypes, ScryptedInterface, ScryptedInterfaceProperty, HttpRequest } from '@scrypted/types'
import { PluginAPI, PluginLogger, PluginRemote, PluginRemoteLoadZipOptions } from './plugin-api';
import { SystemManagerImpl } from './system';
import { RpcPeer, RPCResultError, PROPERTY_PROXY_ONEWAY_METHODS, PROPERTY_JSON_DISABLE_SERIALIZATION } from '../rpc';
import { RpcPeer, RPCResultError } from '../rpc';
import { BufferSerializer } from './buffer-serializer';
import { createWebSocketClass, WebSocketConnectCallbacks, WebSocketMethods } from './plugin-remote-websocket';
import fs from 'fs';
@@ -361,9 +361,9 @@ export function attachPluginRemote(peer: RpcPeer, options?: PluginRemoteAttachOp
const localStorage = new StorageImpl(deviceManager, undefined);
const remote: PluginRemote & { [PROPERTY_JSON_DISABLE_SERIALIZATION]: boolean, [PROPERTY_PROXY_ONEWAY_METHODS]: string[] } = {
[PROPERTY_JSON_DISABLE_SERIALIZATION]: true,
[PROPERTY_PROXY_ONEWAY_METHODS]: [
const remote: PluginRemote & { [RpcPeer.PROPERTY_JSON_DISABLE_SERIALIZATION]: boolean, [RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS]: string[] } = {
[RpcPeer.PROPERTY_JSON_DISABLE_SERIALIZATION]: true,
[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS]: [
'notify',
'updateDeviceState',
'setSystemState',

View File

@@ -1,4 +1,3 @@
import { Readable } from "stream";
import { EventEmitter } from "ws";
import { RpcMessage, RpcPeer } from "../../rpc";
import { RuntimeWorker, RuntimeWorkerOptions } from "./runtime-worker";

View File

@@ -15,7 +15,6 @@ export interface RuntimeWorker {
kill(): void;
on(event: 'error', listener: (err: Error) => void): this;
on(event: 'error', listener: (err: Error) => void): this;
on(event: 'close', listener: (code: number | null, signal: NodeJS.Signals | null) => void): this;
on(event: 'disconnect', listener: () => void): this;

View File

@@ -1,6 +1,6 @@
import { EventListenerOptions, EventDetails, EventListenerRegister, ScryptedDevice, ScryptedInterface, ScryptedInterfaceDescriptors, SystemDeviceState, SystemManager, ScryptedInterfaceProperty, ScryptedDeviceType, Logger } from "@scrypted/types";
import { PluginAPI } from "./plugin-api";
import { handleFunctionInvocations, PrimitiveProxyHandler, PROPERTY_PROXY_ONEWAY_METHODS } from '../rpc';
import { PrimitiveProxyHandler, RpcPeer } from '../rpc';
import { EventRegistry } from "../event-registry";
import { allInterfaceProperties, isValidInterfaceMethod } from "./descriptor";
@@ -24,7 +24,7 @@ class DeviceProxyHandler implements PrimitiveProxyHandler<any>, ScryptedDevice {
if (p === 'id')
return this.id;
const handled = handleFunctionInvocations(this, target, p, receiver);
const handled = RpcPeer.handleFunctionInvocations(this, target, p, receiver);
if (handled)
return handled;
@@ -96,10 +96,10 @@ class EventListenerRegisterImpl implements EventListenerRegister {
function makeOneWayCallback<T>(input: T): T {
const f: any = input;
const oneways: string[] = f[PROPERTY_PROXY_ONEWAY_METHODS] || [];
const oneways: string[] = f[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS] || [];
if (!oneways.includes(null))
oneways.push(null);
f[PROPERTY_PROXY_ONEWAY_METHODS] = oneways;
f[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS] = oneways;
return input;
}

View File

@@ -1,17 +1,5 @@
import vm from 'vm';
const finalizerIdSymbol = Symbol('rpcFinalizerId');
function getDefaultTransportSafeArgumentTypes() {
const jsonSerializable = new Set<string>();
jsonSerializable.add(Number.name);
jsonSerializable.add(String.name);
jsonSerializable.add(Object.name);
jsonSerializable.add(Boolean.name);
jsonSerializable.add(Array.name);
return jsonSerializable;
}
export function startPeriodicGarbageCollection() {
if (!global.gc) {
console.warn('rpc peer garbage collection not available: global.gc is not exposed.');
@@ -81,29 +69,6 @@ interface Deferred {
reject: any;
}
export function handleFunctionInvocations(thiz: PrimitiveProxyHandler<any>, target: any, p: PropertyKey, receiver: any): any {
if (p === 'apply') {
return (thisArg: any, args: any[]) => {
return thiz.apply!(target, thiz, args);
}
}
else if (p === 'call') {
return (thisArg: any, ...args: any[]) => {
return thiz.apply!(target, thiz, args);
}
}
else if (p === 'toString' || p === Symbol.toPrimitive) {
return (thisArg: any, ...args: any[]) => {
return thiz.toPrimitive();
}
}
}
export const PROPERTY_PROXY_ONEWAY_METHODS = '__proxy_oneway_methods';
export const PROPERTY_JSON_DISABLE_SERIALIZATION = '__json_disable_serialization';
export const PROPERTY_PROXY_PROPERTIES = '__proxy_props';
export const PROPERTY_JSON_COPY_SERIALIZE_CHILDREN = '__json_copy_serialize_children';
export interface PrimitiveProxyHandler<T extends object> extends ProxyHandler<T> {
toPrimitive(): any;
}
@@ -128,11 +93,11 @@ class RpcProxy implements PrimitiveProxyHandler<any> {
return this.constructorName;
if (p === '__proxy_peer')
return this.peer;
if (p === PROPERTY_PROXY_PROPERTIES)
if (p === RpcPeer.PROPERTY_PROXY_PROPERTIES)
return this.proxyProps;
if (p === PROPERTY_PROXY_ONEWAY_METHODS)
if (p === RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS)
return this.proxyOneWayMethods;
if (p === PROPERTY_JSON_DISABLE_SERIALIZATION || p === PROPERTY_JSON_COPY_SERIALIZE_CHILDREN)
if (p === RpcPeer.PROPERTY_JSON_DISABLE_SERIALIZATION || p === RpcPeer.PROPERTY_JSON_COPY_SERIALIZE_CHILDREN)
return;
if (p === 'then')
return;
@@ -140,14 +105,14 @@ class RpcProxy implements PrimitiveProxyHandler<any> {
return;
if (this.proxyProps?.[p] !== undefined)
return this.proxyProps?.[p];
const handled = handleFunctionInvocations(this, target, p, receiver);
const handled = RpcPeer.handleFunctionInvocations(this, target, p, receiver);
if (handled)
return handled;
return new Proxy(() => p, this);
}
set(target: any, p: string | symbol, value: any, receiver: any): boolean {
if (p === finalizerIdSymbol)
if (p === RpcPeer.finalizerIdSymbol)
this.entry.finalizerId = value;
return true;
}
@@ -248,7 +213,42 @@ export class RpcPeer {
finalizers = new FinalizationRegistry(entry => this.finalize(entry as LocalProxiedEntry));
nameDeserializerMap = new Map<string, RpcSerializer>();
constructorSerializerMap = new Map<string, string>();
transportSafeArgumentTypes = getDefaultTransportSafeArgumentTypes();
transportSafeArgumentTypes = RpcPeer.getDefaultTransportSafeArgumentTypes();
static readonly finalizerIdSymbol = Symbol('rpcFinalizerId');
static getDefaultTransportSafeArgumentTypes() {
const jsonSerializable = new Set<string>();
jsonSerializable.add(Number.name);
jsonSerializable.add(String.name);
jsonSerializable.add(Object.name);
jsonSerializable.add(Boolean.name);
jsonSerializable.add(Array.name);
return jsonSerializable;
}
static handleFunctionInvocations(thiz: PrimitiveProxyHandler<any>, target: any, p: PropertyKey, receiver: any): any {
if (p === 'apply') {
return (thisArg: any, args: any[]) => {
return thiz.apply!(target, thiz, args);
}
}
else if (p === 'call') {
return (thisArg: any, ...args: any[]) => {
return thiz.apply!(target, thiz, args);
}
}
else if (p === 'toString' || p === Symbol.toPrimitive) {
return (thisArg: any, ...args: any[]) => {
return thiz.toPrimitive();
}
}
}
static readonly PROPERTY_PROXY_ONEWAY_METHODS = '__proxy_oneway_methods';
static readonly PROPERTY_JSON_DISABLE_SERIALIZATION = '__json_disable_serialization';
static readonly PROPERTY_PROXY_PROPERTIES = '__proxy_props';
static readonly PROPERTY_JSON_COPY_SERIALIZE_CHILDREN = '__json_copy_serialize_children';
constructor(public selfName: string, public peerName: string, public send: (message: RpcMessage, reject?: (e: Error) => void) => void) {
}
@@ -335,7 +335,7 @@ export class RpcPeer {
if (!value)
return value;
const copySerializeChildren = value[PROPERTY_JSON_COPY_SERIALIZE_CHILDREN];
const copySerializeChildren = value[RpcPeer.PROPERTY_JSON_COPY_SERIALIZE_CHILDREN];
if (copySerializeChildren) {
const ret: any = {};
for (const [key, val] of Object.entries(value)) {
@@ -349,7 +349,7 @@ export class RpcPeer {
let proxy = this.remoteWeakProxies[__remote_proxy_id]?.deref();
if (!proxy)
proxy = this.newProxy(__remote_proxy_id, __remote_constructor_name, __remote_proxy_props, __remote_proxy_oneway_methods);
proxy[finalizerIdSymbol] = __remote_proxy_finalizer_id;
proxy[RpcPeer.finalizerIdSymbol] = __remote_proxy_finalizer_id;
return proxy;
}
@@ -369,7 +369,7 @@ export class RpcPeer {
}
serialize(value: any): any {
if (value?.[PROPERTY_JSON_COPY_SERIALIZE_CHILDREN] === true) {
if (value?.[RpcPeer.PROPERTY_JSON_COPY_SERIALIZE_CHILDREN] === true) {
const ret: any = {};
for (const [key, val] of Object.entries(value)) {
ret[key] = this.serialize(val);
@@ -377,7 +377,7 @@ export class RpcPeer {
return ret;
}
if (!value || (!value[PROPERTY_JSON_DISABLE_SERIALIZATION] && this.transportSafeArgumentTypes.has(value.constructor?.name))) {
if (!value || (!value[RpcPeer.PROPERTY_JSON_DISABLE_SERIALIZATION] && this.transportSafeArgumentTypes.has(value.constructor?.name))) {
return value;
}
@@ -391,8 +391,8 @@ export class RpcPeer {
__remote_proxy_id: proxiedEntry.id,
__remote_proxy_finalizer_id,
__remote_constructor_name,
__remote_proxy_props: value?.[PROPERTY_PROXY_PROPERTIES],
__remote_proxy_oneway_methods: value?.[PROPERTY_PROXY_ONEWAY_METHODS],
__remote_proxy_props: value?.[RpcPeer.PROPERTY_PROXY_PROPERTIES],
__remote_proxy_oneway_methods: value?.[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS],
}
return ret;
}
@@ -416,8 +416,8 @@ export class RpcPeer {
__remote_proxy_id: undefined,
__remote_proxy_finalizer_id: undefined,
__remote_constructor_name,
__remote_proxy_props: value?.[PROPERTY_PROXY_PROPERTIES],
__remote_proxy_oneway_methods: value?.[PROPERTY_PROXY_ONEWAY_METHODS],
__remote_proxy_props: value?.[RpcPeer.PROPERTY_PROXY_PROPERTIES],
__remote_proxy_oneway_methods: value?.[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS],
__serialized_value: serialized,
}
return ret;
@@ -435,8 +435,8 @@ export class RpcPeer {
__remote_proxy_id,
__remote_proxy_finalizer_id: __remote_proxy_id,
__remote_constructor_name,
__remote_proxy_props: value?.[PROPERTY_PROXY_PROPERTIES],
__remote_proxy_oneway_methods: value?.[PROPERTY_PROXY_ONEWAY_METHODS],
__remote_proxy_props: value?.[RpcPeer.PROPERTY_PROXY_PROPERTIES],
__remote_proxy_oneway_methods: value?.[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS],
}
return ret;
@@ -553,4 +553,19 @@ export class RpcPeer {
return;
}
}
}
}
export function getEvalSource() {
return `
(() => {
${RpcProxy}
${RpcPeer}
return {
RpcPeer,
RpcProxy,
};
})();
`;
}

108
server/src/threading.ts Normal file
View File

@@ -0,0 +1,108 @@
import worker_threads from 'worker_threads';
import { getEvalSource, RpcPeer } from './rpc';
import v8 from 'v8';
export async function newThread<T>(thread: () => Promise<T>): Promise<T>;
export async function newThread<V, T>(params: V, thread: (params: V) => Promise<T>): Promise<T>;
export async function newThread<T>(...args: any[]): Promise<T> {
let thread: () => Promise<T> = args[1];
let params: { [key: string]: any } = {};
if (thread) {
params = args[0];
}
else {
thread = args[0];
}
const m = (customRequire: string, RpcPeer: any) => {
if (customRequire) {
const g = global as any;
g[customRequire] = g.require;
}
const v8 = global.require('v8');
const worker_threads = global.require('worker_threads');
const vm = global.require('vm');
const mainPeer = new RpcPeer('thread', 'main', (message: any, reject: any) => {
try {
worker_threads.parentPort.postMessage(v8.serialize(message));
}
catch (e) {
reject?.(e);
}
});
worker_threads.parentPort.on('message', (message: any) => mainPeer.handleMessage(v8.deserialize(message)));
mainPeer.params.eval = async (script: string, paramNames: string[], ...paramValues: any[]) => {
const f = vm.compileFunction(`return (${script})`, paramNames, {
filename: 'script.js',
});
const params: any = {};
for (let i = 0; i < paramNames.length; i++) {
params[paramNames[i]] = paramValues[i];
}
const c = await f(...paramValues);
return await c(params);
}
};
const rpcSource = getEvalSource();
let customRequire = params.customRequire || '';
const workerSource = `
const {RpcPeer} = ${rpcSource};
(${m})("${customRequire}", RpcPeer)`;
const worker = new worker_threads.Worker(workerSource, {
eval: true,
});
const threadPeer = new RpcPeer('main', 'thread', (message, reject) => {
try {
worker.postMessage(v8.serialize(message));
}
catch (e) {
reject?.(e);
}
});
worker.on('message', (message: any) => threadPeer.handleMessage(v8.deserialize(message)));
const e = await threadPeer.getParam('eval');
const paramNames = Object.keys(params);
const paramValues = Object.values(params);
try {
return await e(thread.toString(), paramNames, ...paramValues);
}
finally {
worker.terminate();
}
}
async function test() {
const foo = 5;
const bar = 6;
console.log(await newThread({
foo, bar,
}, async () => {
return foo + bar;
}));
console.log(await newThread({
foo, bar,
}, async ({foo,bar}) => {
return foo + bar;
}));
const sayHelloInMainThread = () => console.log('hello! main thread:', worker_threads.isMainThread);
await newThread({
sayHelloInMainThread,
}, async () => {
sayHelloInMainThread();
})
}
// if (true)
// test();