From a26566202e7eeb14e66e954568090610dd6e1124 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Fri, 11 Feb 2022 13:28:12 -0800 Subject: [PATCH] rpc: move out of server --- server/package-lock.json | 11 + server/package.json | 1 + server/src/http-interfaces.ts | 2 +- server/src/plugin/buffer-serializer.ts | 2 +- server/src/plugin/plugin-device.ts | 2 +- server/src/plugin/plugin-host-api.ts | 2 +- server/src/plugin/plugin-remote-worker.ts | 2 +- server/src/plugin/plugin-remote.ts | 2 +- .../plugin/runtime/child-process-worker.ts | 2 +- server/src/plugin/runtime/node-fork-worker.ts | 2 +- .../src/plugin/runtime/node-thread-worker.ts | 2 +- server/src/plugin/runtime/python-worker.ts | 2 +- server/src/plugin/runtime/runtime-worker.ts | 3 +- server/src/plugin/system.ts | 2 +- server/src/rpc.ts | 545 ------------------ server/src/scrypted-plugin-main.ts | 2 +- 16 files changed, 25 insertions(+), 559 deletions(-) delete mode 100644 server/src/rpc.ts diff --git a/server/package-lock.json b/server/package-lock.json index fbba3ead9..ef3fa1cb1 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -11,6 +11,7 @@ "dependencies": { "@mapbox/node-pre-gyp": "^1.0.8", "@scrypted/ffmpeg": "^1.0.10", + "@scrypted/rpc": "^1.0.3", "@scrypted/types": "^0.0.6", "adm-zip": "^0.5.3", "axios": "^0.21.1", @@ -154,6 +155,11 @@ "follow-redirects": "^1.14.4" } }, + "node_modules/@scrypted/rpc": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/@scrypted/rpc/-/rpc-1.0.3.tgz", + "integrity": "sha512-luEigc8gIMoKv26t2123KKUno3W7o4ze6SMv7ZPnRnlpYrgo9CmjDhezptTfQuHm6c/0eiIRPh6qGSBWOjelGw==" + }, "node_modules/@scrypted/types": { "version": "0.0.6", "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.0.6.tgz", @@ -3326,6 +3332,11 @@ } } }, + "@scrypted/rpc": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/@scrypted/rpc/-/rpc-1.0.3.tgz", + "integrity": "sha512-luEigc8gIMoKv26t2123KKUno3W7o4ze6SMv7ZPnRnlpYrgo9CmjDhezptTfQuHm6c/0eiIRPh6qGSBWOjelGw==" + }, "@scrypted/types": { "version": "0.0.6", "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.0.6.tgz", diff --git a/server/package.json b/server/package.json index 40a228e35..c2a1f7a69 100644 --- a/server/package.json +++ b/server/package.json @@ -5,6 +5,7 @@ "dependencies": { "@mapbox/node-pre-gyp": "^1.0.8", "@scrypted/ffmpeg": "^1.0.10", + "@scrypted/rpc": "^1.0.3", "@scrypted/types": "^0.0.6", "adm-zip": "^0.5.3", "axios": "^0.21.1", diff --git a/server/src/http-interfaces.ts b/server/src/http-interfaces.ts index 48496eb47..555e60be0 100644 --- a/server/src/http-interfaces.ts +++ b/server/src/http-interfaces.ts @@ -1,7 +1,7 @@ import { HttpResponse, HttpResponseOptions } from "@scrypted/types"; import { Response } from "express"; import mime from "mime"; -import { PROPERTY_PROXY_ONEWAY_METHODS } from "./rpc"; +import { PROPERTY_PROXY_ONEWAY_METHODS } from "@scrypted/rpc"; import { join as pathJoin } from 'path'; import fs from 'fs'; diff --git a/server/src/plugin/buffer-serializer.ts b/server/src/plugin/buffer-serializer.ts index 59213d93f..71982fd89 100644 --- a/server/src/plugin/buffer-serializer.ts +++ b/server/src/plugin/buffer-serializer.ts @@ -1,4 +1,4 @@ -import { RpcSerializer } from "../rpc"; +import { RpcSerializer } from "@scrypted/rpc"; export class BufferSerializer implements RpcSerializer { serialize(value: Buffer) { diff --git a/server/src/plugin/plugin-device.ts b/server/src/plugin/plugin-device.ts index 84aa4ad59..e8fda3174 100644 --- a/server/src/plugin/plugin-device.ts +++ b/server/src/plugin/plugin-device.ts @@ -2,7 +2,7 @@ import { DeviceProvider, EventDetails, EventListenerOptions, EventListenerRegist import { ScryptedRuntime } from "../runtime"; import { PluginDevice } from "../db-types"; import { MixinProvider } from "@scrypted/types"; -import { handleFunctionInvocations, PrimitiveProxyHandler } from "../rpc"; +import { handleFunctionInvocations, PrimitiveProxyHandler } from "@scrypted/rpc"; import { getState } from "../state"; import { getDisplayType } from "../infer-defaults"; import { allInterfaceProperties, isValidInterfaceMethod, methodInterfaces } from "./descriptor"; diff --git a/server/src/plugin/plugin-host-api.ts b/server/src/plugin/plugin-host-api.ts index 8b9140031..456b70595 100644 --- a/server/src/plugin/plugin-host-api.ts +++ b/server/src/plugin/plugin-host-api.ts @@ -6,7 +6,7 @@ import { Logger } from '../logger'; import { getState } from '../state'; import { PluginHost } from './plugin-host'; import debounce from 'lodash/debounce'; -import { PROPERTY_PROXY_ONEWAY_METHODS } from '../rpc'; +import { PROPERTY_PROXY_ONEWAY_METHODS } from '@scrypted/rpc'; export class PluginHostAPI extends PluginAPIManagedListeners implements PluginAPI { pluginId: string; diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index e190d91da..621c675d9 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -1,4 +1,4 @@ -import { RpcMessage, RpcPeer } from '../rpc'; +import { RpcMessage, RpcPeer } from '@scrypted/rpc'; import { SystemManager, DeviceManager, ScryptedNativeId } from '@scrypted/types' import { attachPluginRemote, PluginReader } from './plugin-remote'; import { PluginAPI } from './plugin-api'; diff --git a/server/src/plugin/plugin-remote.ts b/server/src/plugin/plugin-remote.ts index a0d9c6872..0a8d1da38 100644 --- a/server/src/plugin/plugin-remote.ts +++ b/server/src/plugin/plugin-remote.ts @@ -4,7 +4,7 @@ import path from 'path'; import { ScryptedNativeId, DeviceManager, Logger, Device, DeviceManifest, DeviceState, EndpointManager, SystemDeviceState, ScryptedStatic, SystemManager, MediaManager, ScryptedMimeTypes, ScryptedInterface, ScryptedInterfaceProperty, HttpRequest } from '@scrypted/types' import { PluginAPI, PluginLogger, PluginRemote, PluginRemoteLoadZipOptions } from './plugin-api'; import { SystemManagerImpl } from './system'; -import { RpcPeer, RPCResultError, PROPERTY_PROXY_ONEWAY_METHODS, PROPERTY_JSON_DISABLE_SERIALIZATION } from '../rpc'; +import { RpcPeer, RPCResultError, PROPERTY_PROXY_ONEWAY_METHODS, PROPERTY_JSON_DISABLE_SERIALIZATION } from '@scrypted/rpc'; import { BufferSerializer } from './buffer-serializer'; import { createWebSocketClass, WebSocketConnectCallbacks, WebSocketMethods } from './plugin-remote-websocket'; import fs from 'fs'; diff --git a/server/src/plugin/runtime/child-process-worker.ts b/server/src/plugin/runtime/child-process-worker.ts index 7f3e173ab..fb3ca09dc 100644 --- a/server/src/plugin/runtime/child-process-worker.ts +++ b/server/src/plugin/runtime/child-process-worker.ts @@ -1,7 +1,7 @@ import { EventEmitter } from "ws"; import { RuntimeWorker, RuntimeWorkerOptions } from "./runtime-worker"; import child_process from 'child_process'; -import { RpcMessage, RpcPeer } from "../../rpc"; +import { RpcMessage, RpcPeer } from "@scrypted/rpc"; export abstract class ChildProcessWorker extends EventEmitter implements RuntimeWorker { worker: child_process.ChildProcess; diff --git a/server/src/plugin/runtime/node-fork-worker.ts b/server/src/plugin/runtime/node-fork-worker.ts index 3203eb841..2e00d2466 100644 --- a/server/src/plugin/runtime/node-fork-worker.ts +++ b/server/src/plugin/runtime/node-fork-worker.ts @@ -1,7 +1,7 @@ import { RuntimeWorkerOptions as RuntimeWorkerOptions } from "./runtime-worker"; import child_process from 'child_process'; import path from 'path'; -import { RpcMessage, RpcPeer } from "../../rpc"; +import { RpcMessage, RpcPeer } from "@scrypted/rpc"; import { ChildProcessWorker } from "./child-process-worker"; import { getPluginNodePath } from "../plugin-npm-dependencies"; diff --git a/server/src/plugin/runtime/node-thread-worker.ts b/server/src/plugin/runtime/node-thread-worker.ts index 72b629ef6..315701cfd 100644 --- a/server/src/plugin/runtime/node-thread-worker.ts +++ b/server/src/plugin/runtime/node-thread-worker.ts @@ -1,6 +1,6 @@ import { Readable } from "stream"; import { EventEmitter } from "ws"; -import { RpcMessage, RpcPeer } from "../../rpc"; +import { RpcMessage, RpcPeer } from "@scrypted/rpc"; import { RuntimeWorker, RuntimeWorkerOptions } from "./runtime-worker"; import worker_threads from "worker_threads"; import path from 'path'; diff --git a/server/src/plugin/runtime/python-worker.ts b/server/src/plugin/runtime/python-worker.ts index fc95b7848..a3d9896b9 100644 --- a/server/src/plugin/runtime/python-worker.ts +++ b/server/src/plugin/runtime/python-worker.ts @@ -3,7 +3,7 @@ import child_process from 'child_process'; import path from 'path'; import { EventEmitter } from "ws"; import { Writable, Readable } from 'stream'; -import { RpcMessage, RpcPeer } from "../../rpc"; +import { RpcMessage, RpcPeer } from "@scrypted/rpc"; import readline from 'readline'; import { ChildProcessWorker } from "./child-process-worker"; diff --git a/server/src/plugin/runtime/runtime-worker.ts b/server/src/plugin/runtime/runtime-worker.ts index fdbc5c389..355152611 100644 --- a/server/src/plugin/runtime/runtime-worker.ts +++ b/server/src/plugin/runtime/runtime-worker.ts @@ -1,4 +1,4 @@ -import { RpcMessage, RpcPeer } from "../../rpc"; +import { RpcMessage, RpcPeer } from "@scrypted/rpc"; import { PluginDebug } from "../plugin-debug"; import {Readable} from "stream"; @@ -26,4 +26,3 @@ export interface RuntimeWorker { setupRpcPeer(peer: RpcPeer): void; } - diff --git a/server/src/plugin/system.ts b/server/src/plugin/system.ts index 28bb1b759..37bda446d 100644 --- a/server/src/plugin/system.ts +++ b/server/src/plugin/system.ts @@ -1,6 +1,6 @@ import { EventListenerOptions, EventDetails, EventListenerRegister, ScryptedDevice, ScryptedInterface, ScryptedInterfaceDescriptors, SystemDeviceState, SystemManager, ScryptedInterfaceProperty, ScryptedDeviceType, Logger } from "@scrypted/types"; import { PluginAPI } from "./plugin-api"; -import { handleFunctionInvocations, PrimitiveProxyHandler, PROPERTY_PROXY_ONEWAY_METHODS } from '../rpc'; +import { handleFunctionInvocations, PrimitiveProxyHandler, PROPERTY_PROXY_ONEWAY_METHODS } from '@scrypted/rpc'; import { EventRegistry } from "../event-registry"; import { allInterfaceProperties, isValidInterfaceMethod } from "./descriptor"; diff --git a/server/src/rpc.ts b/server/src/rpc.ts deleted file mode 100644 index dd43d33e7..000000000 --- a/server/src/rpc.ts +++ /dev/null @@ -1,545 +0,0 @@ -import vm from 'vm'; - -const finalizerIdSymbol = Symbol('rpcFinalizerId'); - -function getDefaultTransportSafeArgumentTypes() { - const jsonSerializable = new Set(); - jsonSerializable.add(Number.name); - jsonSerializable.add(String.name); - jsonSerializable.add(Object.name); - jsonSerializable.add(Boolean.name); - jsonSerializable.add(Array.name); - return jsonSerializable; -} - -export function startPeriodicGarbageCollection() { - if (!global.gc) { - console.warn('rpc peer garbage collection not available: global.gc is not exposed.'); - return; - } - return setInterval(() => { - global?.gc(); - }, 10000); -} - -export interface RpcMessage { - type: string; -} - -interface RpcParam extends RpcMessage { - id: string; - param: string; -} - -interface RpcApply extends RpcMessage { - id: string; - proxyId: string; - args: any[]; - method: string; - oneway?: boolean; -} - -interface RpcResult extends RpcMessage { - id: string; - stack?: string; - message?: string; - result?: any; -} - -interface RpcOob extends RpcMessage { - oob: any; -} - -interface RpcRemoteProxyValue { - __remote_proxy_id: string; - __remote_proxy_finalizer_id: string; - __remote_constructor_name: string; - __remote_proxy_props: any; - __remote_proxy_oneway_methods: string[]; - __serialized_value?: any; -} - -interface RpcLocalProxyValue { - __local_proxy_id: string; -} - -interface RpcFinalize extends RpcMessage { - __local_proxy_id: string; - __local_proxy_finalizer_id: string; -} - -interface Deferred { - resolve: any; - reject: any; -} - -export function handleFunctionInvocations(thiz: PrimitiveProxyHandler, target: any, p: PropertyKey, receiver: any): any { - if (p === 'apply') { - return (thisArg: any, args: any[]) => { - return thiz.apply(target, thiz, args); - } - } - else if (p === 'call') { - return (thisArg: any, ...args: any[]) => { - return thiz.apply(target, thiz, args); - } - } - else if (p === 'toString' || p === Symbol.toPrimitive) { - return (thisArg: any, ...args: any[]) => { - return thiz.toPrimitive(); - } - } -} - -export const PROPERTY_PROXY_ONEWAY_METHODS = '__proxy_oneway_methods'; -export const PROPERTY_JSON_DISABLE_SERIALIZATION = '__json_disable_serialization'; -export const PROPERTY_PROXY_PROPERTIES = '__proxy_props'; -export const PROPERTY_JSON_COPY_SERIALIZE_CHILDREN = '__json_copy_serialize_children'; - -export interface PrimitiveProxyHandler extends ProxyHandler { - toPrimitive(): any; -} - -class RpcProxy implements PrimitiveProxyHandler { - constructor(public peer: RpcPeer, - public entry: LocalProxiedEntry, - public constructorName: string, - public proxyProps: any, - public proxyOneWayMethods: string[]) { - } - - toPrimitive() { - const peer = this.peer; - return `RpcProxy-${peer.selfName}:${peer.peerName}: ${this.constructorName}`; - } - - get(target: any, p: PropertyKey, receiver: any): any { - if (p === '__proxy_id') - return this.entry.id; - if (p === '__proxy_constructor') - return this.constructorName; - if (p === '__proxy_peer') - return this.peer; - if (p === PROPERTY_PROXY_PROPERTIES) - return this.proxyProps; - if (p === PROPERTY_PROXY_ONEWAY_METHODS) - return this.proxyOneWayMethods; - if (p === PROPERTY_JSON_DISABLE_SERIALIZATION || p === PROPERTY_JSON_COPY_SERIALIZE_CHILDREN) - return; - if (p === 'then') - return; - if (p === 'constructor') - return; - if (this.proxyProps?.[p] !== undefined) - return this.proxyProps?.[p]; - const handled = handleFunctionInvocations(this, target, p, receiver); - if (handled) - return handled; - return new Proxy(() => p, this); - } - - set(target: any, p: string | symbol, value: any, receiver: any): boolean { - if (p === finalizerIdSymbol) - this.entry.finalizerId = value; - return true; - } - - apply(target: any, thisArg: any, argArray?: any): any { - if (Object.isFrozen(this.peer.pendingResults)) - return Promise.reject(new RPCResultError(this.peer, 'RpcPeer has been killed')); - - // rpc objects can be functions. if the function is a oneway method, - // it will have a null in the oneway method list. this is because - // undefined is not JSON serializable. - const method = target() || null; - const args: any[] = []; - for (const arg of (argArray || [])) { - args.push(this.peer.serialize(arg)); - } - - const rpcApply: RpcApply = { - type: "apply", - id: undefined, - proxyId: this.entry.id, - args, - method, - }; - - if (this.proxyOneWayMethods?.includes?.(method)) { - rpcApply.oneway = true; - this.peer.send(rpcApply); - return Promise.resolve(); - } - - return this.peer.createPendingResult((id, reject) => { - rpcApply.id = id; - this.peer.send(rpcApply, reject); - }) - } -} - -// todo: error constructor adds a "cause" variable in Chrome 93, Node v?? -export class RPCResultError extends Error { - constructor(peer: RpcPeer, message: string, public cause?: Error, options?: { name: string, stack: string}) { - super(`${peer.selfName}:${peer.peerName}: ${message}`); - - if (options?.name) { - this.name = options?.name; - } - if (options?.stack) { - this.stack = `${peer.peerName}:${peer.selfName}\n${cause?.stack || options.stack}`; - } - } -} - -function compileFunction(code: string, params?: ReadonlyArray, options?: vm.CompileFunctionOptions): any { - params = params || []; - const f = `(function(${params.join(',')}) {;${code};})`; - return eval(f); -} - -try { - const fr = FinalizationRegistry; -} -catch (e) { - (window as any).WeakRef = class WeakRef { - target: any; - constructor(target: any) { - this.target = target; - } - deref(): any { - return this.target; - } - }; - - (window as any).FinalizationRegistry = class FinalizationRegistry { - register() { - } - } -} - -export interface RpcSerializer { - serialize(value: any): any; - deserialize(serialized: any): any; -} - -interface LocalProxiedEntry { - id: string; - finalizerId: string; -} - -export class RpcPeer { - idCounter = 1; - onOob: (oob: any) => void; - params: { [name: string]: any } = {}; - pendingResults: { [id: string]: Deferred } = {}; - proxyCounter = 1; - localProxied = new Map(); - localProxyMap: { [id: string]: any } = {}; - remoteWeakProxies: { [id: string]: WeakRef } = {}; - finalizers = new FinalizationRegistry(entry => this.finalize(entry as LocalProxiedEntry)); - nameDeserializerMap = new Map(); - constructorSerializerMap = new Map(); - transportSafeArgumentTypes = getDefaultTransportSafeArgumentTypes(); - - constructor(public selfName: string, public peerName: string, public send: (message: RpcMessage, reject?: (e: Error) => void) => void) { - } - - createPendingResult(cb: (id: string, reject: (e: Error) => void) => void): Promise { - if (Object.isFrozen(this.pendingResults)) - return Promise.reject(new RPCResultError(this, 'RpcPeer has been killed')); - - const promise = new Promise((resolve, reject) => { - const id = (this.idCounter++).toString(); - this.pendingResults[id] = { resolve, reject }; - - cb(id, e => reject(new RPCResultError(this, e.message, e))); - }); - - // todo: make this an option so rpc doesn't nuke the process if uncaught? - promise.catch(() => { }); - - return promise; - } - - kill(message?: string) { - const error = new RPCResultError(this, message || 'peer was killed'); - for (const result of Object.values(this.pendingResults)) { - result.reject(error); - } - this.pendingResults = Object.freeze({}); - this.remoteWeakProxies = Object.freeze({}); - this.localProxyMap = Object.freeze({}); - this.localProxied.clear(); - } - - // need a name/constructor map due to babel name mangling? fix somehow? - addSerializer(ctr: any, name: string, serializer: RpcSerializer) { - this.nameDeserializerMap.set(name, serializer); - this.constructorSerializerMap.set(ctr, name); - } - - finalize(entry: LocalProxiedEntry) { - delete this.remoteWeakProxies[entry.id]; - const rpcFinalize: RpcFinalize = { - __local_proxy_id: entry.id, - __local_proxy_finalizer_id: entry.finalizerId, - type: 'finalize', - } - this.send(rpcFinalize); - } - - async getParam(param: string) { - return this.createPendingResult((id, reject) => { - const paramMessage: RpcParam = { - id, - type: 'param', - param, - }; - - this.send(paramMessage, reject); - }); - } - - sendOob(oob: any) { - this.send({ - type: 'oob', - oob, - } as RpcOob) - } - - evalLocal(script: string, filename?: string, coercedParams?: { [name: string]: any }): T { - const params = Object.assign({}, this.params, coercedParams); - const f = (vm.compileFunction || compileFunction)(script, Object.keys(params), { - filename, - }); - const value = f(...Object.values(params)); - return value; - } - - createErrorResult(result: RpcResult, e: any) { - result.stack = e.stack || 'no stack'; - result.result = (e as Error).name || 'no name'; - result.message = (e as Error).message || 'no message'; - } - - deserialize(value: any): any { - if (!value) - return value; - - const copySerializeChildren = value[PROPERTY_JSON_COPY_SERIALIZE_CHILDREN]; - if (copySerializeChildren) { - const ret: any = {}; - for (const [key, val] of Object.entries(value)) { - ret[key] = this.deserialize(val); - } - return ret; - } - - const { __remote_proxy_id, __remote_proxy_finalizer_id, __local_proxy_id, __remote_constructor_name, __serialized_value, __remote_proxy_props, __remote_proxy_oneway_methods } = value; - if (__remote_proxy_id) { - let proxy = this.remoteWeakProxies[__remote_proxy_id]?.deref(); - if (!proxy) - proxy = this.newProxy(__remote_proxy_id, __remote_constructor_name, __remote_proxy_props, __remote_proxy_oneway_methods); - proxy[finalizerIdSymbol] = __remote_proxy_finalizer_id; - return proxy; - } - - if (__local_proxy_id) { - const ret = this.localProxyMap[__local_proxy_id]; - if (!ret) - throw new RPCResultError(this, `invalid local proxy id ${__local_proxy_id}`); - return ret; - } - - if (this.nameDeserializerMap.has(__remote_constructor_name)) { - return this.nameDeserializerMap.get(__remote_constructor_name).deserialize(__serialized_value); - } - - return value; - } - - serialize(value: any): any { - if (value?.[PROPERTY_JSON_COPY_SERIALIZE_CHILDREN] === true) { - const ret: any = {}; - for (const [key, val] of Object.entries(value)) { - ret[key] = this.serialize(val); - } - return ret; - } - - if (!value || (!value[PROPERTY_JSON_DISABLE_SERIALIZATION] && this.transportSafeArgumentTypes.has(value.constructor?.name))) { - return value; - } - - let __remote_constructor_name = value.__proxy_constructor || value.constructor?.name?.toString(); - - let proxiedEntry = this.localProxied.get(value); - if (proxiedEntry) { - const __remote_proxy_finalizer_id = (this.proxyCounter++).toString(); - proxiedEntry.finalizerId = __remote_proxy_finalizer_id; - const ret: RpcRemoteProxyValue = { - __remote_proxy_id: proxiedEntry.id, - __remote_proxy_finalizer_id, - __remote_constructor_name, - __remote_proxy_props: value?.[PROPERTY_PROXY_PROPERTIES], - __remote_proxy_oneway_methods: value?.[PROPERTY_PROXY_ONEWAY_METHODS], - } - return ret; - } - - const { __proxy_id, __proxy_peer } = value; - if (__proxy_id && __proxy_peer === this) { - const ret: RpcLocalProxyValue = { - __local_proxy_id: __proxy_id, - } - return ret; - } - - const serializerMapName = this.constructorSerializerMap.get(value.constructor); - if (serializerMapName) { - __remote_constructor_name = serializerMapName; - const serializer = this.nameDeserializerMap.get(serializerMapName); - const serialized = serializer.serialize(value); - const ret: RpcRemoteProxyValue = { - __remote_proxy_id: undefined, - __remote_proxy_finalizer_id: undefined, - __remote_constructor_name, - __remote_proxy_props: value?.[PROPERTY_PROXY_PROPERTIES], - __remote_proxy_oneway_methods: value?.[PROPERTY_PROXY_ONEWAY_METHODS], - __serialized_value: serialized, - } - return ret; - } - - const __remote_proxy_id = (this.proxyCounter++).toString(); - proxiedEntry = { - id: __remote_proxy_id, - finalizerId: __remote_proxy_id, - }; - this.localProxied.set(value, proxiedEntry); - this.localProxyMap[__remote_proxy_id] = value; - - const ret: RpcRemoteProxyValue = { - __remote_proxy_id, - __remote_proxy_finalizer_id: __remote_proxy_id, - __remote_constructor_name, - __remote_proxy_props: value?.[PROPERTY_PROXY_PROPERTIES], - __remote_proxy_oneway_methods: value?.[PROPERTY_PROXY_ONEWAY_METHODS], - } - - return ret; - } - - newProxy(proxyId: string, proxyConstructorName: string, proxyProps: any, proxyOneWayMethods: string[]) { - const localProxiedEntry: LocalProxiedEntry = { - id: proxyId, - finalizerId: undefined, - } - const rpc = new RpcProxy(this, localProxiedEntry, proxyConstructorName, proxyProps, proxyOneWayMethods); - const target = proxyConstructorName === 'Function' || proxyConstructorName === 'AsyncFunction' ? function () { } : rpc; - const proxy = new Proxy(target, rpc); - const weakref = new WeakRef(proxy); - this.remoteWeakProxies[proxyId] = weakref; - this.finalizers.register(rpc, localProxiedEntry); - return proxy; - } - - async handleMessage(message: RpcMessage) { - try { - switch (message.type) { - case 'param': { - const rpcParam = message as RpcParam; - const result: RpcResult = { - type: 'result', - id: rpcParam.id, - result: this.serialize(this.params[rpcParam.param]) - }; - this.send(result); - break; - } - case 'apply': { - const rpcApply = message as RpcApply; - const result: RpcResult = { - type: 'result', - id: rpcApply.id, - }; - - try { - const target = this.localProxyMap[rpcApply.proxyId]; - if (!target) - throw new Error(`proxy id ${rpcApply.proxyId} not found`); - - const args = []; - for (const arg of (rpcApply.args || [])) { - args.push(this.deserialize(arg)); - } - - let value: any; - if (rpcApply.method) { - const method = target[rpcApply.method]; - if (!method) - throw new Error(`target ${target?.constructor?.name} does not have method ${rpcApply.method}`); - value = await target[rpcApply.method](...args); - } - else { - value = await target(...args); - } - - result.result = this.serialize(value); - } - catch (e) { - console.error('failure', rpcApply.method, e); - this.createErrorResult(result, e); - } - - if (!rpcApply.oneway) - this.send(result); - break; - } - case 'result': { - const rpcResult = message as RpcResult; - const deferred = this.pendingResults[rpcResult.id]; - delete this.pendingResults[rpcResult.id]; - if (!deferred) - throw new Error(`unknown result ${rpcResult.id}`); - if (rpcResult.message || rpcResult.stack) { - const e = new RPCResultError(this, rpcResult.message, undefined, { - name: rpcResult.result, - stack: rpcResult.stack, - }); - deferred.reject(e); - return; - } - deferred.resolve(this.deserialize(rpcResult.result)); - break; - } - case 'finalize': { - const rpcFinalize = message as RpcFinalize; - const local = this.localProxyMap[rpcFinalize.__local_proxy_id]; - if (local) { - const localProxiedEntry = this.localProxied.get(local); - // if a finalizer id is specified, it must match. - if (rpcFinalize.__local_proxy_finalizer_id && rpcFinalize.__local_proxy_finalizer_id !== localProxiedEntry?.finalizerId) { - break; - } - delete this.localProxyMap[rpcFinalize.__local_proxy_id]; - this.localProxied.delete(local); - } - break; - } - case 'oob': { - const rpcOob = message as RpcOob; - this.onOob?.(rpcOob.oob); - break; - } - default: - throw new Error(`unknown rpc message type ${message.type}`); - } - } - catch (e) { - console.error('unhandled rpc error', this.peerName, e); - return; - } - } -} \ No newline at end of file diff --git a/server/src/scrypted-plugin-main.ts b/server/src/scrypted-plugin-main.ts index 556ceb9a0..5a7be55be 100644 --- a/server/src/scrypted-plugin-main.ts +++ b/server/src/scrypted-plugin-main.ts @@ -1,5 +1,5 @@ import { startPluginRemote } from "./plugin/plugin-remote-worker"; -import { RpcMessage } from "./rpc"; +import { RpcMessage } from "@scrypted/rpc"; import worker_threads from "worker_threads"; import v8 from 'v8';