server/rpc: simplify rpc code. initial python rpc.

This commit is contained in:
Koushik Dutta
2021-11-03 23:57:21 -07:00
parent a463823972
commit 7d01311f7f
9 changed files with 810 additions and 58 deletions

2
rpc/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
node_modules
.venv

38
rpc/.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,38 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/rpc.py",
"console": "integratedTerminal"
},
{
"type": "pwa-node",
"request": "launch",
"name": "Launch Program TS Node",
"skipFiles": [
"<node_internals>/**"
],
"program": "${workspaceFolder}/src/test.ts",
"runtimeArgs": [
"--expose-gc",
"--nolazy",
"-r",
"ts-node/register"
],
"sourceMaps": true,
"resolveSourceMapLocations": [
"${workspaceFolder}/**",
"!**/node_modules/**"
],
"outFiles": [
"${workspaceFolder}/**/*.js"
],
},
]
}

305
rpc/package-lock.json generated Normal file
View File

@@ -0,0 +1,305 @@
{
"name": "rpc",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"dependencies": {
"tslib": "^2.3.1"
},
"devDependencies": {
"ts-node": "^10.4.0",
"typescript": "^4.4.4"
}
},
"node_modules/@cspotcode/source-map-consumer": {
"version": "0.8.0",
"resolved": "https://registry.npmjs.org/@cspotcode/source-map-consumer/-/source-map-consumer-0.8.0.tgz",
"integrity": "sha512-41qniHzTU8yAGbCp04ohlmSrZf8bkf/iJsl3V0dRGsQN/5GFfx+LbCSsCpp2gqrqjTVg/K6O8ycoV35JIwAzAg==",
"dev": true,
"engines": {
"node": ">= 12"
}
},
"node_modules/@cspotcode/source-map-support": {
"version": "0.7.0",
"resolved": "https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.7.0.tgz",
"integrity": "sha512-X4xqRHqN8ACt2aHVe51OxeA2HjbcL4MqFqXkrmQszJ1NOUuUu5u6Vqx/0lZSVNku7velL5FC/s5uEAj1lsBMhA==",
"dev": true,
"dependencies": {
"@cspotcode/source-map-consumer": "0.8.0"
},
"engines": {
"node": ">=12"
}
},
"node_modules/@tsconfig/node10": {
"version": "1.0.8",
"resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.8.tgz",
"integrity": "sha512-6XFfSQmMgq0CFLY1MslA/CPUfhIL919M1rMsa5lP2P097N2Wd1sSX0tx1u4olM16fLNhtHZpRhedZJphNJqmZg==",
"dev": true
},
"node_modules/@tsconfig/node12": {
"version": "1.0.9",
"resolved": "https://registry.npmjs.org/@tsconfig/node12/-/node12-1.0.9.tgz",
"integrity": "sha512-/yBMcem+fbvhSREH+s14YJi18sp7J9jpuhYByADT2rypfajMZZN4WQ6zBGgBKp53NKmqI36wFYDb3yaMPurITw==",
"dev": true
},
"node_modules/@tsconfig/node14": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/@tsconfig/node14/-/node14-1.0.1.tgz",
"integrity": "sha512-509r2+yARFfHHE7T6Puu2jjkoycftovhXRqW328PDXTVGKihlb1P8Z9mMZH04ebyajfRY7dedfGynlrFHJUQCg==",
"dev": true
},
"node_modules/@tsconfig/node16": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/@tsconfig/node16/-/node16-1.0.2.tgz",
"integrity": "sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA==",
"dev": true
},
"node_modules/@types/node": {
"version": "16.11.6",
"resolved": "https://registry.npmjs.org/@types/node/-/node-16.11.6.tgz",
"integrity": "sha512-ua7PgUoeQFjmWPcoo9khiPum3Pd60k4/2ZGXt18sm2Slk0W0xZTqt5Y0Ny1NyBiN1EVQ/+FaF9NcY4Qe6rwk5w==",
"dev": true,
"peer": true
},
"node_modules/acorn": {
"version": "8.5.0",
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.5.0.tgz",
"integrity": "sha512-yXbYeFy+jUuYd3/CDcg2NkIYE991XYX/bje7LmjJigUciaeO1JR4XxXgCIV1/Zc/dRuFEyw1L0pbA+qynJkW5Q==",
"dev": true,
"bin": {
"acorn": "bin/acorn"
},
"engines": {
"node": ">=0.4.0"
}
},
"node_modules/acorn-walk": {
"version": "8.2.0",
"resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz",
"integrity": "sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA==",
"dev": true,
"engines": {
"node": ">=0.4.0"
}
},
"node_modules/arg": {
"version": "4.1.3",
"resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz",
"integrity": "sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==",
"dev": true
},
"node_modules/create-require": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz",
"integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==",
"dev": true
},
"node_modules/diff": {
"version": "4.0.2",
"resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz",
"integrity": "sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==",
"dev": true,
"engines": {
"node": ">=0.3.1"
}
},
"node_modules/make-error": {
"version": "1.3.6",
"resolved": "https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz",
"integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==",
"dev": true
},
"node_modules/ts-node": {
"version": "10.4.0",
"resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.4.0.tgz",
"integrity": "sha512-g0FlPvvCXSIO1JDF6S232P5jPYqBkRL9qly81ZgAOSU7rwI0stphCgd2kLiCrU9DjQCrJMWEqcNSjQL02s6d8A==",
"dev": true,
"dependencies": {
"@cspotcode/source-map-support": "0.7.0",
"@tsconfig/node10": "^1.0.7",
"@tsconfig/node12": "^1.0.7",
"@tsconfig/node14": "^1.0.0",
"@tsconfig/node16": "^1.0.2",
"acorn": "^8.4.1",
"acorn-walk": "^8.1.1",
"arg": "^4.1.0",
"create-require": "^1.1.0",
"diff": "^4.0.1",
"make-error": "^1.1.1",
"yn": "3.1.1"
},
"bin": {
"ts-node": "dist/bin.js",
"ts-node-cwd": "dist/bin-cwd.js",
"ts-node-script": "dist/bin-script.js",
"ts-node-transpile-only": "dist/bin-transpile.js",
"ts-script": "dist/bin-script-deprecated.js"
},
"peerDependencies": {
"@swc/core": ">=1.2.50",
"@swc/wasm": ">=1.2.50",
"@types/node": "*",
"typescript": ">=2.7"
},
"peerDependenciesMeta": {
"@swc/core": {
"optional": true
},
"@swc/wasm": {
"optional": true
}
}
},
"node_modules/tslib": {
"version": "2.3.1",
"resolved": "https://registry.npmjs.org/tslib/-/tslib-2.3.1.tgz",
"integrity": "sha512-77EbyPPpMz+FRFRuAFlWMtmgUWGe9UOG2Z25NqCwiIjRhOf5iKGuzSe5P2w1laq+FkRy4p+PCuVkJSGkzTEKVw=="
},
"node_modules/typescript": {
"version": "4.4.4",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-4.4.4.tgz",
"integrity": "sha512-DqGhF5IKoBl8WNf8C1gu8q0xZSInh9j1kJJMqT3a94w1JzVaBU4EXOSMrz9yDqMT0xt3selp83fuFMQ0uzv6qA==",
"dev": true,
"bin": {
"tsc": "bin/tsc",
"tsserver": "bin/tsserver"
},
"engines": {
"node": ">=4.2.0"
}
},
"node_modules/yn": {
"version": "3.1.1",
"resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz",
"integrity": "sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==",
"dev": true,
"engines": {
"node": ">=6"
}
}
},
"dependencies": {
"@cspotcode/source-map-consumer": {
"version": "0.8.0",
"resolved": "https://registry.npmjs.org/@cspotcode/source-map-consumer/-/source-map-consumer-0.8.0.tgz",
"integrity": "sha512-41qniHzTU8yAGbCp04ohlmSrZf8bkf/iJsl3V0dRGsQN/5GFfx+LbCSsCpp2gqrqjTVg/K6O8ycoV35JIwAzAg==",
"dev": true
},
"@cspotcode/source-map-support": {
"version": "0.7.0",
"resolved": "https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.7.0.tgz",
"integrity": "sha512-X4xqRHqN8ACt2aHVe51OxeA2HjbcL4MqFqXkrmQszJ1NOUuUu5u6Vqx/0lZSVNku7velL5FC/s5uEAj1lsBMhA==",
"dev": true,
"requires": {
"@cspotcode/source-map-consumer": "0.8.0"
}
},
"@tsconfig/node10": {
"version": "1.0.8",
"resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.8.tgz",
"integrity": "sha512-6XFfSQmMgq0CFLY1MslA/CPUfhIL919M1rMsa5lP2P097N2Wd1sSX0tx1u4olM16fLNhtHZpRhedZJphNJqmZg==",
"dev": true
},
"@tsconfig/node12": {
"version": "1.0.9",
"resolved": "https://registry.npmjs.org/@tsconfig/node12/-/node12-1.0.9.tgz",
"integrity": "sha512-/yBMcem+fbvhSREH+s14YJi18sp7J9jpuhYByADT2rypfajMZZN4WQ6zBGgBKp53NKmqI36wFYDb3yaMPurITw==",
"dev": true
},
"@tsconfig/node14": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/@tsconfig/node14/-/node14-1.0.1.tgz",
"integrity": "sha512-509r2+yARFfHHE7T6Puu2jjkoycftovhXRqW328PDXTVGKihlb1P8Z9mMZH04ebyajfRY7dedfGynlrFHJUQCg==",
"dev": true
},
"@tsconfig/node16": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/@tsconfig/node16/-/node16-1.0.2.tgz",
"integrity": "sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA==",
"dev": true
},
"@types/node": {
"version": "16.11.6",
"resolved": "https://registry.npmjs.org/@types/node/-/node-16.11.6.tgz",
"integrity": "sha512-ua7PgUoeQFjmWPcoo9khiPum3Pd60k4/2ZGXt18sm2Slk0W0xZTqt5Y0Ny1NyBiN1EVQ/+FaF9NcY4Qe6rwk5w==",
"dev": true,
"peer": true
},
"acorn": {
"version": "8.5.0",
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.5.0.tgz",
"integrity": "sha512-yXbYeFy+jUuYd3/CDcg2NkIYE991XYX/bje7LmjJigUciaeO1JR4XxXgCIV1/Zc/dRuFEyw1L0pbA+qynJkW5Q==",
"dev": true
},
"acorn-walk": {
"version": "8.2.0",
"resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz",
"integrity": "sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA==",
"dev": true
},
"arg": {
"version": "4.1.3",
"resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz",
"integrity": "sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==",
"dev": true
},
"create-require": {
"version": "1.1.1",
"resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz",
"integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==",
"dev": true
},
"diff": {
"version": "4.0.2",
"resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz",
"integrity": "sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==",
"dev": true
},
"make-error": {
"version": "1.3.6",
"resolved": "https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz",
"integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==",
"dev": true
},
"ts-node": {
"version": "10.4.0",
"resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.4.0.tgz",
"integrity": "sha512-g0FlPvvCXSIO1JDF6S232P5jPYqBkRL9qly81ZgAOSU7rwI0stphCgd2kLiCrU9DjQCrJMWEqcNSjQL02s6d8A==",
"dev": true,
"requires": {
"@cspotcode/source-map-support": "0.7.0",
"@tsconfig/node10": "^1.0.7",
"@tsconfig/node12": "^1.0.7",
"@tsconfig/node14": "^1.0.0",
"@tsconfig/node16": "^1.0.2",
"acorn": "^8.4.1",
"acorn-walk": "^8.1.1",
"arg": "^4.1.0",
"create-require": "^1.1.0",
"diff": "^4.0.1",
"make-error": "^1.1.1",
"yn": "3.1.1"
}
},
"tslib": {
"version": "2.3.1",
"resolved": "https://registry.npmjs.org/tslib/-/tslib-2.3.1.tgz",
"integrity": "sha512-77EbyPPpMz+FRFRuAFlWMtmgUWGe9UOG2Z25NqCwiIjRhOf5iKGuzSe5P2w1laq+FkRy4p+PCuVkJSGkzTEKVw=="
},
"typescript": {
"version": "4.4.4",
"resolved": "https://registry.npmjs.org/typescript/-/typescript-4.4.4.tgz",
"integrity": "sha512-DqGhF5IKoBl8WNf8C1gu8q0xZSInh9j1kJJMqT3a94w1JzVaBU4EXOSMrz9yDqMT0xt3selp83fuFMQ0uzv6qA==",
"dev": true
},
"yn": {
"version": "3.1.1",
"resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz",
"integrity": "sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==",
"dev": true
}
}
}

9
rpc/package.json Normal file
View File

@@ -0,0 +1,9 @@
{
"devDependencies": {
"ts-node": "^10.4.0",
"typescript": "^4.4.4"
},
"dependencies": {
"tslib": "^2.3.1"
}
}

373
rpc/rpc.py Normal file
View File

@@ -0,0 +1,373 @@
from asyncio.events import AbstractEventLoop
from asyncio.futures import Future
from typing import Callable
import aiofiles
import asyncio
import json
import traceback
import inspect
import json
from collections.abc import Mapping, Sequence
import weakref
jsonSerializable = set()
jsonSerializable.add(float)
jsonSerializable.add(int)
jsonSerializable.add(str)
jsonSerializable.add(dict)
jsonSerializable.add(bool)
jsonSerializable.add(list)
async def maybe_await(value):
if (inspect.iscoroutinefunction(value)):
return await value
return value
class RpcResultException(Exception):
name = None
stack = None
def __init__(self, caught, message):
self.caught = caught
self.message = message
class RpcSerializer:
def serialize(self, value):
pass
def deserialize(self, value):
pass
class RpcProxyMethod:
def __init__(self, proxy, name):
self.__proxy = proxy
self.__proxy_method_name = name
def __call__(self, *args, **kwargs):
return self.__proxy.__apply__(self.__proxy_method_name, args)
class RpcProxy:
def __init__(self, peer, proxyId: str, proxyConstructorName: str, proxyProps: any, proxyOneWayMethods: list[str]):
self.__proxy_id = proxyId
self.__proxy_constructor = proxyConstructorName
self.__proxy_peer = peer
self.__proxy_props = proxyProps
self.__proxy_oneway_methods = proxyOneWayMethods
def __getattr__(self, name):
if self.__proxy_props and hasattr(self.__proxy_props, name):
return self.__proxy_props[name]
return RpcProxyMethod(self, name)
def __call__(self, *args, **kwargs):
print('call')
pass
def __apply__(self, method: str, args: list):
return self.__proxy_peer.__apply__(self.__proxy_id, self.__proxy_oneway_methods, method, args)
class RpcPeer:
idCounter = 1
peerName = 'Unnamed Peer'
params: Mapping[str, any] = {}
localProxied: Mapping[any, str] = {}
localProxyMap: Mapping[str, any] = {}
constructorSerializerMap = {}
proxyCounter = 1
pendingResults: Mapping[str, Future] = {}
remoteWeakProxies: Mapping[str, any] = {}
nameDeserializerMap: Mapping[str, RpcSerializer]
def __init__(self, send: Callable[[object, Callable[[Exception], None]], None]) -> None:
self.send = send
def __apply__(self, proxyId: str, oneWayMethods: list[str], method: str, argArray: list):
args = []
for arg in argArray:
args.append(self.serialize(arg, False))
rpcApply = {
'type': 'apply',
'id': None,
'proxyId': proxyId,
'argArray': args,
'method': method,
}
if oneWayMethods and method in oneWayMethods:
rpcApply['oneway'] = True
self.send(rpcApply, None)
future = Future()
future.set_result(None)
return future
async def send(id: str, reject: Callable[[Exception], None]):
rpcApply['id'] = id
await self.send(rpcApply, reject)
return self.createPendingResult(send)
def kill(self):
self.killed = True
def createErrorResult(self, result: any, name: str, message: str, tb: str):
pass
def serialize(self, value, requireProxy):
if (not value or (not requireProxy and type(value) in jsonSerializable)):
return value
__remote_constructor_name = 'Function' if callable(value) else value.__proxy_constructor if hasattr(
value, '__proxy_constructor') else type(value).__name__
proxyId = self.localProxied.get(value, None)
if proxyId:
ret = {
'__remote_proxy_id': proxyId,
'__remote_constructor_name': __remote_constructor_name,
'__remote_proxy_props': getattr(value, '__proxy_props', None),
'__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None),
}
return ret
__proxy_id = getattr(value, '__proxy_id', None)
__proxy_peer = getattr(value, '__proxy_peer', None)
if __proxy_id and __proxy_peer == self:
ret = {
'__local_proxy_id': __proxy_id,
}
return ret
serializerMapName = self.constructorSerializerMap.get(
type(value).__name__)
if serializerMapName:
__remote_constructor_name = serializerMapName
serializer = self.nameDeserializerMap.get(serializerMapName, None)
serialized = serializer.serialize(value)
if not serialized or (not requireProxy and type(serialized).__name in jsonSerializable):
ret = {
'__remote_proxy_id': None,
'__remote_constructor_name': __remote_constructor_name,
'__remote_proxy_props': getattr(value, '__proxy_props', None),
'__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None),
'__serialized_value': value,
}
return ret
proxyId = str(self.proxyCounter)
self.proxyCounter = self.proxyCounter + 1
self.localProxied[value] = proxyId
self.localProxyMap[proxyId] = value
ret = {
'__remote_proxy_id': proxyId,
'__remote_constructor_name': __remote_constructor_name,
'__remote_proxy_props': getattr(value, '__proxy_props', None),
'__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None),
}
return ret
def finalize(self, id: str):
pass
def newProxy(self, proxyId: str, proxyConstructorName: str, proxyProps: any, proxyOneWayMethods: list[str]):
proxy = RpcProxy(self, proxyId, proxyConstructorName,
proxyProps, proxyOneWayMethods)
wr = weakref.ref(proxy)
self.remoteWeakProxies[proxyId] = wr
weakref.finalize(proxy, lambda: self.finalize(proxyId))
return proxy
def deserialize(self, value):
if not value:
return value
if type(value) != dict:
return value
__remote_proxy_id = value.get('__remote_proxy_id', None)
__local_proxy_id = value.get('__local_proxy_id', None)
__remote_constructor_name = value.get(
'__remote_constructor_name', None)
__serialized_value = value.get('__serialized_value', None)
__remote_proxy_props = value.get('__remote_proxy_props', None)
__remote_proxy_oneway_methods = value.get(
'__remote_proxy_oneway_methods', None)
if __remote_proxy_id:
weakref = self.remoteWeakProxies.get('__remote_proxy_id', None)
proxy = weakref() if weakref else None
if not proxy:
proxy = self.newProxy(__remote_proxy_id, __remote_constructor_name,
__remote_proxy_props, __remote_proxy_oneway_methods)
return proxy
if __local_proxy_id:
ret = self.localProxyMap.get(__local_proxy_id, None)
if not ret:
raise RpcResultException(
None, 'invalid local proxy id %s' % __local_proxy_id)
return ret
deserializer = self.nameDeserializerMap.get(
__remote_constructor_name, None)
if deserializer:
return deserializer.deserialize(__serialized_value)
return value
async def handleMessage(self, message: any):
try:
type = message['type']
match type:
case 'param':
result = {
'type': 'result',
'id': message['id'],
}
try:
value = self.params.get(message['param'], None)
value = await maybe_await(value)
result['result'] = self.serialize(
value, message.get('requireProxy', None))
except Exception as e:
tb = traceback.format_exc()
self.createErrorResult(
result, type(e).__name, str(e), tb)
await self.send(result, None)
case 'apply':
result = {
'type': 'result',
'id': message['id'],
}
method = message.get('method', None)
try:
target = self.localProxyMap.get(
message['proxyId'], None)
if not target:
raise Exception('proxy id %s not found' %
message['proxyId'])
args = []
for arg in (message['argArray'] or []):
args.append(self.deserialize(arg))
value = None
if method:
if not hasattr(target, method):
raise Exception(
'target %s does not have method %s' % (type(target), method))
value = await maybe_await(target[method](*args))
else:
value = await maybe_await(target(*args))
result['result'] = self.serialize(value, False)
except Exception as e:
print('failure', method, e)
self.createErrorResult(result, e)
if message.get('oneway', False):
self.send(result)
case 'result':
future = self.pendingResults.get(message['id'], None)
if not future:
raise RpcResultException(
None, 'unknown result %s' % message['id'])
del message['id']
if hasattr(message, 'message') or hasattr(message, 'stack'):
e = RpcResultException(
None, message.get('message', None))
e.stack = message.get('stack', None)
e.name = message.get('name', None)
future.set_exception(e)
return
future.set_result(self.deserialize(
message.get('result', None)))
case 'finalize':
local = self.localProxyMap.pop(
message['__local_proxy_id'], None)
self.localProxied.pop(local, None)
case _:
raise RpcResultException(
None, 'unknown rpc message type %s' % type)
except Exception as e:
print("unhandled rpc error", self.peerName, e)
pass
async def createPendingResult(self, cb: Callable[[str, Callable[[Exception], None]], None]):
# if (Object.isFrozen(this.pendingResults))
# return Promise.reject(new RPCResultError('RpcPeer has been killed'));
id = str(self.idCounter)
self.idCounter = self.idCounter + 1
future = Future()
self.pendingResults[id] = future
await cb(id, lambda e: future.set_exception(RpcResultException(e, None)))
return await future
async def getParam(self, param):
async def send(id: str, reject: Callable[[Exception], None]):
paramMessage = {
'id': id,
'type': 'param',
'param': param,
}
await self.send(paramMessage, reject)
return await self.createPendingResult(send)
# c = RpcPeer()
async def readLoop(loop, peer, reader):
async for line in reader:
try:
message = json.loads(line)
asyncio.run_coroutine_threadsafe(peer.handleMessage(message), loop)
except Exception as e:
print('read loop error', e)
pass
async def main(loop: AbstractEventLoop):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 3033)
async def send(message, reject):
jsonString = json.dumps(message)
writer.write(bytes(jsonString + '\n', 'utf8'))
try:
await writer.drain()
except Exception as e:
if reject:
reject(e)
peer = RpcPeer(send)
peer.params['print'] = print
async def consoleTest():
console = await peer.getParam('console')
await console.log('test', 'poops', 'peddeps')
await asyncio.gather(readLoop(loop, peer, reader), consoleTest())
print('done')
# print("line %s" % line)
# async with aiofiles.open(0, mode='r') as f:
# async for line in f:
# print("line %s" % line)
# # pokemon = json.loads(contents)
# # print(pokemon['name'])
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()

40
rpc/src/test.ts Normal file
View File

@@ -0,0 +1,40 @@
import child_process from 'child_process';
import net from 'net';
import { RpcPeer } from '../../server/src/rpc';
import readline from 'readline';
const cp = child_process.spawn('ls', ['-l', '/dev/fd'], {
stdio: ['pipe', 'pipe', 'pipe', 'pipe', 'pipe'],
});
cp.stdout.on('data', data => console.log(data.toString()));
const server = net.createServer(async (connection) => {
let ended = false;
connection.on('end', () => ended = true);
connection.on('error', () => ended = true);
connection.on('close', () => ended = true);
const peer = new RpcPeer((message, reject) => {
if (ended) {
return reject?.(new Error('connection ended'));
}
connection.write(JSON.stringify(message) + '\n', e => e && reject?.(e));
});
(console as any).__proxy_required = true;
peer.params.console = console;
const readInterface = readline.createInterface({
input: connection,
terminal: false,
});
readInterface.on('line', line => {
peer.handleMessage(JSON.parse(line));
});
const print = await peer.getParam('print');
await print('hello!!');
});
server.listen(3033);

16
rpc/tsconfig.json Normal file
View File

@@ -0,0 +1,16 @@
{
"compilerOptions": {
"module": "commonjs",
"target": "esnext",
"noImplicitAny": true,
"outDir": "./dist",
"esModuleInterop": true,
// skip error: Interface 'WebGL2RenderingContext' incorrectly extends interface 'WebGL2RenderingContextBase'.
// https://github.com/tensorflow/tfjs/issues/4201
"skipLibCheck": true,
"sourceMap": true
},
"include": [
"src/**/*"
],
}

View File

@@ -272,13 +272,8 @@ interface WebSocketCallbacks {
export async function setupPluginRemote(peer: RpcPeer, api: PluginAPI, pluginId: string): Promise<PluginRemote> {
peer.addSerializer(Buffer, 'Buffer', new BufferSerializer());
const ret = await peer.eval('return getRemote(api, pluginId)', undefined, {
api,
pluginId,
}, true) as PluginRemote;
return ret;
const getRemote = await peer.getParam('getRemote');
return getRemote(api, pluginId);
}
export interface PluginRemoteAttachOptions {
@@ -326,7 +321,8 @@ export function attachPluginRemote(peer: RpcPeer, options?: PluginRemoteAttachOp
const localStorage = new StorageImpl(deviceManager, undefined);
const remote: PluginRemote = {
const remote: PluginRemote & { __proxy_required: boolean } = {
__proxy_required: true,
__proxy_oneway_methods: [
'notify',
'updateDescriptor',

View File

@@ -12,12 +12,9 @@ export interface RpcMessage {
type: string;
}
interface RpcEval extends RpcMessage {
interface RpcParam extends RpcMessage {
id: string;
script: string;
filename: string;
params: { [name: string]: any };
requireProxy: boolean;
param: string;
}
interface RpcApply extends RpcMessage {
@@ -127,13 +124,13 @@ class RpcProxy implements ProxyHandler<any> {
if (this.proxyOneWayMethods?.includes?.(method)) {
rpcApply.oneway = true;
this.peer.send(rpcApply, e => new RPCResultError(e.message, e));
this.peer.send(rpcApply);
return Promise.resolve();
}
return this.peer.createPendingResult(id => {
return this.peer.createPendingResult((id, reject) => {
rpcApply.id = id;
this.peer.send(rpcApply, e => new RPCResultError(e.message, e));
this.peer.send(rpcApply, reject);
})
}
}
@@ -186,7 +183,6 @@ export class RpcPeer {
localProxied = new Map<any, string>();
localProxyMap: { [id: string]: any } = {};
remoteWeakProxies: { [id: string]: WeakRef<any> } = {};
remoteProxyWrapper: { [constructorName: string]: (proxy: any) => any } = {};
finalizers = new FinalizationRegistry(id => this.finalize(id as string));
nameDeserializerMap = new Map<string, RpcSerializer>();
constructorSerializerMap = new Map<string, string>();
@@ -194,7 +190,7 @@ export class RpcPeer {
constructor(public send: (message: RpcMessage, reject?: (e: Error) => void) => void) {
}
createPendingResult(cb: (id: string) => void): Promise<any> {
createPendingResult(cb: (id: string, reject: (e: Error) => void) => void): Promise<any> {
if (Object.isFrozen(this.pendingResults))
return Promise.reject(new RPCResultError('RpcPeer has been killed'));
@@ -202,7 +198,7 @@ export class RpcPeer {
const id = (this.idCounter++).toString();
this.pendingResults[id] = { resolve, reject };
cb(id);
cb(id, e => reject(new RPCResultError(e.message, e)));
});
// todo: make this an option so rpc doesn't nuke the process if uncaught?
@@ -237,23 +233,15 @@ export class RpcPeer {
this.send(rpcFinalize);
}
eval(script: string, filename?: string, params?: { [name: string]: any }, requireProxy?: boolean): Promise<any> {
return this.createPendingResult(id => {
const coercedParams: { [name: string]: any } = {};
for (const key of Object.keys(params || {})) {
coercedParams[key] = this.serialize(params[key]);
}
const evalMessage: RpcEval = {
type: 'eval',
async getParam(param: string) {
return this.createPendingResult((id, reject) => {
const paramMessage: RpcParam = {
id,
script,
filename,
params: coercedParams,
requireProxy,
type: 'param',
param,
};
this.send(evalMessage, e => new RPCResultError(e.message, e));
this.send(paramMessage, reject);
});
}
@@ -291,7 +279,7 @@ export class RpcPeer {
if (__local_proxy_id) {
const ret = this.localProxyMap[__local_proxy_id];
if (!ret)
throw new Error(`invalid local proxy id ${__local_proxy_id}`);
throw new RPCResultError(`invalid local proxy id ${__local_proxy_id}`);
return ret;
}
@@ -302,8 +290,8 @@ export class RpcPeer {
return value;
}
serialize(value: any, requireProxy?: boolean): any {
if (!value || (!requireProxy && jsonSerializable.has(value.constructor?.name))) {
serialize(value: any): any {
if (!value || (!value.__proxy_required && jsonSerializable.has(value.constructor?.name))) {
return value;
}
@@ -333,7 +321,7 @@ export class RpcPeer {
__remote_constructor_name = serializerMapName;
const serializer = this.nameDeserializerMap.get(serializerMapName);
const serialized = serializer.serialize(value);
if (!serialized || (!requireProxy && jsonSerializable.has(serialized.constructor?.name))) {
if (!serialized || (!value.__proxy_required && jsonSerializable.has(serialized.constructor?.name))) {
const ret: RpcRemoteProxyValue = {
__remote_proxy_id: undefined,
__remote_constructor_name,
@@ -361,9 +349,8 @@ export class RpcPeer {
newProxy(proxyId: string, proxyConstructorName: string, proxyProps: any, proxyOneWayMethods: string[]) {
const rpc = new RpcProxy(this, proxyId, proxyConstructorName, proxyProps, proxyOneWayMethods);
const wrapped = this.remoteProxyWrapper[proxyConstructorName]?.(rpc) || rpc;
const target = proxyConstructorName === 'Function' || proxyConstructorName === 'AsyncFunction' ? function () { } : wrapped;
const proxy = new Proxy(target, wrapped);
const target = proxyConstructorName === 'Function' || proxyConstructorName === 'AsyncFunction' ? function () { } : rpc;
const proxy = new Proxy(target, rpc);
const weakref = new WeakRef(proxy);
this.remoteWeakProxies[proxyId] = weakref;
this.finalizers.register(rpc, proxyId);
@@ -374,26 +361,13 @@ export class RpcPeer {
async handleMessage(message: RpcMessage) {
try {
switch (message.type) {
case 'eval': {
const rpcEval = message as RpcEval;
case 'param': {
const rpcParam = message as RpcParam;
const result: RpcResult = {
type: 'result',
id: rpcEval.id,
id: rpcParam.id,
result: this.serialize(this.params[rpcParam.param])
};
try {
const coercedParams: { [name: string]: any } = {};
for (const key of Object.keys(rpcEval.params || {})) {
coercedParams[key] = this.deserialize(rpcEval.params[key]);
}
const params = Object.assign({}, this.params, coercedParams);
const value = await this.evalLocal(rpcEval.script, rpcEval.filename, params);
result.result = this.serialize(value, rpcEval.requireProxy);
}
catch (e) {
this.createErrorResult(result, e);
}
this.send(result);
break;
}
@@ -414,7 +388,6 @@ export class RpcPeer {
args.push(this.deserialize(arg));
}
// const value = rpcApply.method ? await target[rpcApply.method](...args) : await target(...args);
let value: any;
if (rpcApply.method) {
const method = target[rpcApply.method];