Files
scrypted/server/python/rpc.py
2021-11-04 20:29:02 -07:00

387 lines
13 KiB
Python

from asyncio.events import AbstractEventLoop
from asyncio.futures import Future
from typing import Callable
import asyncio
import json
import traceback
import inspect
import json
from collections.abc import Mapping, Sequence
import weakref
jsonSerializable = set()
jsonSerializable.add(float)
jsonSerializable.add(int)
jsonSerializable.add(str)
jsonSerializable.add(dict)
jsonSerializable.add(bool)
jsonSerializable.add(list)
async def maybe_await(value):
if (inspect.iscoroutinefunction(value) or inspect.iscoroutine(value)):
return await value
return value
class RpcResultException(Exception):
name = None
stack = None
def __init__(self, caught, message):
self.caught = caught
self.message = message
class RpcSerializer:
def serialize(self, value):
pass
def deserialize(self, value):
pass
class RpcProxyMethod:
def __init__(self, proxy, name):
self.__proxy = proxy
self.__proxy_method_name = name
def __call__(self, *args, **kwargs):
return self.__proxy.__apply__(self.__proxy_method_name, args)
class RpcProxy:
def __init__(self, peer, proxyId: str, proxyConstructorName: str, proxyProps: any, proxyOneWayMethods: list[str]):
self.__proxy_id = proxyId
self.__proxy_constructor = proxyConstructorName
self.__proxy_peer = peer
self.__proxy_props = proxyProps
self.__proxy_oneway_methods = proxyOneWayMethods
def __getattr__(self, name):
if self.__proxy_props and hasattr(self.__proxy_props, name):
return self.__proxy_props[name]
return RpcProxyMethod(self, name)
def __call__(self, *args, **kwargs):
print('call')
pass
def __apply__(self, method: str, args: list):
return self.__proxy_peer.__apply__(self.__proxy_id, self.__proxy_oneway_methods, method, args)
class RpcPeer:
idCounter = 1
peerName = 'Unnamed Peer'
params: Mapping[str, any] = {}
localProxied: Mapping[any, str] = {}
localProxyMap: Mapping[str, any] = {}
constructorSerializerMap = {}
proxyCounter = 1
pendingResults: Mapping[str, Future] = {}
remoteWeakProxies: Mapping[str, any] = {}
nameDeserializerMap: Mapping[str, RpcSerializer] = {}
def __init__(self, send: Callable[[object, Callable[[Exception], None]], None]) -> None:
self.send = send
def __apply__(self, proxyId: str, oneWayMethods: list[str], method: str, argArray: list):
args = []
for arg in argArray:
args.append(self.serialize(arg, False))
rpcApply = {
'type': 'apply',
'id': None,
'proxyId': proxyId,
'argArray': args,
'method': method,
}
if not oneWayMethods or method not in oneWayMethods:
rpcApply['oneway'] = True
self.send(rpcApply)
future = Future()
future.set_result(None)
return future
async def send(id: str, reject: Callable[[Exception], None]):
rpcApply['id'] = id
self.send(rpcApply, reject)
return self.createPendingResult(send)
def kill(self):
self.killed = True
def createErrorResult(self, result: any, name: str, message: str, tb: str):
result['stack'] = tb if tb else 'no stack'
result['result'] = name if name else 'no name'
result['message'] = message if message else 'no message'
def serialize(self, value, requireProxy):
if (not value or (not requireProxy and type(value) in jsonSerializable)):
return value
__remote_constructor_name = 'Function' if callable(value) else value.__proxy_constructor if hasattr(
value, '__proxy_constructor') else type(value).__name__
proxyId = self.localProxied.get(value, None)
if proxyId:
ret = {
'__remote_proxy_id': proxyId,
'__remote_constructor_name': __remote_constructor_name,
'__remote_proxy_props': getattr(value, '__proxy_props', None),
'__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None),
}
return ret
__proxy_id = getattr(value, '__proxy_id', None)
__proxy_peer = getattr(value, '__proxy_peer', None)
if __proxy_id and __proxy_peer == self:
ret = {
'__local_proxy_id': __proxy_id,
}
return ret
serializerMapName = self.constructorSerializerMap.get(
type(value).__name__)
if serializerMapName:
__remote_constructor_name = serializerMapName
serializer = self.nameDeserializerMap.get(serializerMapName, None)
serialized = serializer.serialize(value)
ret = {
'__remote_proxy_id': None,
'__remote_constructor_name': __remote_constructor_name,
'__remote_proxy_props': getattr(value, '__proxy_props', None),
'__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None),
'__serialized_value': serialized,
}
return ret
proxyId = str(self.proxyCounter)
self.proxyCounter = self.proxyCounter + 1
self.localProxied[value] = proxyId
self.localProxyMap[proxyId] = value
ret = {
'__remote_proxy_id': proxyId,
'__remote_constructor_name': __remote_constructor_name,
'__remote_proxy_props': getattr(value, '__proxy_props', None),
'__remote_proxy_oneway_methods': getattr(value, '__proxy_oneway_methods', None),
}
return ret
def finalize(self, id: str):
self.remoteWeakProxies.pop(id, None)
rpcFinalize = {
'__local_proxy_id': id,
'type': 'finalize',
}
self.send(rpcFinalize)
def newProxy(self, proxyId: str, proxyConstructorName: str, proxyProps: any, proxyOneWayMethods: list[str]):
proxy = RpcProxy(self, proxyId, proxyConstructorName,
proxyProps, proxyOneWayMethods)
wr = weakref.ref(proxy)
self.remoteWeakProxies[proxyId] = wr
weakref.finalize(proxy, lambda: self.finalize(proxyId))
return proxy
def deserialize(self, value):
if not value:
return value
if type(value) != dict:
return value
__remote_proxy_id = value.get('__remote_proxy_id', None)
__local_proxy_id = value.get('__local_proxy_id', None)
__remote_constructor_name = value.get(
'__remote_constructor_name', None)
__serialized_value = value.get('__serialized_value', None)
__remote_proxy_props = value.get('__remote_proxy_props', None)
__remote_proxy_oneway_methods = value.get(
'__remote_proxy_oneway_methods', None)
if __remote_proxy_id:
weakref = self.remoteWeakProxies.get('__remote_proxy_id', None)
proxy = weakref() if weakref else None
if not proxy:
proxy = self.newProxy(__remote_proxy_id, __remote_constructor_name,
__remote_proxy_props, __remote_proxy_oneway_methods)
return proxy
if __local_proxy_id:
ret = self.localProxyMap.get(__local_proxy_id, None)
if not ret:
raise RpcResultException(
None, 'invalid local proxy id %s' % __local_proxy_id)
return ret
deserializer = self.nameDeserializerMap.get(
__remote_constructor_name, None)
if deserializer:
return deserializer.deserialize(__serialized_value)
return value
async def handleMessage(self, message: any):
try:
type = message['type']
if type == 'param':
result = {
'type': 'result',
'id': message['id'],
}
try:
value = self.params.get(message['param'], None)
value = await maybe_await(value)
result['result'] = self.serialize(
value, message.get('requireProxy', None))
except Exception as e:
tb = traceback.format_exc()
self.createErrorResult(
result, type(e).__name, str(e), tb)
self.send(result)
elif type == 'apply':
result = {
'type': 'result',
'id': message['id'],
}
method = message.get('method', None)
try:
target = self.localProxyMap.get(
message['proxyId'], None)
if not target:
raise Exception('proxy id %s not found' %
message['proxyId'])
args = []
for arg in (message['argArray'] or []):
args.append(self.deserialize(arg))
value = None
if method:
if not hasattr(target, method):
raise Exception(
'target %s does not have method %s' % (type(target), method))
invoke = getattr(target, method)
value = await maybe_await(invoke(*args))
else:
value = await maybe_await(target(*args))
result['result'] = self.serialize(value, False)
except Exception as e:
print('failure', method, e)
tb = traceback.format_exc()
self.createErrorResult(
result, type(e).__name, str(e), tb)
if not message.get('oneway', False):
self.send(result)
elif type == 'result':
future = self.pendingResults.get(message['id'], None)
if not future:
raise RpcResultException(
None, 'unknown result %s' % message['id'])
del message['id']
if hasattr(message, 'message') or hasattr(message, 'stack'):
e = RpcResultException(
None, message.get('message', None))
e.stack = message.get('stack', None)
e.name = message.get('name', None)
future.set_exception(e)
return
future.set_result(self.deserialize(
message.get('result', None)))
elif type == 'finalize':
local = self.localProxyMap.pop(
message['__local_proxy_id'], None)
self.localProxied.pop(local, None)
else:
raise RpcResultException(
None, 'unknown rpc message type %s' % type)
except Exception as e:
print("unhandled rpc error", self.peerName, e)
pass
async def createPendingResult(self, cb: Callable[[str, Callable[[Exception], None]], None]):
# if (Object.isFrozen(this.pendingResults))
# return Promise.reject(new RPCResultError('RpcPeer has been killed'));
id = str(self.idCounter)
self.idCounter = self.idCounter + 1
future = Future()
self.pendingResults[id] = future
await cb(id, lambda e: future.set_exception(RpcResultException(e, None)))
return await future
async def getParam(self, param):
async def send(id: str, reject: Callable[[Exception], None]):
paramMessage = {
'id': id,
'type': 'param',
'param': param,
}
self.send(paramMessage, reject)
return await self.createPendingResult(send)
# c = RpcPeer()
async def readLoop(loop, peer, reader):
async for line in reader:
try:
message = json.loads(line)
asyncio.run_coroutine_threadsafe(peer.handleMessage(message), loop)
except Exception as e:
print('read loop error', e)
pass
async def async_main(loop: AbstractEventLoop):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 3033)
async def send(message, reject):
jsonString = json.dumps(message)
writer.write(bytes(jsonString + '\n', 'utf8'))
try:
await writer.drain()
except Exception as e:
if reject:
reject(e)
peer = RpcPeer(send)
peer.params['print'] = print
async def consoleTest():
console = await peer.getParam('console')
await console.log('test', 'poops', 'peddeps')
await asyncio.gather(readLoop(loop, peer, reader), consoleTest())
print('done')
# print("line %s" % line)
# async with aiofiles.open(0, mode='r') as f:
# async for line in f:
# print("line %s" % line)
# # pokemon = json.loads(contents)
# # print(pokemon['name'])
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(async_main(loop))
loop.close()
if __name__ == "__main__":
main()