diff --git a/common/fs/sdk/index.d.ts b/common/fs/sdk/index.d.ts new file mode 120000 index 000000000..350fe100d --- /dev/null +++ b/common/fs/sdk/index.d.ts @@ -0,0 +1 @@ +../../../sdk/index.d.ts \ No newline at end of file diff --git a/common/fs/sdk/types.d.ts b/common/fs/sdk/types.d.ts new file mode 120000 index 000000000..ede33df0a --- /dev/null +++ b/common/fs/sdk/types.d.ts @@ -0,0 +1 @@ +../../../sdk/types/index.d.ts \ No newline at end of file diff --git a/common/src/read-stream.ts b/common/src/read-stream.ts index 3af676779..c96e97b55 100644 --- a/common/src/read-stream.ts +++ b/common/src/read-stream.ts @@ -1,4 +1,5 @@ -import { Readable } from "stream"; +import { Readable } from 'stream'; +import { once } from 'events'; export async function readLength(readable: Readable, length: number): Promise { if (!length) { @@ -55,3 +56,14 @@ export async function readUntil(readable: Readable, charCode: number) { export async function readLine(readable: Readable) { return readUntil(readable, CHARCODE_NEWLINE); } + +export async function readString(readable: Readable | Promise) { + let data = ''; + readable = await readable; + readable.on('data', buffer => { + data += buffer.toString(); + }); + readable.resume(); + await once(readable, 'end') + return data; +} diff --git a/common/src/rtsp-server.ts b/common/src/rtsp-server.ts index 2671b0248..b2c86fc71 100644 --- a/common/src/rtsp-server.ts +++ b/common/src/rtsp-server.ts @@ -101,6 +101,10 @@ export class RtspServer { return this.handleSetup(); } + async handleTeardown() { + return this.handleSetup(); + } + async *handleRecord(): AsyncGenerator<{ type: 'audio' | 'video', rtcp: boolean, @@ -243,7 +247,7 @@ export class RtspServer { } await this[method](url, requestHeaders); - return method !== 'play' && method !== 'record'; + return method !== 'play' && method !== 'record' && method !== 'teardown'; } respond(code: number, message: string, requestHeaders: Headers, headers: Headers, buffer?: Buffer) { diff --git a/common/src/scrypted-eval.ts b/common/src/scrypted-eval.ts index 2848db97d..1f8fb721d 100644 --- a/common/src/scrypted-eval.ts +++ b/common/src/scrypted-eval.ts @@ -1,6 +1,8 @@ import type { TranspileOptions } from "typescript"; import sdk, { ScryptedDeviceBase, ScryptedInterface, ScryptedDeviceType } from "@scrypted/sdk"; import vm from "vm"; +import fs from 'fs'; +import { newThread } from '../../server/src/threading'; const { systemManager, deviceManager, mediaManager, endpointManager } = sdk; @@ -20,16 +22,43 @@ function tsCompile(source: string, options: TranspileOptions = null): string { return ts.transpileModule(source, options).outputText; } -const scryptedTypesDefs = require('!!raw-loader!@scrypted/sdk/types/index.d.ts').default; -const scryptedIndexDefs = require('!!raw-loader!@scrypted/sdk/index.d.ts').default; +async function tsCompileThread(source: string, options: TranspileOptions = null): Promise { + return newThread({ + source, options, + customRequire: '__webpack_require__', + }, ({ source, options }) => { + const ts = global.require("typescript"); + const { ScriptTarget } = ts; + + // Default options -- you could also perform a merge, or use the project tsconfig.json + if (null === options) { + options = { + compilerOptions: { + target: ScriptTarget.ESNext, + module: ts.ModuleKind.CommonJS + } + }; + } + return ts.transpileModule(source, options).outputText; + }); +} + +function getTypeDefs() { + const scryptedTypesDefs = fs.readFileSync('sdk/types.d.ts').toString(); + const scryptedIndexDefs = fs.readFileSync('sdk/index.d.ts').toString(); + return { + scryptedIndexDefs, + scryptedTypesDefs, + }; +} export async function scryptedEval(device: ScryptedDeviceBase, script: string, extraLibs: { [lib: string]: string }, params: { [name: string]: any }) { try { const libs = Object.assign({ - types: scryptedTypesDefs, + types: getTypeDefs().scryptedTypesDefs, }, extraLibs); const allScripts = Object.values(libs).join('\n').toString() + script; - const compiled = tsCompile(allScripts); + const compiled = await tsCompileThread(allScripts); const allParams = Object.assign({}, params, { systemManager, @@ -74,10 +103,7 @@ export async function scryptedEval(device: ScryptedDeviceBase, script: string, e } export function createMonacoEvalDefaults(extraLibs: { [lib: string]: string }) { - const libs = Object.assign({ - types: scryptedTypesDefs, - sdk: scryptedIndexDefs, - }, extraLibs); + const libs = Object.assign(getTypeDefs(), extraLibs); function monacoEvalDefaultsFunction(monaco, libs) { monaco.languages.typescript.typescriptDefaults.setDiagnosticsOptions( diff --git a/server/package-lock.json b/server/package-lock.json index 49420a66a..db65b5488 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -24,6 +24,7 @@ "linkfs": "^2.1.0", "lodash": "^4.17.21", "memfs": "^3.2.2", + "mime": "^3.0.0", "mime-db": "^1.51.0", "mkdirp": "^1.0.4", "nan": "^2.15.0", @@ -1860,14 +1861,14 @@ } }, "node_modules/mime": { - "version": "1.6.0", - "resolved": "https://registry.npmjs.org/mime/-/mime-1.6.0.tgz", - "integrity": "sha512-x0Vn8spI+wuJ1O6S7gnbaQg8Pxh4NNHb7KSINmEWKiPE4RKOplvijn+NkmYmmRgP68mc70j2EbeTFRsrswaQeg==", + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/mime/-/mime-3.0.0.tgz", + "integrity": "sha512-jSCU7/VB1loIWBZe14aEYHU/+1UMEHoaO7qxCOVJOw9GgH72VAWppxNcjU+x9a2k3GSIBXNKxXQFqRvvZ7vr3A==", "bin": { "mime": "cli.js" }, "engines": { - "node": ">=4" + "node": ">=10.0.0" } }, "node_modules/mime-db": { @@ -2675,6 +2676,17 @@ "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", "integrity": "sha1-VgiurfwAvmwpAd9fmGF4jeDVl8g=" }, + "node_modules/send/node_modules/mime": { + "version": "1.6.0", + "resolved": "https://registry.npmjs.org/mime/-/mime-1.6.0.tgz", + "integrity": "sha512-x0Vn8spI+wuJ1O6S7gnbaQg8Pxh4NNHb7KSINmEWKiPE4RKOplvijn+NkmYmmRgP68mc70j2EbeTFRsrswaQeg==", + "bin": { + "mime": "cli.js" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/send/node_modules/ms": { "version": "2.1.3", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", @@ -4692,9 +4704,9 @@ "integrity": "sha1-VSmk1nZUE07cxSZmVoNbD4Ua/O4=" }, "mime": { - "version": "1.6.0", - "resolved": "https://registry.npmjs.org/mime/-/mime-1.6.0.tgz", - "integrity": "sha512-x0Vn8spI+wuJ1O6S7gnbaQg8Pxh4NNHb7KSINmEWKiPE4RKOplvijn+NkmYmmRgP68mc70j2EbeTFRsrswaQeg==" + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/mime/-/mime-3.0.0.tgz", + "integrity": "sha512-jSCU7/VB1loIWBZe14aEYHU/+1UMEHoaO7qxCOVJOw9GgH72VAWppxNcjU+x9a2k3GSIBXNKxXQFqRvvZ7vr3A==" }, "mime-db": { "version": "1.51.0", @@ -5290,6 +5302,11 @@ } } }, + "mime": { + "version": "1.6.0", + "resolved": "https://registry.npmjs.org/mime/-/mime-1.6.0.tgz", + "integrity": "sha512-x0Vn8spI+wuJ1O6S7gnbaQg8Pxh4NNHb7KSINmEWKiPE4RKOplvijn+NkmYmmRgP68mc70j2EbeTFRsrswaQeg==" + }, "ms": { "version": "2.1.3", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", diff --git a/server/package.json b/server/package.json index 6c22391c8..0d7c690d1 100644 --- a/server/package.json +++ b/server/package.json @@ -18,6 +18,7 @@ "linkfs": "^2.1.0", "lodash": "^4.17.21", "memfs": "^3.2.2", + "mime": "^3.0.0", "mime-db": "^1.51.0", "mkdirp": "^1.0.4", "nan": "^2.15.0", diff --git a/server/src/http-interfaces.ts b/server/src/http-interfaces.ts index 48496eb47..dfcf7768f 100644 --- a/server/src/http-interfaces.ts +++ b/server/src/http-interfaces.ts @@ -1,13 +1,13 @@ import { HttpResponse, HttpResponseOptions } from "@scrypted/types"; import { Response } from "express"; import mime from "mime"; -import { PROPERTY_PROXY_ONEWAY_METHODS } from "./rpc"; +import { RpcPeer } from "./rpc"; import { join as pathJoin } from 'path'; import fs from 'fs'; export function createResponseInterface(res: Response, unzippedDir: string): HttpResponse { class HttpResponseImpl implements HttpResponse { - [PROPERTY_PROXY_ONEWAY_METHODS] = [ + [RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS] = [ 'send', 'sendFile', ]; diff --git a/server/src/plugin/plugin-console.ts b/server/src/plugin/plugin-console.ts index 0fc6d0b69..d85e451a3 100644 --- a/server/src/plugin/plugin-console.ts +++ b/server/src/plugin/plugin-console.ts @@ -10,9 +10,7 @@ export interface ConsoleServer { pluginConsole: Console; readPort: number, writePort: number, - readServer: net.Server, - writeServer: net.Server, - sockets: Set; + destroy(): void; } export interface StdPassThroughs { @@ -129,11 +127,22 @@ export async function createConsoleServer(remoteStdout: Readable, remoteStderr: const writePort = await listenZero(writeServer); return { + destroy() { + for (const socket of sockets) { + socket.destroy(); + } + sockets.clear(); + outputs.clear(); + + try { + readServer.close(); + writeServer.close(); + } + catch (e) { + } + }, pluginConsole, readPort, writePort, - readServer, - writeServer, - sockets, }; } diff --git a/server/src/plugin/plugin-device.ts b/server/src/plugin/plugin-device.ts index 84aa4ad59..08eb25534 100644 --- a/server/src/plugin/plugin-device.ts +++ b/server/src/plugin/plugin-device.ts @@ -2,7 +2,7 @@ import { DeviceProvider, EventDetails, EventListenerOptions, EventListenerRegist import { ScryptedRuntime } from "../runtime"; import { PluginDevice } from "../db-types"; import { MixinProvider } from "@scrypted/types"; -import { handleFunctionInvocations, PrimitiveProxyHandler } from "../rpc"; +import { RpcPeer, PrimitiveProxyHandler } from "../rpc"; import { getState } from "../state"; import { getDisplayType } from "../infer-defaults"; import { allInterfaceProperties, isValidInterfaceMethod, methodInterfaces } from "./descriptor"; @@ -285,7 +285,7 @@ export class PluginDeviceProxyHandler implements PrimitiveProxyHandler, Scr get(target: any, p: PropertyKey, receiver: any): any { if (p === 'constructor') return; - const handled = handleFunctionInvocations(this, target, p, receiver); + const handled = RpcPeer.handleFunctionInvocations(this, target, p, receiver); if (handled) return handled; const pluginDevice = this.scrypted.findPluginDeviceById(this.id); diff --git a/server/src/plugin/plugin-host-api.ts b/server/src/plugin/plugin-host-api.ts index 8b9140031..e84041903 100644 --- a/server/src/plugin/plugin-host-api.ts +++ b/server/src/plugin/plugin-host-api.ts @@ -6,12 +6,12 @@ import { Logger } from '../logger'; import { getState } from '../state'; import { PluginHost } from './plugin-host'; import debounce from 'lodash/debounce'; -import { PROPERTY_PROXY_ONEWAY_METHODS } from '../rpc'; +import { RpcPeer } from '../rpc'; export class PluginHostAPI extends PluginAPIManagedListeners implements PluginAPI { pluginId: string; - [PROPERTY_PROXY_ONEWAY_METHODS] = [ + [RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS] = [ 'onMixinEvent', 'onDeviceEvent', 'setStorage', diff --git a/server/src/plugin/plugin-host.ts b/server/src/plugin/plugin-host.ts index af97366d2..c00ad80c4 100644 --- a/server/src/plugin/plugin-host.ts +++ b/server/src/plugin/plugin-host.ts @@ -50,6 +50,7 @@ export class PluginHost { kill() { this.killed = true; this.api.removeListeners(); + this.peer.kill('plugin killed'); this.worker.kill(); this.io.close(); for (const s of Object.values(this.ws)) { @@ -60,14 +61,7 @@ export class PluginHost { const deviceIds = new Set(Object.values(this.scrypted.pluginDevices).filter(d => d.pluginId === this.pluginId).map(d => d._id)); this.scrypted.invalidateMixins(deviceIds); - this.consoleServer?.then(server => { - server.readServer.close(); - server.writeServer.close(); - for (const s of server.sockets) { - s.destroy(); - } - }); - setTimeout(() => this.peer.kill('plugin killed'), 500); + this.consoleServer.then(server => server.destroy()); } toString() { @@ -272,21 +266,26 @@ export class PluginHost { pluginConsole.log('starting plugin', this.pluginId, this.packageJson.version); }); - this.worker.on('close', () => { + const disconnect = () => { connected = false; + this.peer.kill('plugin disconnected'); + }; + + this.worker.on('close', () => { logger.log('e', `${this.pluginName} close`); + disconnect(); }); this.worker.on('disconnect', () => { - connected = false; logger.log('e', `${this.pluginName} disconnected`); + disconnect(); }); this.worker.on('exit', async (code, signal) => { - connected = false; logger.log('e', `${this.pluginName} exited ${code} ${signal}`); + disconnect(); }); this.worker.on('error', e => { - connected = false; logger.log('e', `${this.pluginName} error ${e}`); + disconnect(); }); this.peer.onOob = (oob: any) => { diff --git a/server/src/plugin/plugin-remote.ts b/server/src/plugin/plugin-remote.ts index a0d9c6872..6c2b977d5 100644 --- a/server/src/plugin/plugin-remote.ts +++ b/server/src/plugin/plugin-remote.ts @@ -4,7 +4,7 @@ import path from 'path'; import { ScryptedNativeId, DeviceManager, Logger, Device, DeviceManifest, DeviceState, EndpointManager, SystemDeviceState, ScryptedStatic, SystemManager, MediaManager, ScryptedMimeTypes, ScryptedInterface, ScryptedInterfaceProperty, HttpRequest } from '@scrypted/types' import { PluginAPI, PluginLogger, PluginRemote, PluginRemoteLoadZipOptions } from './plugin-api'; import { SystemManagerImpl } from './system'; -import { RpcPeer, RPCResultError, PROPERTY_PROXY_ONEWAY_METHODS, PROPERTY_JSON_DISABLE_SERIALIZATION } from '../rpc'; +import { RpcPeer, RPCResultError } from '../rpc'; import { BufferSerializer } from './buffer-serializer'; import { createWebSocketClass, WebSocketConnectCallbacks, WebSocketMethods } from './plugin-remote-websocket'; import fs from 'fs'; @@ -361,9 +361,9 @@ export function attachPluginRemote(peer: RpcPeer, options?: PluginRemoteAttachOp const localStorage = new StorageImpl(deviceManager, undefined); - const remote: PluginRemote & { [PROPERTY_JSON_DISABLE_SERIALIZATION]: boolean, [PROPERTY_PROXY_ONEWAY_METHODS]: string[] } = { - [PROPERTY_JSON_DISABLE_SERIALIZATION]: true, - [PROPERTY_PROXY_ONEWAY_METHODS]: [ + const remote: PluginRemote & { [RpcPeer.PROPERTY_JSON_DISABLE_SERIALIZATION]: boolean, [RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS]: string[] } = { + [RpcPeer.PROPERTY_JSON_DISABLE_SERIALIZATION]: true, + [RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS]: [ 'notify', 'updateDeviceState', 'setSystemState', diff --git a/server/src/plugin/runtime/node-thread-worker.ts b/server/src/plugin/runtime/node-thread-worker.ts index 72b629ef6..faab7e1ba 100644 --- a/server/src/plugin/runtime/node-thread-worker.ts +++ b/server/src/plugin/runtime/node-thread-worker.ts @@ -1,4 +1,3 @@ -import { Readable } from "stream"; import { EventEmitter } from "ws"; import { RpcMessage, RpcPeer } from "../../rpc"; import { RuntimeWorker, RuntimeWorkerOptions } from "./runtime-worker"; diff --git a/server/src/plugin/runtime/runtime-worker.ts b/server/src/plugin/runtime/runtime-worker.ts index fdbc5c389..183b785b1 100644 --- a/server/src/plugin/runtime/runtime-worker.ts +++ b/server/src/plugin/runtime/runtime-worker.ts @@ -15,7 +15,6 @@ export interface RuntimeWorker { kill(): void; - on(event: 'error', listener: (err: Error) => void): this; on(event: 'error', listener: (err: Error) => void): this; on(event: 'close', listener: (code: number | null, signal: NodeJS.Signals | null) => void): this; on(event: 'disconnect', listener: () => void): this; diff --git a/server/src/plugin/system.ts b/server/src/plugin/system.ts index 28bb1b759..70b5b6930 100644 --- a/server/src/plugin/system.ts +++ b/server/src/plugin/system.ts @@ -1,6 +1,6 @@ import { EventListenerOptions, EventDetails, EventListenerRegister, ScryptedDevice, ScryptedInterface, ScryptedInterfaceDescriptors, SystemDeviceState, SystemManager, ScryptedInterfaceProperty, ScryptedDeviceType, Logger } from "@scrypted/types"; import { PluginAPI } from "./plugin-api"; -import { handleFunctionInvocations, PrimitiveProxyHandler, PROPERTY_PROXY_ONEWAY_METHODS } from '../rpc'; +import { PrimitiveProxyHandler, RpcPeer } from '../rpc'; import { EventRegistry } from "../event-registry"; import { allInterfaceProperties, isValidInterfaceMethod } from "./descriptor"; @@ -24,7 +24,7 @@ class DeviceProxyHandler implements PrimitiveProxyHandler, ScryptedDevice { if (p === 'id') return this.id; - const handled = handleFunctionInvocations(this, target, p, receiver); + const handled = RpcPeer.handleFunctionInvocations(this, target, p, receiver); if (handled) return handled; @@ -96,10 +96,10 @@ class EventListenerRegisterImpl implements EventListenerRegister { function makeOneWayCallback(input: T): T { const f: any = input; - const oneways: string[] = f[PROPERTY_PROXY_ONEWAY_METHODS] || []; + const oneways: string[] = f[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS] || []; if (!oneways.includes(null)) oneways.push(null); - f[PROPERTY_PROXY_ONEWAY_METHODS] = oneways; + f[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS] = oneways; return input; } diff --git a/server/src/rpc.ts b/server/src/rpc.ts index 8c380d4d0..b488b2121 100644 --- a/server/src/rpc.ts +++ b/server/src/rpc.ts @@ -1,17 +1,5 @@ import vm from 'vm'; -const finalizerIdSymbol = Symbol('rpcFinalizerId'); - -function getDefaultTransportSafeArgumentTypes() { - const jsonSerializable = new Set(); - jsonSerializable.add(Number.name); - jsonSerializable.add(String.name); - jsonSerializable.add(Object.name); - jsonSerializable.add(Boolean.name); - jsonSerializable.add(Array.name); - return jsonSerializable; -} - export function startPeriodicGarbageCollection() { if (!global.gc) { console.warn('rpc peer garbage collection not available: global.gc is not exposed.'); @@ -81,29 +69,6 @@ interface Deferred { reject: any; } -export function handleFunctionInvocations(thiz: PrimitiveProxyHandler, target: any, p: PropertyKey, receiver: any): any { - if (p === 'apply') { - return (thisArg: any, args: any[]) => { - return thiz.apply!(target, thiz, args); - } - } - else if (p === 'call') { - return (thisArg: any, ...args: any[]) => { - return thiz.apply!(target, thiz, args); - } - } - else if (p === 'toString' || p === Symbol.toPrimitive) { - return (thisArg: any, ...args: any[]) => { - return thiz.toPrimitive(); - } - } -} - -export const PROPERTY_PROXY_ONEWAY_METHODS = '__proxy_oneway_methods'; -export const PROPERTY_JSON_DISABLE_SERIALIZATION = '__json_disable_serialization'; -export const PROPERTY_PROXY_PROPERTIES = '__proxy_props'; -export const PROPERTY_JSON_COPY_SERIALIZE_CHILDREN = '__json_copy_serialize_children'; - export interface PrimitiveProxyHandler extends ProxyHandler { toPrimitive(): any; } @@ -128,11 +93,11 @@ class RpcProxy implements PrimitiveProxyHandler { return this.constructorName; if (p === '__proxy_peer') return this.peer; - if (p === PROPERTY_PROXY_PROPERTIES) + if (p === RpcPeer.PROPERTY_PROXY_PROPERTIES) return this.proxyProps; - if (p === PROPERTY_PROXY_ONEWAY_METHODS) + if (p === RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS) return this.proxyOneWayMethods; - if (p === PROPERTY_JSON_DISABLE_SERIALIZATION || p === PROPERTY_JSON_COPY_SERIALIZE_CHILDREN) + if (p === RpcPeer.PROPERTY_JSON_DISABLE_SERIALIZATION || p === RpcPeer.PROPERTY_JSON_COPY_SERIALIZE_CHILDREN) return; if (p === 'then') return; @@ -140,14 +105,14 @@ class RpcProxy implements PrimitiveProxyHandler { return; if (this.proxyProps?.[p] !== undefined) return this.proxyProps?.[p]; - const handled = handleFunctionInvocations(this, target, p, receiver); + const handled = RpcPeer.handleFunctionInvocations(this, target, p, receiver); if (handled) return handled; return new Proxy(() => p, this); } set(target: any, p: string | symbol, value: any, receiver: any): boolean { - if (p === finalizerIdSymbol) + if (p === RpcPeer.finalizerIdSymbol) this.entry.finalizerId = value; return true; } @@ -248,7 +213,42 @@ export class RpcPeer { finalizers = new FinalizationRegistry(entry => this.finalize(entry as LocalProxiedEntry)); nameDeserializerMap = new Map(); constructorSerializerMap = new Map(); - transportSafeArgumentTypes = getDefaultTransportSafeArgumentTypes(); + transportSafeArgumentTypes = RpcPeer.getDefaultTransportSafeArgumentTypes(); + + static readonly finalizerIdSymbol = Symbol('rpcFinalizerId'); + + static getDefaultTransportSafeArgumentTypes() { + const jsonSerializable = new Set(); + jsonSerializable.add(Number.name); + jsonSerializable.add(String.name); + jsonSerializable.add(Object.name); + jsonSerializable.add(Boolean.name); + jsonSerializable.add(Array.name); + return jsonSerializable; + } + + static handleFunctionInvocations(thiz: PrimitiveProxyHandler, target: any, p: PropertyKey, receiver: any): any { + if (p === 'apply') { + return (thisArg: any, args: any[]) => { + return thiz.apply!(target, thiz, args); + } + } + else if (p === 'call') { + return (thisArg: any, ...args: any[]) => { + return thiz.apply!(target, thiz, args); + } + } + else if (p === 'toString' || p === Symbol.toPrimitive) { + return (thisArg: any, ...args: any[]) => { + return thiz.toPrimitive(); + } + } + } + + static readonly PROPERTY_PROXY_ONEWAY_METHODS = '__proxy_oneway_methods'; + static readonly PROPERTY_JSON_DISABLE_SERIALIZATION = '__json_disable_serialization'; + static readonly PROPERTY_PROXY_PROPERTIES = '__proxy_props'; + static readonly PROPERTY_JSON_COPY_SERIALIZE_CHILDREN = '__json_copy_serialize_children'; constructor(public selfName: string, public peerName: string, public send: (message: RpcMessage, reject?: (e: Error) => void) => void) { } @@ -335,7 +335,7 @@ export class RpcPeer { if (!value) return value; - const copySerializeChildren = value[PROPERTY_JSON_COPY_SERIALIZE_CHILDREN]; + const copySerializeChildren = value[RpcPeer.PROPERTY_JSON_COPY_SERIALIZE_CHILDREN]; if (copySerializeChildren) { const ret: any = {}; for (const [key, val] of Object.entries(value)) { @@ -349,7 +349,7 @@ export class RpcPeer { let proxy = this.remoteWeakProxies[__remote_proxy_id]?.deref(); if (!proxy) proxy = this.newProxy(__remote_proxy_id, __remote_constructor_name, __remote_proxy_props, __remote_proxy_oneway_methods); - proxy[finalizerIdSymbol] = __remote_proxy_finalizer_id; + proxy[RpcPeer.finalizerIdSymbol] = __remote_proxy_finalizer_id; return proxy; } @@ -369,7 +369,7 @@ export class RpcPeer { } serialize(value: any): any { - if (value?.[PROPERTY_JSON_COPY_SERIALIZE_CHILDREN] === true) { + if (value?.[RpcPeer.PROPERTY_JSON_COPY_SERIALIZE_CHILDREN] === true) { const ret: any = {}; for (const [key, val] of Object.entries(value)) { ret[key] = this.serialize(val); @@ -377,7 +377,7 @@ export class RpcPeer { return ret; } - if (!value || (!value[PROPERTY_JSON_DISABLE_SERIALIZATION] && this.transportSafeArgumentTypes.has(value.constructor?.name))) { + if (!value || (!value[RpcPeer.PROPERTY_JSON_DISABLE_SERIALIZATION] && this.transportSafeArgumentTypes.has(value.constructor?.name))) { return value; } @@ -391,8 +391,8 @@ export class RpcPeer { __remote_proxy_id: proxiedEntry.id, __remote_proxy_finalizer_id, __remote_constructor_name, - __remote_proxy_props: value?.[PROPERTY_PROXY_PROPERTIES], - __remote_proxy_oneway_methods: value?.[PROPERTY_PROXY_ONEWAY_METHODS], + __remote_proxy_props: value?.[RpcPeer.PROPERTY_PROXY_PROPERTIES], + __remote_proxy_oneway_methods: value?.[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS], } return ret; } @@ -416,8 +416,8 @@ export class RpcPeer { __remote_proxy_id: undefined, __remote_proxy_finalizer_id: undefined, __remote_constructor_name, - __remote_proxy_props: value?.[PROPERTY_PROXY_PROPERTIES], - __remote_proxy_oneway_methods: value?.[PROPERTY_PROXY_ONEWAY_METHODS], + __remote_proxy_props: value?.[RpcPeer.PROPERTY_PROXY_PROPERTIES], + __remote_proxy_oneway_methods: value?.[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS], __serialized_value: serialized, } return ret; @@ -435,8 +435,8 @@ export class RpcPeer { __remote_proxy_id, __remote_proxy_finalizer_id: __remote_proxy_id, __remote_constructor_name, - __remote_proxy_props: value?.[PROPERTY_PROXY_PROPERTIES], - __remote_proxy_oneway_methods: value?.[PROPERTY_PROXY_ONEWAY_METHODS], + __remote_proxy_props: value?.[RpcPeer.PROPERTY_PROXY_PROPERTIES], + __remote_proxy_oneway_methods: value?.[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS], } return ret; @@ -553,4 +553,19 @@ export class RpcPeer { return; } } -} \ No newline at end of file +} + +export function getEvalSource() { + return ` + (() => { + ${RpcProxy} + + ${RpcPeer} + + return { + RpcPeer, + RpcProxy, + }; + })(); + `; +} diff --git a/server/src/threading.ts b/server/src/threading.ts new file mode 100644 index 000000000..9e9c0ffd8 --- /dev/null +++ b/server/src/threading.ts @@ -0,0 +1,108 @@ +import worker_threads from 'worker_threads'; +import { getEvalSource, RpcPeer } from './rpc'; +import v8 from 'v8'; + +export async function newThread(thread: () => Promise): Promise; +export async function newThread(params: V, thread: (params: V) => Promise): Promise; + +export async function newThread(...args: any[]): Promise { + let thread: () => Promise = args[1]; + let params: { [key: string]: any } = {}; + if (thread) { + params = args[0]; + } + else { + thread = args[0]; + } + + const m = (customRequire: string, RpcPeer: any) => { + if (customRequire) { + const g = global as any; + g[customRequire] = g.require; + } + const v8 = global.require('v8'); + const worker_threads = global.require('worker_threads'); + const vm = global.require('vm'); + const mainPeer = new RpcPeer('thread', 'main', (message: any, reject: any) => { + try { + worker_threads.parentPort.postMessage(v8.serialize(message)); + } + catch (e) { + reject?.(e); + } + }); + worker_threads.parentPort.on('message', (message: any) => mainPeer.handleMessage(v8.deserialize(message))); + + mainPeer.params.eval = async (script: string, paramNames: string[], ...paramValues: any[]) => { + const f = vm.compileFunction(`return (${script})`, paramNames, { + filename: 'script.js', + }); + const params: any = {}; + for (let i = 0; i < paramNames.length; i++) { + params[paramNames[i]] = paramValues[i]; + } + const c = await f(...paramValues); + return await c(params); + } + }; + const rpcSource = getEvalSource(); + + let customRequire = params.customRequire || ''; + + const workerSource = ` + const {RpcPeer} = ${rpcSource}; + + (${m})("${customRequire}", RpcPeer)`; + + const worker = new worker_threads.Worker(workerSource, { + eval: true, + }); + + const threadPeer = new RpcPeer('main', 'thread', (message, reject) => { + try { + worker.postMessage(v8.serialize(message)); + } + catch (e) { + reject?.(e); + } + }); + worker.on('message', (message: any) => threadPeer.handleMessage(v8.deserialize(message))); + + const e = await threadPeer.getParam('eval'); + const paramNames = Object.keys(params); + const paramValues = Object.values(params); + try { + return await e(thread.toString(), paramNames, ...paramValues); + } + finally { + worker.terminate(); + } +} + +async function test() { + const foo = 5; + const bar = 6; + + console.log(await newThread({ + foo, bar, + }, async () => { + return foo + bar; + })); + + + console.log(await newThread({ + foo, bar, + }, async ({foo,bar}) => { + return foo + bar; + })); + + const sayHelloInMainThread = () => console.log('hello! main thread:', worker_threads.isMainThread); + await newThread({ + sayHelloInMainThread, + }, async () => { + sayHelloInMainThread(); + }) +} + +// if (true) +// test();