From 7d01311f7f279b4595131cabd33db5145307f507 Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Wed, 3 Nov 2021 23:57:21 -0700 Subject: [PATCH] server/rpc: simplify rpc code. initial python rpc. --- rpc/.gitignore | 2 + rpc/.vscode/launch.json | 38 +++ rpc/package-lock.json | 305 +++++++++++++++++++++++ rpc/package.json | 9 + rpc/rpc.py | 373 +++++++++++++++++++++++++++++ rpc/src/test.ts | 40 ++++ rpc/tsconfig.json | 16 ++ server/src/plugin/plugin-remote.ts | 12 +- server/src/rpc.ts | 73 ++---- 9 files changed, 810 insertions(+), 58 deletions(-) create mode 100644 rpc/.gitignore create mode 100644 rpc/.vscode/launch.json create mode 100644 rpc/package-lock.json create mode 100644 rpc/package.json create mode 100644 rpc/rpc.py create mode 100644 rpc/src/test.ts create mode 100644 rpc/tsconfig.json diff --git a/rpc/.gitignore b/rpc/.gitignore new file mode 100644 index 000000000..671215ddc --- /dev/null +++ b/rpc/.gitignore @@ -0,0 +1,2 @@ +node_modules +.venv diff --git a/rpc/.vscode/launch.json b/rpc/.vscode/launch.json new file mode 100644 index 000000000..41d62525d --- /dev/null +++ b/rpc/.vscode/launch.json @@ -0,0 +1,38 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python", + "type": "python", + "request": "launch", + "program": "${workspaceFolder}/rpc.py", + "console": "integratedTerminal" + }, + { + "type": "pwa-node", + "request": "launch", + "name": "Launch Program TS Node", + "skipFiles": [ + "/**" + ], + "program": "${workspaceFolder}/src/test.ts", + "runtimeArgs": [ + "--expose-gc", + "--nolazy", + "-r", + "ts-node/register" + ], + "sourceMaps": true, + "resolveSourceMapLocations": [ + "${workspaceFolder}/**", + "!**/node_modules/**" + ], + "outFiles": [ + "${workspaceFolder}/**/*.js" + ], + }, + ] +} \ No newline at end of file diff --git a/rpc/package-lock.json b/rpc/package-lock.json new file mode 100644 index 000000000..bf9382918 --- /dev/null +++ b/rpc/package-lock.json @@ -0,0 +1,305 @@ +{ + "name": "rpc", + "lockfileVersion": 2, + "requires": true, + "packages": { + "": { + "dependencies": { + "tslib": "^2.3.1" + }, + "devDependencies": { + "ts-node": "^10.4.0", + "typescript": "^4.4.4" + } + }, + "node_modules/@cspotcode/source-map-consumer": { + "version": "0.8.0", + "resolved": "https://registry.npmjs.org/@cspotcode/source-map-consumer/-/source-map-consumer-0.8.0.tgz", + "integrity": "sha512-41qniHzTU8yAGbCp04ohlmSrZf8bkf/iJsl3V0dRGsQN/5GFfx+LbCSsCpp2gqrqjTVg/K6O8ycoV35JIwAzAg==", + "dev": true, + "engines": { + "node": ">= 12" + } + }, + "node_modules/@cspotcode/source-map-support": { + "version": "0.7.0", + "resolved": "https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.7.0.tgz", + "integrity": "sha512-X4xqRHqN8ACt2aHVe51OxeA2HjbcL4MqFqXkrmQszJ1NOUuUu5u6Vqx/0lZSVNku7velL5FC/s5uEAj1lsBMhA==", + "dev": true, + "dependencies": { + "@cspotcode/source-map-consumer": "0.8.0" + }, + "engines": { + "node": ">=12" + } + }, + "node_modules/@tsconfig/node10": { + "version": "1.0.8", + "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.8.tgz", + "integrity": "sha512-6XFfSQmMgq0CFLY1MslA/CPUfhIL919M1rMsa5lP2P097N2Wd1sSX0tx1u4olM16fLNhtHZpRhedZJphNJqmZg==", + "dev": true + }, + "node_modules/@tsconfig/node12": { + "version": "1.0.9", + "resolved": "https://registry.npmjs.org/@tsconfig/node12/-/node12-1.0.9.tgz", + "integrity": "sha512-/yBMcem+fbvhSREH+s14YJi18sp7J9jpuhYByADT2rypfajMZZN4WQ6zBGgBKp53NKmqI36wFYDb3yaMPurITw==", + "dev": true + }, + "node_modules/@tsconfig/node14": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/@tsconfig/node14/-/node14-1.0.1.tgz", + "integrity": "sha512-509r2+yARFfHHE7T6Puu2jjkoycftovhXRqW328PDXTVGKihlb1P8Z9mMZH04ebyajfRY7dedfGynlrFHJUQCg==", + "dev": true + }, + "node_modules/@tsconfig/node16": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/@tsconfig/node16/-/node16-1.0.2.tgz", + "integrity": "sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA==", + "dev": true + }, + "node_modules/@types/node": { + "version": "16.11.6", + "resolved": "https://registry.npmjs.org/@types/node/-/node-16.11.6.tgz", + "integrity": "sha512-ua7PgUoeQFjmWPcoo9khiPum3Pd60k4/2ZGXt18sm2Slk0W0xZTqt5Y0Ny1NyBiN1EVQ/+FaF9NcY4Qe6rwk5w==", + "dev": true, + "peer": true + }, + "node_modules/acorn": { + "version": "8.5.0", + "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.5.0.tgz", + "integrity": "sha512-yXbYeFy+jUuYd3/CDcg2NkIYE991XYX/bje7LmjJigUciaeO1JR4XxXgCIV1/Zc/dRuFEyw1L0pbA+qynJkW5Q==", + "dev": true, + "bin": { + "acorn": "bin/acorn" + }, + "engines": { + "node": ">=0.4.0" + } + }, + "node_modules/acorn-walk": { + "version": "8.2.0", + "resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz", + "integrity": "sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA==", + "dev": true, + "engines": { + "node": ">=0.4.0" + } + }, + "node_modules/arg": { + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz", + "integrity": "sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==", + "dev": true + }, + "node_modules/create-require": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", + "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", + "dev": true + }, + "node_modules/diff": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz", + "integrity": "sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==", + "dev": true, + "engines": { + "node": ">=0.3.1" + } + }, + "node_modules/make-error": { + "version": "1.3.6", + "resolved": "https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz", + "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==", + "dev": true + }, + "node_modules/ts-node": { + "version": "10.4.0", + "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.4.0.tgz", + "integrity": "sha512-g0FlPvvCXSIO1JDF6S232P5jPYqBkRL9qly81ZgAOSU7rwI0stphCgd2kLiCrU9DjQCrJMWEqcNSjQL02s6d8A==", + "dev": true, + "dependencies": { + "@cspotcode/source-map-support": "0.7.0", + "@tsconfig/node10": "^1.0.7", + "@tsconfig/node12": "^1.0.7", + "@tsconfig/node14": "^1.0.0", + "@tsconfig/node16": "^1.0.2", + "acorn": "^8.4.1", + "acorn-walk": "^8.1.1", + "arg": "^4.1.0", + "create-require": "^1.1.0", + "diff": "^4.0.1", + "make-error": "^1.1.1", + "yn": "3.1.1" + }, + "bin": { + "ts-node": "dist/bin.js", + "ts-node-cwd": "dist/bin-cwd.js", + "ts-node-script": "dist/bin-script.js", + "ts-node-transpile-only": "dist/bin-transpile.js", + "ts-script": "dist/bin-script-deprecated.js" + }, + "peerDependencies": { + "@swc/core": ">=1.2.50", + "@swc/wasm": ">=1.2.50", + "@types/node": "*", + "typescript": ">=2.7" + }, + "peerDependenciesMeta": { + "@swc/core": { + "optional": true + }, + "@swc/wasm": { + "optional": true + } + } + }, + "node_modules/tslib": { + "version": "2.3.1", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.3.1.tgz", + "integrity": "sha512-77EbyPPpMz+FRFRuAFlWMtmgUWGe9UOG2Z25NqCwiIjRhOf5iKGuzSe5P2w1laq+FkRy4p+PCuVkJSGkzTEKVw==" + }, + "node_modules/typescript": { + "version": "4.4.4", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.4.4.tgz", + "integrity": "sha512-DqGhF5IKoBl8WNf8C1gu8q0xZSInh9j1kJJMqT3a94w1JzVaBU4EXOSMrz9yDqMT0xt3selp83fuFMQ0uzv6qA==", + "dev": true, + "bin": { + "tsc": "bin/tsc", + "tsserver": "bin/tsserver" + }, + "engines": { + "node": ">=4.2.0" + } + }, + "node_modules/yn": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz", + "integrity": "sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==", + "dev": true, + "engines": { + "node": ">=6" + } + } + }, + "dependencies": { + "@cspotcode/source-map-consumer": { + "version": "0.8.0", + "resolved": "https://registry.npmjs.org/@cspotcode/source-map-consumer/-/source-map-consumer-0.8.0.tgz", + "integrity": "sha512-41qniHzTU8yAGbCp04ohlmSrZf8bkf/iJsl3V0dRGsQN/5GFfx+LbCSsCpp2gqrqjTVg/K6O8ycoV35JIwAzAg==", + "dev": true + }, + "@cspotcode/source-map-support": { + "version": "0.7.0", + "resolved": "https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.7.0.tgz", + "integrity": "sha512-X4xqRHqN8ACt2aHVe51OxeA2HjbcL4MqFqXkrmQszJ1NOUuUu5u6Vqx/0lZSVNku7velL5FC/s5uEAj1lsBMhA==", + "dev": true, + "requires": { + "@cspotcode/source-map-consumer": "0.8.0" + } + }, + "@tsconfig/node10": { + "version": "1.0.8", + "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.8.tgz", + "integrity": "sha512-6XFfSQmMgq0CFLY1MslA/CPUfhIL919M1rMsa5lP2P097N2Wd1sSX0tx1u4olM16fLNhtHZpRhedZJphNJqmZg==", + "dev": true + }, + "@tsconfig/node12": { + "version": "1.0.9", + "resolved": "https://registry.npmjs.org/@tsconfig/node12/-/node12-1.0.9.tgz", + "integrity": "sha512-/yBMcem+fbvhSREH+s14YJi18sp7J9jpuhYByADT2rypfajMZZN4WQ6zBGgBKp53NKmqI36wFYDb3yaMPurITw==", + "dev": true + }, + "@tsconfig/node14": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/@tsconfig/node14/-/node14-1.0.1.tgz", + "integrity": "sha512-509r2+yARFfHHE7T6Puu2jjkoycftovhXRqW328PDXTVGKihlb1P8Z9mMZH04ebyajfRY7dedfGynlrFHJUQCg==", + "dev": true + }, + "@tsconfig/node16": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/@tsconfig/node16/-/node16-1.0.2.tgz", + "integrity": "sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA==", + "dev": true + }, + "@types/node": { + "version": "16.11.6", + "resolved": "https://registry.npmjs.org/@types/node/-/node-16.11.6.tgz", + "integrity": "sha512-ua7PgUoeQFjmWPcoo9khiPum3Pd60k4/2ZGXt18sm2Slk0W0xZTqt5Y0Ny1NyBiN1EVQ/+FaF9NcY4Qe6rwk5w==", + "dev": true, + "peer": true + }, + "acorn": { + "version": "8.5.0", + "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.5.0.tgz", + "integrity": "sha512-yXbYeFy+jUuYd3/CDcg2NkIYE991XYX/bje7LmjJigUciaeO1JR4XxXgCIV1/Zc/dRuFEyw1L0pbA+qynJkW5Q==", + "dev": true + }, + "acorn-walk": { + "version": "8.2.0", + "resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz", + "integrity": "sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA==", + "dev": true + }, + "arg": { + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz", + "integrity": "sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==", + "dev": true + }, + "create-require": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", + "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", + "dev": true + }, + "diff": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz", + "integrity": "sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==", + "dev": true + }, + "make-error": { + "version": "1.3.6", + "resolved": "https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz", + "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==", + "dev": true + }, + "ts-node": { + "version": "10.4.0", + "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.4.0.tgz", + "integrity": "sha512-g0FlPvvCXSIO1JDF6S232P5jPYqBkRL9qly81ZgAOSU7rwI0stphCgd2kLiCrU9DjQCrJMWEqcNSjQL02s6d8A==", + "dev": true, + "requires": { + "@cspotcode/source-map-support": "0.7.0", + "@tsconfig/node10": "^1.0.7", + "@tsconfig/node12": "^1.0.7", + "@tsconfig/node14": "^1.0.0", + "@tsconfig/node16": "^1.0.2", + "acorn": "^8.4.1", + "acorn-walk": "^8.1.1", + "arg": "^4.1.0", + "create-require": "^1.1.0", + "diff": "^4.0.1", + "make-error": "^1.1.1", + "yn": "3.1.1" + } + }, + "tslib": { + "version": "2.3.1", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.3.1.tgz", + "integrity": "sha512-77EbyPPpMz+FRFRuAFlWMtmgUWGe9UOG2Z25NqCwiIjRhOf5iKGuzSe5P2w1laq+FkRy4p+PCuVkJSGkzTEKVw==" + }, + "typescript": { + "version": "4.4.4", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.4.4.tgz", + "integrity": "sha512-DqGhF5IKoBl8WNf8C1gu8q0xZSInh9j1kJJMqT3a94w1JzVaBU4EXOSMrz9yDqMT0xt3selp83fuFMQ0uzv6qA==", + "dev": true + }, + "yn": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz", + "integrity": "sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==", + "dev": true + } + } +} diff --git a/rpc/package.json b/rpc/package.json new file mode 100644 index 000000000..bfa664c61 --- /dev/null +++ b/rpc/package.json @@ -0,0 +1,9 @@ +{ + "devDependencies": { + "ts-node": "^10.4.0", + "typescript": "^4.4.4" + }, + "dependencies": { + "tslib": "^2.3.1" + } +} diff --git a/rpc/rpc.py b/rpc/rpc.py new file mode 100644 index 000000000..5b300d37f --- /dev/null +++ b/rpc/rpc.py @@ -0,0 +1,373 @@ +from asyncio.events import AbstractEventLoop +from asyncio.futures import Future +from typing import Callable +import aiofiles +import asyncio +import json +import traceback +import inspect +import json +from collections.abc import Mapping, Sequence +import weakref + +jsonSerializable = set() +jsonSerializable.add(float) +jsonSerializable.add(int) +jsonSerializable.add(str) +jsonSerializable.add(dict) +jsonSerializable.add(bool) +jsonSerializable.add(list) + + +async def maybe_await(value): + if (inspect.iscoroutinefunction(value)): + return await value + return value + + +class RpcResultException(Exception): + name = None + stack = None + + def __init__(self, caught, message): + self.caught = caught + self.message = message + + +class RpcSerializer: + def serialize(self, value): + pass + + def deserialize(self, value): + pass + + +class RpcProxyMethod: + def __init__(self, proxy, name): + self.__proxy = proxy + self.__proxy_method_name = name + + def __call__(self, *args, **kwargs): + return self.__proxy.__apply__(self.__proxy_method_name, args) + + +class RpcProxy: + def __init__(self, peer, proxyId: str, proxyConstructorName: str, proxyProps: any, proxyOneWayMethods: list[str]): + self.__proxy_id = proxyId + self.__proxy_constructor = proxyConstructorName + self.__proxy_peer = peer + self.__proxy_props = proxyProps + self.__proxy_oneway_methods = proxyOneWayMethods + + def __getattr__(self, name): + if self.__proxy_props and hasattr(self.__proxy_props, name): + return self.__proxy_props[name] + return RpcProxyMethod(self, name) + + def __call__(self, *args, **kwargs): + print('call') + pass + + def __apply__(self, method: str, args: list): + return self.__proxy_peer.__apply__(self.__proxy_id, self.__proxy_oneway_methods, method, args) + + +class RpcPeer: + idCounter = 1 + peerName = 'Unnamed Peer' + params: Mapping[str, any] = {} + localProxied: Mapping[any, str] = {} + localProxyMap: Mapping[str, any] = {} + constructorSerializerMap = {} + proxyCounter = 1 + pendingResults: Mapping[str, Future] = {} + remoteWeakProxies: Mapping[str, any] = {} + nameDeserializerMap: Mapping[str, RpcSerializer] + + def __init__(self, send: Callable[[object, Callable[[Exception], None]], None]) -> None: + self.send = send + + def __apply__(self, proxyId: str, oneWayMethods: list[str], method: str, argArray: list): + args = [] + for arg in argArray: + args.append(self.serialize(arg, False)) + + rpcApply = { + 'type': 'apply', + 'id': None, + 'proxyId': proxyId, + 'argArray': args, + 'method': method, + } + + if oneWayMethods and method in oneWayMethods: + rpcApply['oneway'] = True + self.send(rpcApply, None) + future = Future() + future.set_result(None) + return future + + async def send(id: str, reject: Callable[[Exception], None]): + rpcApply['id'] = id + await self.send(rpcApply, reject) + return self.createPendingResult(send) + + def kill(self): + self.killed = True + + def createErrorResult(self, result: any, name: str, message: str, tb: str): + pass + + def serialize(self, value, requireProxy): + if (not value or (not requireProxy and type(value) in jsonSerializable)): + return value + __remote_constructor_name = 'Function' if callable(value) else value.__proxy_constructor if hasattr( + value, '__proxy_constructor') else type(value).__name__ + proxyId = self.localProxied.get(value, None) + if proxyId: + ret = { + '__remote_proxy_id': proxyId, + '__remote_constructor_name': __remote_constructor_name, + '__remote_proxy_props': getattr(value, '__proxy_props', None), + '__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None), + } + return ret + + __proxy_id = getattr(value, '__proxy_id', None) + __proxy_peer = getattr(value, '__proxy_peer', None) + if __proxy_id and __proxy_peer == self: + ret = { + '__local_proxy_id': __proxy_id, + } + return ret + + serializerMapName = self.constructorSerializerMap.get( + type(value).__name__) + if serializerMapName: + __remote_constructor_name = serializerMapName + serializer = self.nameDeserializerMap.get(serializerMapName, None) + serialized = serializer.serialize(value) + if not serialized or (not requireProxy and type(serialized).__name in jsonSerializable): + ret = { + '__remote_proxy_id': None, + '__remote_constructor_name': __remote_constructor_name, + '__remote_proxy_props': getattr(value, '__proxy_props', None), + '__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None), + '__serialized_value': value, + } + return ret + + proxyId = str(self.proxyCounter) + self.proxyCounter = self.proxyCounter + 1 + self.localProxied[value] = proxyId + self.localProxyMap[proxyId] = value + + ret = { + '__remote_proxy_id': proxyId, + '__remote_constructor_name': __remote_constructor_name, + '__remote_proxy_props': getattr(value, '__proxy_props', None), + '__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None), + } + + return ret + + def finalize(self, id: str): + pass + + def newProxy(self, proxyId: str, proxyConstructorName: str, proxyProps: any, proxyOneWayMethods: list[str]): + proxy = RpcProxy(self, proxyId, proxyConstructorName, + proxyProps, proxyOneWayMethods) + wr = weakref.ref(proxy) + self.remoteWeakProxies[proxyId] = wr + weakref.finalize(proxy, lambda: self.finalize(proxyId)) + return proxy + + def deserialize(self, value): + if not value: + return value + + if type(value) != dict: + return value + + __remote_proxy_id = value.get('__remote_proxy_id', None) + __local_proxy_id = value.get('__local_proxy_id', None) + __remote_constructor_name = value.get( + '__remote_constructor_name', None) + __serialized_value = value.get('__serialized_value', None) + __remote_proxy_props = value.get('__remote_proxy_props', None) + __remote_proxy_oneway_methods = value.get( + '__remote_proxy_oneway_methods', None) + + if __remote_proxy_id: + weakref = self.remoteWeakProxies.get('__remote_proxy_id', None) + proxy = weakref() if weakref else None + if not proxy: + proxy = self.newProxy(__remote_proxy_id, __remote_constructor_name, + __remote_proxy_props, __remote_proxy_oneway_methods) + return proxy + + if __local_proxy_id: + ret = self.localProxyMap.get(__local_proxy_id, None) + if not ret: + raise RpcResultException( + None, 'invalid local proxy id %s' % __local_proxy_id) + return ret + + deserializer = self.nameDeserializerMap.get( + __remote_constructor_name, None) + if deserializer: + return deserializer.deserialize(__serialized_value) + + return value + + async def handleMessage(self, message: any): + try: + type = message['type'] + match type: + case 'param': + result = { + 'type': 'result', + 'id': message['id'], + } + + try: + value = self.params.get(message['param'], None) + value = await maybe_await(value) + result['result'] = self.serialize( + value, message.get('requireProxy', None)) + except Exception as e: + tb = traceback.format_exc() + self.createErrorResult( + result, type(e).__name, str(e), tb) + + await self.send(result, None) + + case 'apply': + result = { + 'type': 'result', + 'id': message['id'], + } + method = message.get('method', None) + + try: + target = self.localProxyMap.get( + message['proxyId'], None) + if not target: + raise Exception('proxy id %s not found' % + message['proxyId']) + + args = [] + for arg in (message['argArray'] or []): + args.append(self.deserialize(arg)) + + value = None + if method: + if not hasattr(target, method): + raise Exception( + 'target %s does not have method %s' % (type(target), method)) + value = await maybe_await(target[method](*args)) + else: + value = await maybe_await(target(*args)) + + result['result'] = self.serialize(value, False) + except Exception as e: + print('failure', method, e) + self.createErrorResult(result, e) + + if message.get('oneway', False): + self.send(result) + + case 'result': + future = self.pendingResults.get(message['id'], None) + if not future: + raise RpcResultException( + None, 'unknown result %s' % message['id']) + del message['id'] + if hasattr(message, 'message') or hasattr(message, 'stack'): + e = RpcResultException( + None, message.get('message', None)) + e.stack = message.get('stack', None) + e.name = message.get('name', None) + future.set_exception(e) + return + future.set_result(self.deserialize( + message.get('result', None))) + case 'finalize': + local = self.localProxyMap.pop( + message['__local_proxy_id'], None) + self.localProxied.pop(local, None) + case _: + raise RpcResultException( + None, 'unknown rpc message type %s' % type) + except Exception as e: + print("unhandled rpc error", self.peerName, e) + pass + + async def createPendingResult(self, cb: Callable[[str, Callable[[Exception], None]], None]): + # if (Object.isFrozen(this.pendingResults)) + # return Promise.reject(new RPCResultError('RpcPeer has been killed')); + + id = str(self.idCounter) + self.idCounter = self.idCounter + 1 + future = Future() + self.pendingResults[id] = future + await cb(id, lambda e: future.set_exception(RpcResultException(e, None))) + return await future + + async def getParam(self, param): + async def send(id: str, reject: Callable[[Exception], None]): + paramMessage = { + 'id': id, + 'type': 'param', + 'param': param, + } + await self.send(paramMessage, reject) + return await self.createPendingResult(send) + +# c = RpcPeer() + + +async def readLoop(loop, peer, reader): + async for line in reader: + try: + message = json.loads(line) + asyncio.run_coroutine_threadsafe(peer.handleMessage(message), loop) + except Exception as e: + print('read loop error', e) + pass + + +async def main(loop: AbstractEventLoop): + reader, writer = await asyncio.open_connection( + '127.0.0.1', 3033) + + async def send(message, reject): + jsonString = json.dumps(message) + writer.write(bytes(jsonString + '\n', 'utf8')) + try: + await writer.drain() + except Exception as e: + if reject: + reject(e) + + peer = RpcPeer(send) + peer.params['print'] = print + + async def consoleTest(): + console = await peer.getParam('console') + await console.log('test', 'poops', 'peddeps') + + await asyncio.gather(readLoop(loop, peer, reader), consoleTest()) + print('done') + + # print("line %s" % line) + + # async with aiofiles.open(0, mode='r') as f: + # async for line in f: + # print("line %s" % line) + # # pokemon = json.loads(contents) + # # print(pokemon['name']) + +loop = asyncio.get_event_loop() +loop.run_until_complete(main(loop)) +loop.close() diff --git a/rpc/src/test.ts b/rpc/src/test.ts new file mode 100644 index 000000000..171829990 --- /dev/null +++ b/rpc/src/test.ts @@ -0,0 +1,40 @@ +import child_process from 'child_process'; +import net from 'net'; +import { RpcPeer } from '../../server/src/rpc'; +import readline from 'readline'; + +const cp = child_process.spawn('ls', ['-l', '/dev/fd'], { + stdio: ['pipe', 'pipe', 'pipe', 'pipe', 'pipe'], +}); + +cp.stdout.on('data', data => console.log(data.toString())); + + + +const server = net.createServer(async (connection) => { + let ended = false; + connection.on('end', () => ended = true); + connection.on('error', () => ended = true); + connection.on('close', () => ended = true); + const peer = new RpcPeer((message, reject) => { + if (ended) { + return reject?.(new Error('connection ended')); + } + connection.write(JSON.stringify(message) + '\n', e => e && reject?.(e)); + }); + (console as any).__proxy_required = true; + peer.params.console = console; + + const readInterface = readline.createInterface({ + input: connection, + terminal: false, + }); + readInterface.on('line', line => { + peer.handleMessage(JSON.parse(line)); + }); + + const print = await peer.getParam('print'); + await print('hello!!'); +}); + +server.listen(3033); diff --git a/rpc/tsconfig.json b/rpc/tsconfig.json new file mode 100644 index 000000000..cd66ca1ca --- /dev/null +++ b/rpc/tsconfig.json @@ -0,0 +1,16 @@ +{ + "compilerOptions": { + "module": "commonjs", + "target": "esnext", + "noImplicitAny": true, + "outDir": "./dist", + "esModuleInterop": true, + // skip error: Interface 'WebGL2RenderingContext' incorrectly extends interface 'WebGL2RenderingContextBase'. + // https://github.com/tensorflow/tfjs/issues/4201 + "skipLibCheck": true, + "sourceMap": true + }, + "include": [ + "src/**/*" + ], +} \ No newline at end of file diff --git a/server/src/plugin/plugin-remote.ts b/server/src/plugin/plugin-remote.ts index 9204f2a06..975d1a1e8 100644 --- a/server/src/plugin/plugin-remote.ts +++ b/server/src/plugin/plugin-remote.ts @@ -272,13 +272,8 @@ interface WebSocketCallbacks { export async function setupPluginRemote(peer: RpcPeer, api: PluginAPI, pluginId: string): Promise { peer.addSerializer(Buffer, 'Buffer', new BufferSerializer()); - - const ret = await peer.eval('return getRemote(api, pluginId)', undefined, { - api, - pluginId, - }, true) as PluginRemote; - - return ret; + const getRemote = await peer.getParam('getRemote'); + return getRemote(api, pluginId); } export interface PluginRemoteAttachOptions { @@ -326,7 +321,8 @@ export function attachPluginRemote(peer: RpcPeer, options?: PluginRemoteAttachOp const localStorage = new StorageImpl(deviceManager, undefined); - const remote: PluginRemote = { + const remote: PluginRemote & { __proxy_required: boolean } = { + __proxy_required: true, __proxy_oneway_methods: [ 'notify', 'updateDescriptor', diff --git a/server/src/rpc.ts b/server/src/rpc.ts index b9a9ec811..658224579 100644 --- a/server/src/rpc.ts +++ b/server/src/rpc.ts @@ -12,12 +12,9 @@ export interface RpcMessage { type: string; } -interface RpcEval extends RpcMessage { +interface RpcParam extends RpcMessage { id: string; - script: string; - filename: string; - params: { [name: string]: any }; - requireProxy: boolean; + param: string; } interface RpcApply extends RpcMessage { @@ -127,13 +124,13 @@ class RpcProxy implements ProxyHandler { if (this.proxyOneWayMethods?.includes?.(method)) { rpcApply.oneway = true; - this.peer.send(rpcApply, e => new RPCResultError(e.message, e)); + this.peer.send(rpcApply); return Promise.resolve(); } - return this.peer.createPendingResult(id => { + return this.peer.createPendingResult((id, reject) => { rpcApply.id = id; - this.peer.send(rpcApply, e => new RPCResultError(e.message, e)); + this.peer.send(rpcApply, reject); }) } } @@ -186,7 +183,6 @@ export class RpcPeer { localProxied = new Map(); localProxyMap: { [id: string]: any } = {}; remoteWeakProxies: { [id: string]: WeakRef } = {}; - remoteProxyWrapper: { [constructorName: string]: (proxy: any) => any } = {}; finalizers = new FinalizationRegistry(id => this.finalize(id as string)); nameDeserializerMap = new Map(); constructorSerializerMap = new Map(); @@ -194,7 +190,7 @@ export class RpcPeer { constructor(public send: (message: RpcMessage, reject?: (e: Error) => void) => void) { } - createPendingResult(cb: (id: string) => void): Promise { + createPendingResult(cb: (id: string, reject: (e: Error) => void) => void): Promise { if (Object.isFrozen(this.pendingResults)) return Promise.reject(new RPCResultError('RpcPeer has been killed')); @@ -202,7 +198,7 @@ export class RpcPeer { const id = (this.idCounter++).toString(); this.pendingResults[id] = { resolve, reject }; - cb(id); + cb(id, e => reject(new RPCResultError(e.message, e))); }); // todo: make this an option so rpc doesn't nuke the process if uncaught? @@ -237,23 +233,15 @@ export class RpcPeer { this.send(rpcFinalize); } - eval(script: string, filename?: string, params?: { [name: string]: any }, requireProxy?: boolean): Promise { - return this.createPendingResult(id => { - const coercedParams: { [name: string]: any } = {}; - for (const key of Object.keys(params || {})) { - coercedParams[key] = this.serialize(params[key]); - } - - const evalMessage: RpcEval = { - type: 'eval', + async getParam(param: string) { + return this.createPendingResult((id, reject) => { + const paramMessage: RpcParam = { id, - script, - filename, - params: coercedParams, - requireProxy, + type: 'param', + param, }; - this.send(evalMessage, e => new RPCResultError(e.message, e)); + this.send(paramMessage, reject); }); } @@ -291,7 +279,7 @@ export class RpcPeer { if (__local_proxy_id) { const ret = this.localProxyMap[__local_proxy_id]; if (!ret) - throw new Error(`invalid local proxy id ${__local_proxy_id}`); + throw new RPCResultError(`invalid local proxy id ${__local_proxy_id}`); return ret; } @@ -302,8 +290,8 @@ export class RpcPeer { return value; } - serialize(value: any, requireProxy?: boolean): any { - if (!value || (!requireProxy && jsonSerializable.has(value.constructor?.name))) { + serialize(value: any): any { + if (!value || (!value.__proxy_required && jsonSerializable.has(value.constructor?.name))) { return value; } @@ -333,7 +321,7 @@ export class RpcPeer { __remote_constructor_name = serializerMapName; const serializer = this.nameDeserializerMap.get(serializerMapName); const serialized = serializer.serialize(value); - if (!serialized || (!requireProxy && jsonSerializable.has(serialized.constructor?.name))) { + if (!serialized || (!value.__proxy_required && jsonSerializable.has(serialized.constructor?.name))) { const ret: RpcRemoteProxyValue = { __remote_proxy_id: undefined, __remote_constructor_name, @@ -361,9 +349,8 @@ export class RpcPeer { newProxy(proxyId: string, proxyConstructorName: string, proxyProps: any, proxyOneWayMethods: string[]) { const rpc = new RpcProxy(this, proxyId, proxyConstructorName, proxyProps, proxyOneWayMethods); - const wrapped = this.remoteProxyWrapper[proxyConstructorName]?.(rpc) || rpc; - const target = proxyConstructorName === 'Function' || proxyConstructorName === 'AsyncFunction' ? function () { } : wrapped; - const proxy = new Proxy(target, wrapped); + const target = proxyConstructorName === 'Function' || proxyConstructorName === 'AsyncFunction' ? function () { } : rpc; + const proxy = new Proxy(target, rpc); const weakref = new WeakRef(proxy); this.remoteWeakProxies[proxyId] = weakref; this.finalizers.register(rpc, proxyId); @@ -374,26 +361,13 @@ export class RpcPeer { async handleMessage(message: RpcMessage) { try { switch (message.type) { - case 'eval': { - const rpcEval = message as RpcEval; + case 'param': { + const rpcParam = message as RpcParam; const result: RpcResult = { type: 'result', - id: rpcEval.id, + id: rpcParam.id, + result: this.serialize(this.params[rpcParam.param]) }; - try { - const coercedParams: { [name: string]: any } = {}; - for (const key of Object.keys(rpcEval.params || {})) { - coercedParams[key] = this.deserialize(rpcEval.params[key]); - } - const params = Object.assign({}, this.params, coercedParams); - const value = await this.evalLocal(rpcEval.script, rpcEval.filename, params); - - result.result = this.serialize(value, rpcEval.requireProxy); - } - catch (e) { - this.createErrorResult(result, e); - } - this.send(result); break; } @@ -414,7 +388,6 @@ export class RpcPeer { args.push(this.deserialize(arg)); } - // const value = rpcApply.method ? await target[rpcApply.method](...args) : await target(...args); let value: any; if (rpcApply.method) { const method = target[rpcApply.method];